diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index d54fc02f3..89d14f2af 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1692,13 +1692,11 @@ CopyLocalDataIntoShards(Oid distributedRelationId) ExprContext *econtext = GetPerTupleExprContext(estate); econtext->ecxt_scantuple = slot; - bool stopOnFailure = true; DestReceiver *copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, columnNameList, partitionColumnIndex, - estate, stopOnFailure, - NULL); + estate, NULL); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 575bfa635..7e99fcf9b 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -20,9 +20,9 @@ * in parallel. * * For hash-partitioned tables, if it fails to connect to a worker, the master - * marks the placement for which it was trying to open a connection as inactive, - * similar to the way DML statements are handled. If a failure occurs after - * connecting, the transaction is rolled back on all the workers. Note that, + * rolls back the distributed transaction, similar to the way DML statements + * are handled. If a failure occurs after connecting, the transaction + * is rolled back on all the workers. Note that, * in the case of append-partitioned tables, if a fail occurs, immediately * metadata changes are rolled back on the master node, but shard placements * are left on the worker nodes.
@@ -242,8 +242,7 @@ static void CopyToExistingShards(CopyStmt *copyStatement, static void CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid relationId); static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, - ShardConnections *shardConnections, bool - stopOnFailure, + ShardConnections *shardConnections, bool useBinaryCopyFormat); static List * RemoveOptionFromList(List *optionList, char *optionName); static bool BinaryOutputFunctionDefined(Oid typeId); @@ -281,12 +280,11 @@ static HTAB * CreateShardStateHash(MemoryContext memoryContext); static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, MultiConnection *connection); static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, - HTAB *connectionStateHash, bool stopOnFailure, + HTAB *connectionStateHash, bool *found, bool shouldUseLocalCopy, CopyOutState copyOutState, bool isColocatedIntermediateResult); static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, - bool stopOnFailure, bool colocatedIntermediateResult); static bool HasReachedAdaptiveExecutorPoolSize(List *connectionStateHash); static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateList, @@ -296,7 +294,7 @@ static List * ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32 port); static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, - uint64 shardId, bool stopOnFailure, + uint64 shardId, bool canUseLocalCopy, CopyOutState copyOutState, bool colocatedIntermediateResult); @@ -461,9 +459,6 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT List *columnNameList = NIL; int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX; - - bool stopOnFailure = false; - uint64 processedRowCount = 0; ErrorContextCallback errorCallback; @@ -509,16 +504,10 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState); - if (IsCitusTableType(tableId, REFERENCE_TABLE)) - { - stopOnFailure = true; - } - /* set up the destination for the COPY */ CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex, - executorState, - stopOnFailure, NULL); + executorState, NULL); DestReceiver *dest = (DestReceiver *) copyDest; dest->rStartup(dest, 0, tupleDescriptor); @@ -611,9 +600,6 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT FreeExecutorState(executorState); table_close(distributedRelation, NoLock); - /* mark failed placements as inactive */ - MarkFailedShardPlacements(); - CHECK_FOR_INTERRUPTS(); if (completionTag != NULL) @@ -856,13 +842,12 @@ RemoveOptionFromList(List *optionList, char *optionName) /* * OpenCopyConnectionsForNewShards opens a connection for each placement of a shard and * starts a COPY transaction if necessary. If a connection cannot be opened, - * then the shard placement is marked as inactive and the COPY continues with the remaining - * shard placements. + * then the transaction is rolled back.
*/ static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, ShardConnections *shardConnections, - bool stopOnFailure, bool useBinaryCopyFormat) + bool useBinaryCopyFormat) { int failedPlacementCount = 0; ListCell *placementCell = NULL; @@ -907,19 +892,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, if (PQstatus(connection->pgConn) != CONNECTION_OK) { - if (stopOnFailure) - { - ReportConnectionError(connection, ERROR); - } - else - { - const bool raiseErrors = true; - - HandleRemoteTransactionConnectionError(connection, raiseErrors); - - failedPlacementCount++; - continue; - } + ReportConnectionError(connection, ERROR); } /* @@ -954,10 +927,10 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, } /* - * If stopOnFailure is true, we just error out and code execution should - * never reach to this point. This is the case for reference tables. + * We error out on failure, so code execution should + * never reach this point. This is the case for all tables. */ - Assert(!stopOnFailure || failedPlacementCount == 0); + Assert(failedPlacementCount == 0); shardConnections->connectionList = connectionList; @@ -1869,14 +1842,13 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, char *schemaName = copyStatement->relation->schemaname; char *qualifiedName = quote_qualified_identifier(schemaName, relationName); int64 shardId = CreateEmptyShard(qualifiedName); - bool stopOnFailure = true; shardConnections->shardId = shardId; shardConnections->connectionList = NIL; /* connect to shards placements and start transactions */ - OpenCopyConnectionsForNewShards(copyStatement, shardConnections, stopOnFailure, + OpenCopyConnectionsForNewShards(copyStatement, shardConnections, useBinaryCopyFormat); return shardId; @@ -2166,7 +2138,7 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) */ CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex, - EState *executorState, bool stopOnFailure, + EState *executorState, char *intermediateResultIdPrefix) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0( @@ -2184,7 +2156,6 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu copyDest->columnNameList = columnNameList; copyDest->partitionColumnIndex = partitionColumnIndex; copyDest->executorState = executorState; - copyDest->stopOnFailure = stopOnFailure; copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix; copyDest->memoryContext = CurrentMemoryContext; @@ -2527,8 +2498,6 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest bool cachedShardStateFound = false; bool firstTupleInShard = false; - bool stopOnFailure = copyDest->stopOnFailure; - EState *executorState = copyDest->executorState; MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); @@ -2548,7 +2517,6 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash, copyDest->connectionStateHash, - stopOnFailure, &cachedShardStateFound, copyDest->shouldUseLocalCopy, copyDest->copyOutState, @@ -3234,7 +3202,7 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag) if (PQstatus(connection->pgConn) != CONNECTION_OK) { - HandleRemoteTransactionConnectionError(connection, raiseErrors); + ReportConnectionError(connection, ERROR); continue; } @@ -3242,7 +3210,7 @@ CitusCopyTo(CopyStmt
*copyStatement, QueryCompletionCompat *completionTag) if (!SendRemoteCommand(connection, copyCommand->data)) { - HandleRemoteTransactionConnectionError(connection, raiseErrors); + ReportConnectionError(connection, ERROR); continue; } @@ -3610,7 +3578,7 @@ ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32 */ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, - HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool + HTAB *connectionStateHash, bool *found, bool shouldUseLocalCopy, CopyOutState copyOutState, bool isColocatedIntermediateResult) { @@ -3619,7 +3587,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, if (!*found) { InitializeCopyShardState(shardState, connectionStateHash, - shardId, stopOnFailure, shouldUseLocalCopy, + shardId, shouldUseLocalCopy, copyOutState, isColocatedIntermediateResult); } @@ -3635,7 +3603,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, - bool stopOnFailure, bool shouldUseLocalCopy, + bool shouldUseLocalCopy, CopyOutState copyOutState, bool colocatedIntermediateResult) { @@ -3685,7 +3653,7 @@ InitializeCopyShardState(CopyShardState *shardState, } MultiConnection *connection = - CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure, + CopyGetPlacementConnection(connectionStateHash, placement, colocatedIntermediateResult); if (connection == NULL) { @@ -3729,10 +3697,10 @@ InitializeCopyShardState(CopyShardState *shardState, } /* - * If stopOnFailure is true, we just error out and code execution should - * never reach to this point. This is the case for reference tables. + * We error out on failure, so code execution should never reach this + * point. This is the case for all tables.
*/ - Assert(!stopOnFailure || failedPlacementCount == 0); + Assert(failedPlacementCount == 0); MemoryContextReset(localContext); } @@ -3798,7 +3766,7 @@ LogLocalCopyToFileExecution(uint64 shardId) */ static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, - bool stopOnFailure, bool colocatedIntermediateResult) + bool colocatedIntermediateResult) { if (colocatedIntermediateResult) { @@ -3984,18 +3952,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, if (PQstatus(connection->pgConn) != CONNECTION_OK) { - if (stopOnFailure) - { - ReportConnectionError(connection, ERROR); - } - else - { - const bool raiseErrors = true; - - HandleRemoteTransactionConnectionError(connection, raiseErrors); - - return NULL; - } + ReportConnectionError(connection, ERROR); } /* diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index b5d1b260c..775e4b1cf 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -200,7 +200,7 @@ static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection, ShardPlacement *placement); static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, ShardPlacement *placement); -static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry); +static bool HasModificationFailedForShard(ConnectionShardHashEntry *shardEntry); static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize); static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize); @@ -981,18 +981,19 @@ ResetPlacementConnectionManagement(void) /* - * MarkFailedShardPlacements looks through every connection in the connection shard hash - * and marks the placements associated with failed connections invalid. + * ErrorIfPostCommitFailedShardPlacements throws an error if any placement + * that was modified within the transaction has failed. * - * Every shard must have at least one placement connection which did not fail. If all - * modifying connections for a shard failed then the transaction will be aborted. + * Note that Citus already fails queries/commands in case of any failure during query + * processing. However, certain failures can only be detected at + * COMMIT time. This check mainly ensures that errors happening on the + * placements at COMMIT time are caught. * - * This will be called just before commit, so we can abort before executing remote - * commits. It should also be called after modification statements, to ensure that we - * don't run future statements against placements which are not up to date. + * The most common example is deferred errors thrown by + * triggers or constraints at COMMIT time.
*/ void -MarkFailedShardPlacements() +ErrorIfPostCommitFailedShardPlacements(void) { HASH_SEQ_STATUS status; ConnectionShardHashEntry *shardEntry = NULL; @@ -1000,82 +1001,27 @@ MarkFailedShardPlacements() hash_seq_init(&status, ConnectionShardHash); while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0) { - if (!CheckShardPlacements(shardEntry)) + if (HasModificationFailedForShard(shardEntry)) { ereport(ERROR, - (errmsg("could not make changes to shard " INT64_FORMAT - " on any node", - shardEntry->key.shardId))); - } - } -} - - -/* - * PostCommitMarkFailedShardPlacements marks placements invalid and checks whether - * sufficiently many placements have failed to abort the entire coordinated - * transaction. - * - * This will be called just after a coordinated commit so we can handle remote - * transactions which failed during commit. - * - * When using2PC is set as least one placement must succeed per shard. If all placements - * fail for a shard the entire transaction is aborted. If using2PC is not set then a only - * a warning will be emitted; we cannot abort because some remote transactions might have - * already been committed. - */ -void -PostCommitMarkFailedShardPlacements(bool using2PC) -{ - HASH_SEQ_STATUS status; - ConnectionShardHashEntry *shardEntry = NULL; - int successes = 0; - int attempts = 0; - - int elevel = using2PC ? ERROR : WARNING; - - hash_seq_init(&status, ConnectionShardHash); - while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0) - { - attempts++; - if (CheckShardPlacements(shardEntry)) - { - successes++; - } - else - { - /* - * Only error out if we're using 2PC. If we're not using 2PC we can't error - * out otherwise we can end up with a state where some shard modifications - * have already committed successfully. - */ - ereport(elevel, (errmsg("could not commit transaction for shard " INT64_FORMAT - " on any active node", - shardEntry->key.shardId))); + " on at least one active node", shardEntry->key.shardId))); } } - - /* - * If no shards could be modified at all, error out. Doesn't matter whether - * we're post-commit - there's nothing to invalidate. - */ - if (attempts > 0 && successes == 0) - { - ereport(ERROR, (errmsg("could not commit transaction on any active node"))); - } } /* - * CheckShardPlacements is a helper function for CheckForFailedPlacements that - * performs the per-shard work. + * HasModificationFailedForShard is a helper function for + * ErrorIfPostCommitFailedShardPlacements that performs the per-shard work. + * + * The function returns true if any placement of the input shard was modified + * and a failure happened (either a connection failure or a transaction + * failure). */ static bool -CheckShardPlacements(ConnectionShardHashEntry *shardEntry) +HasModificationFailedForShard(ConnectionShardHashEntry *shardEntry) { - int failures = 0; - int successes = 0; dlist_iter placementIter; dlist_foreach(placementIter, &shardEntry->placementConnections) @@ -1095,51 +1041,11 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry) if (!connection || connection->remoteTransaction.transactionFailed) { - placementEntry->failed = true; - failures++; - } - else - { - successes++; + return true; } } - /* - * If there were any failures we want to bail on a commit in two situations: - * there were no successes, or there was a failure with a reference table shard.
- * Ideally issues with a reference table will've errored out earlier, - * but if not, we abort now to avoid an unhealthy reference table shard. - */ - if (failures > 0 && - (successes == 0 || ReferenceTableShardId(shardEntry->key.shardId))) - { - return false; - } - - /* mark all failed placements invalid */ - dlist_foreach(placementIter, &shardEntry->placementConnections) - { - ConnectionPlacementHashEntry *placementEntry = - dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur); - - if (placementEntry->failed) - { - uint64 shardId = shardEntry->key.shardId; - uint64 placementId = placementEntry->key.placementId; - ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId); - - /* - * We only set shard state if it currently is SHARD_STATE_ACTIVE, which - * prevents overwriting shard state if it was already set somewhere else. - */ - if (shardPlacement->shardState == SHARD_STATE_ACTIVE) - { - MarkShardPlacementInactive(shardPlacement); - } - } - } - - return true; + return false; } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ef33a6585..90fa0e77b 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -690,7 +690,6 @@ static void ScheduleNextPlacementExecution(TaskPlacementExecution *placementExec bool succeeded); static bool CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution * placementExecution); -static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution); static void PlacementExecutionReady(TaskPlacementExecution *placementExecution); static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution * shardCommandExecution); @@ -4683,20 +4682,6 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede } else { - if (ShouldMarkPlacementsInvalidOnFailure(execution)) - { - ShardPlacement *shardPlacement = placementExecution->shardPlacement; - - /* - * We only set shard state if it currently is SHARD_STATE_ACTIVE, which - * prevents overwriting shard state if it was already set somewhere else. - */ - if (shardPlacement->shardState == SHARD_STATE_ACTIVE) - { - MarkShardPlacementInactive(shardPlacement); - } - } - if (placementExecution->executionState == PLACEMENT_EXECUTION_NOT_READY) { /* @@ -4877,30 +4862,6 @@ ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution, bool } -/* - * ShouldMarkPlacementsInvalidOnFailure returns true if the failure - * should trigger marking placements invalid. - */ -static bool -ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution) -{ - if (!DistributedExecutionModifiesDatabase(execution) || - execution->transactionProperties->errorOnAnyFailure) - { - /* - * Failures that do not modify the database (e.g., mainly SELECTs) should - * never lead to invalid placement. - * - * Failures that lead throwing error, no need to mark any placement - * invalid. - */ - return false; - } - - return true; -} - - /* * PlacementExecutionReady adds a placement execution to the ready queue when * its dependent placement executions have finished. 
diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 618dbab86..338b03075 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -496,12 +496,6 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, char *intermediateResultIdPrefix) { ParamListInfo paramListInfo = executorState->es_param_list_info; - bool stopOnFailure = false; - - if (IsCitusTableType(targetRelationId, REFERENCE_TABLE)) - { - stopOnFailure = true; - } /* Get column name list and partition column index for the target table */ List *columnNameList = BuildColumnNameListFromTargetList(targetRelationId, @@ -514,7 +508,6 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, columnNameList, partitionColumnIndex, executorState, - stopOnFailure, intermediateResultIdPrefix); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); @@ -537,12 +530,6 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, PlannedStmt *selectPlan, EState *executorState) { ParamListInfo paramListInfo = executorState->es_param_list_info; - bool stopOnFailure = false; - - if (IsCitusTableType(targetRelationId, REFERENCE_TABLE)) - { - stopOnFailure = true; - } /* Get column name list and partition column index for the target table */ List *columnNameList = BuildColumnNameListFromTargetList(targetRelationId, @@ -554,8 +541,7 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, - executorState, - stopOnFailure, NULL); + executorState, NULL); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 84ff6bcdf..ddd8b49bc 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1957,63 +1957,6 @@ DeleteShardPlacementRow(uint64 placementId) } -/* - * UpdatePartitionShardPlacementStates gets a shard placement which is asserted to belong - * to partitioned table. The function goes over the corresponding placements of its - * partitions, and sets their state to the input shardState. - */ -void -UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char shardState) -{ - ShardInterval *parentShardInterval = - LoadShardInterval(parentShardPlacement->shardId); - Oid partitionedTableOid = parentShardInterval->relationId; - - /* this function should only be called for partitioned tables */ - Assert(PartitionedTable(partitionedTableOid)); - - List *partitionList = PartitionList(partitionedTableOid); - Oid partitionOid = InvalidOid; - foreach_oid(partitionOid, partitionList) - { - uint64 partitionShardId = - ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex); - - ShardPlacement *partitionPlacement = - ShardPlacementOnGroupIncludingOrphanedPlacements( - parentShardPlacement->groupId, partitionShardId); - - /* the partition should have a placement with the same group */ - Assert(partitionPlacement != NULL); - - UpdateShardPlacementState(partitionPlacement->placementId, shardState); - } -} - - -/* - * MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where - * the state is set to SHARD_STATE_INACTIVE. 
It also marks partitions of the - shard placements as inactive if shardPlacement belongs to a partitioned table. - */ -void -MarkShardPlacementInactive(ShardPlacement *shardPlacement) -{ - UpdateShardPlacementState(shardPlacement->placementId, SHARD_STATE_INACTIVE); - - /* - * In case the shard belongs to a partitioned table, we make sure to update - * the states of its partitions. Repairing shards already ensures to recreate - * all the partitions. - */ - ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId); - if (PartitionedTable(shardInterval->relationId)) - { - UpdatePartitionShardPlacementStates(shardPlacement, SHARD_STATE_INACTIVE); - } -} - - /* * UpdateShardPlacementState sets the shardState for the placement identified * by placementId. diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 2b3f44ddd..0d787d3cc 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1081,7 +1081,17 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source shardPlacementList, targetNodeName, targetNodePort); - if (targetPlacement->shardState != SHARD_STATE_INACTIVE) + + /* + * shardStateInactive is a legacy state for a placement. As of Citus 11, + * we never mark any placement as INACTIVE. + * + * Still, we prefer to keep this function/code here, as users may need + * to recover placements that were marked as inactive before Citus 11. + * + */ + int shardStateInactive = 3; + if (targetPlacement->shardState != shardStateInactive) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("target placement must be in inactive state"))); diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index 66ee281ba..4b770b7a9 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -302,6 +302,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) } UseCoordinatedTransaction(); + Use2PCForCoordinatedTransaction(); /* issue command to append table to each shard placement */ ShardPlacement *shardPlacement = NULL; @@ -327,20 +328,11 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) RemoteTransactionBeginIfNecessary(connection); - int executeResult = ExecuteOptionalRemoteCommand(connection, - workerAppendQuery->data, - &queryResult); + ExecuteCriticalRemoteCommand(connection, workerAppendQuery->data); PQclear(queryResult); ForgetResults(connection); - - if (executeResult != 0) - { - MarkRemoteTransactionFailed(connection, false); - } } - - MarkFailedShardPlacements(); - /* update shard statistics and get new shard size */ uint64 newShardSize = UpdateShardStatistics(shardId); diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 191aec8ab..ee9912fe9 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -424,14 +424,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * fails, which can lead to divergence when not using 2PC. */ - /* - * Check whether the coordinated transaction is in a state we want - * to persist, or whether we want to error out. This handles the - * case where iteratively executed commands marked all placements - * as invalid.
*/ - MarkFailedShardPlacements(); - if (ShouldCoordinatedTransactionUse2PC) { CoordinatedRemoteTransactionsPrepare(); @@ -458,9 +450,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) /* * Check again whether shards/placement successfully - * committed. This handles failure at COMMIT/PREPARE time. + * committed. This handles failure at COMMIT time. */ - PostCommitMarkFailedShardPlacements(ShouldCoordinatedTransactionUse2PC); + ErrorIfPostCommitFailedShardPlacements(); break; } diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index 4d1988347..105c05b98 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -108,8 +108,6 @@ typedef struct CitusCopyDestReceiver /* template for COPY statement to send to workers */ CopyStmt *copyStatement; - bool stopOnFailure; - /* * shardId to CopyShardState map. Also used in insert_select_executor.c for * task pruning. @@ -154,7 +152,6 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, List *columnNameList, int partitionColumnIndex, EState *executorState, - bool stopOnFailure, char *intermediateResultPrefix); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 6065e210d..0e4a6e6c8 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -237,9 +237,6 @@ extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, char replicationModel); extern void DeletePartitionRow(Oid distributedRelationId); extern void DeleteShardRow(uint64 shardId); -extern void UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, - char shardState); -extern void MarkShardPlacementInactive(ShardPlacement *shardPlacement); extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern void UpdatePlacementGroupId(uint64 placementId, int groupId); extern void DeleteShardPlacementRow(uint64 placementId); diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h index 0e7eb605b..ca73f016a 100644 --- a/src/include/distributed/placement_connection.h +++ b/src/include/distributed/placement_connection.h @@ -32,8 +32,7 @@ extern void AssignPlacementListToConnection(List *placementAccessList, MultiConnection *connection); extern void ResetPlacementConnectionManagement(void); -extern void MarkFailedShardPlacements(void); -extern void PostCommitMarkFailedShardPlacements(bool using2PC); +extern void ErrorIfPostCommitFailedShardPlacements(void); extern void CloseShardPlacementAssociation(struct MultiConnection *connection); extern void ResetShardPlacementAssociation(struct MultiConnection *connection); diff --git a/src/include/distributed/relay_utility.h b/src/include/distributed/relay_utility.h index 82de22e0d..35c66761d 100644 --- a/src/include/distributed/relay_utility.h +++ b/src/include/distributed/relay_utility.h @@ -27,12 +27,14 @@ /* * ShardState represents last known states of shards on a given node. + * + * The numbers assigned to the states are kept for historical reasons and should + * not be changed, since they correspond to the shardstate values in pg_dist_placement.
*/ typedef enum { SHARD_STATE_INVALID_FIRST = 0, SHARD_STATE_ACTIVE = 1, - SHARD_STATE_INACTIVE = 3, SHARD_STATE_TO_DELETE = 4, } ShardState; diff --git a/src/test/regress/expected/failure_copy_on_hash.out b/src/test/regress/expected/failure_copy_on_hash.out index 72be2a4fd..749345392 100644 --- a/src/test/regress/expected/failure_copy_on_hash.out +++ b/src/test/regress/expected/failure_copy_on_hash.out @@ -36,12 +36,10 @@ SELECT citus.mitmproxy('conn.kill()'); (1 row) \COPY test_table FROM stdin delimiter ','; -WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly +ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. CONTEXT: COPY test_table, line 1: "1,2" -ERROR: could not connect to any active placements -CONTEXT: COPY test_table, line 1: "1,2" SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -281,22 +279,10 @@ SELECT citus.mitmproxy('conn.kill()'); (1 row) \COPY test_table_2 FROM stdin delimiter ','; -WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly +ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. CONTEXT: COPY test_table_2, line 1: "1,2" -WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: COPY test_table_2, line 2: "3,4" -WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: COPY test_table_2, line 3: "6,7" -WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-CONTEXT: COPY test_table_2, line 5: "9,10" SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) @@ -311,13 +297,13 @@ SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate ORDER BY shardid, nodeport; logicalrelid | shardid | shardstate --------------------------------------------------------------------- - test_table_2 | 1710004 | 3 test_table_2 | 1710004 | 1 - test_table_2 | 1710005 | 3 + test_table_2 | 1710004 | 1 + test_table_2 | 1710005 | 1 test_table_2 | 1710005 | 1 - test_table_2 | 1710006 | 3 test_table_2 | 1710006 | 1 - test_table_2 | 1710007 | 3 + test_table_2 | 1710006 | 1 + test_table_2 | 1710007 | 1 test_table_2 | 1710007 | 1 (8 rows) diff --git a/src/test/regress/expected/failure_savepoints.out b/src/test/regress/expected/failure_savepoints.out index 21ad09f7a..b51762e08 100644 --- a/src/test/regress/expected/failure_savepoints.out +++ b/src/test/regress/expected/failure_savepoints.out @@ -165,7 +165,7 @@ connection not open connection not open CONTEXT: while executing command on localhost:xxxxx COMMIT; -ERROR: could not make changes to shard xxxxx on any node +ERROR: failure on connection marked as essential: localhost:xxxxx SELECT * FROM artists WHERE id IN (4, 5); id | name --------------------------------------------------------------------- @@ -255,7 +255,7 @@ connection not open connection not open CONTEXT: while executing command on localhost:xxxxx COMMIT; -ERROR: could not make changes to shard xxxxx on any node +ERROR: failure on connection marked as essential: localhost:xxxxx SELECT * FROM artists WHERE id IN (4, 5); id | name --------------------------------------------------------------------- @@ -340,7 +340,7 @@ connection not open connection not open connection not open COMMIT; -ERROR: could not make changes to shard xxxxx on any node +ERROR: failure on connection marked as essential: localhost:xxxxx SELECT * FROM artists WHERE id=6; id | name --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 5509bd501..acf958f33 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -377,6 +377,35 @@ COMMIT; WARNING: illegal value WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress ERROR: illegal value +-- single-row and multi-row INSERTs should also fail, +-- with or without transaction blocks, at COMMIT time +INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'); +WARNING: illegal value +WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress +ERROR: illegal value +INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup'); +WARNING: illegal value +WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress +ERROR: illegal value +BEGIN; + INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'); +COMMIT; +WARNING: illegal value +WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress +ERROR: illegal value +BEGIN; + INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup'); +COMMIT; +WARNING: illegal value +WARNING: connection to the remote node
localhost:xxxxx failed with the following error: another command is already in progress +ERROR: illegal value +-- and rollback should be fine +BEGIN; + INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'); +ROLLBACK; +BEGIN; + INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup'); +ROLLBACK; \unset VERBOSITY -- verify everyhing including delete is rolled back SELECT * FROM researchers WHERE lab_id = 6; @@ -1195,28 +1224,28 @@ ORDER BY s.logicalrelid, sp.shardstate; reference_failure_test | 1 | 2 (1 row) +-- any failure rolls back the transaction BEGIN; COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv'); -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist --- some placements are invalid before abort +ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist +ABORT; +-- none of the placements are invalid after abort SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) WHERE logicalrelid = 'numbers_hash_failure_test'::regclass ORDER BY shardid, nodeport; shardid | shardstate | nodename | nodeport --------------------------------------------------------------------- - 1200016 | 3 | localhost | 57637 + 1200016 | 1 | localhost | 57637 1200016 | 1 | localhost | 57638 1200017 | 1 | localhost | 57637 1200017 | 1 | localhost | 57638 1200018 | 1 | localhost | 57637 1200018 | 1 | localhost | 57638 - 1200019 | 3 | localhost | 57637 + 1200019 | 1 | localhost | 57637 1200019 | 1 | localhost | 57638 (8 rows) -ABORT; -- verify nothing is inserted SELECT count(*) FROM numbers_hash_failure_test; WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist count --------------------------------------------------------------------- 0 (1 row) @@ -1243,52 +1272,35 @@ ORDER BY shardid, nodeport; 1200019 | 1 | localhost | 57638 (8 rows) +-- all failures roll back the transaction BEGIN; COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv'); -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist --- check shard states before commit -SELECT shardid, shardstate, nodename, nodeport -FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) -WHERE logicalrelid = 'numbers_hash_failure_test'::regclass -ORDER BY shardid, nodeport; - shardid | shardstate | nodename | nodeport ---------------------------------------------------------------------- - 1200016 | 3 | localhost | 57637 - 1200016 | 1 | localhost | 57638 - 1200017 | 1 | localhost | 57637 - 1200017 | 1 | localhost | 57638 - 1200018 | 1 | localhost | 57637 - 1200018 | 1 | localhost | 57638 - 1200019 | 3 | localhost | 57637 - 1200019 | 1 | localhost | 57638 -(8 rows) - +ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist COMMIT; --- expect some placements to be market invalid after commit +-- expect none of the placements to be marked invalid after commit SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) WHERE logicalrelid = 'numbers_hash_failure_test'::regclass ORDER BY
shardid, nodeport; shardid | shardstate | nodename | nodeport --------------------------------------------------------------------- - 1200016 | 3 | localhost | 57637 + 1200016 | 1 | localhost | 57637 1200016 | 1 | localhost | 57638 1200017 | 1 | localhost | 57637 1200017 | 1 | localhost | 57638 1200018 | 1 | localhost | 57637 1200018 | 1 | localhost | 57638 - 1200019 | 3 | localhost | 57637 + 1200019 | 1 | localhost | 57637 1200019 | 1 | localhost | 57638 (8 rows) --- verify data is inserted +-- verify no data is inserted SELECT count(*) FROM numbers_hash_failure_test; WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist count --------------------------------------------------------------------- - 2 + 0 (1 row) -- break the other node as well diff --git a/src/test/regress/expected/multi_mx_modifying_xacts.out b/src/test/regress/expected/multi_mx_modifying_xacts.out index 0466c847e..8c08673b2 100644 --- a/src/test/regress/expected/multi_mx_modifying_xacts.out +++ b/src/test/regress/expected/multi_mx_modifying_xacts.out @@ -343,9 +343,7 @@ INSERT INTO labs_mx VALUES (9, 'Umbrella Corporation'); COMMIT; WARNING: illegal value WARNING: failed to commit transaction on localhost:xxxxx -WARNING: could not commit transaction for shard xxxxx on any active node -WARNING: could not commit transaction for shard xxxxx on any active node -ERROR: could not commit transaction on any active node +ERROR: could not commit transaction for shard xxxxx on at least one active node -- data should NOT be persisted SELECT * FROM objects_mx WHERE id = 2; id | name @@ -371,9 +369,7 @@ INSERT INTO labs_mx VALUES (9, 'BAD'); COMMIT; WARNING: illegal value WARNING: failed to commit transaction on localhost:xxxxx -WARNING: could not commit transaction for shard xxxxx on any active node -WARNING: could not commit transaction for shard xxxxx on any active node -ERROR: could not commit transaction on any active node +ERROR: could not commit transaction for shard xxxxx on at least one active node -- data should NOT be persisted SELECT * FROM objects_mx WHERE id = 1; id | name @@ -396,9 +392,7 @@ INSERT INTO labs_mx VALUES (9, 'BAD'); COMMIT; WARNING: illegal value WARNING: failed to commit transaction on localhost:xxxxx -WARNING: could not commit transaction for shard xxxxx on any active node -WARNING: could not commit transaction for shard xxxxx on any active node -ERROR: could not commit transaction on any active node +ERROR: could not commit transaction for shard xxxxx on at least one active node -- no data should persists SELECT * FROM objects_mx WHERE id = 1; id | name diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 9ec3f9914..d624bc0ee 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -577,7 +577,7 @@ SET session_replication_role = DEFAULT; ALTER USER test_user WITH nologin; \c - test_user - :master_port --- reissue copy +-- reissue copy, and it should fail COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); 1,1 2,2 @@ -589,7 +589,7 @@ COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); 8,8 \. 
--- verify shards in the first worker as marked invalid +-- verify that shards in none of the workers are marked invalid SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport; diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index e1b128105..a353321e3 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -784,29 +784,23 @@ SET session_replication_role = DEFAULT; \c - :default_user - :worker_1_port ALTER USER test_user WITH nologin; \c - test_user - :master_port --- reissue copy +-- reissue copy, and it should fail COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in +ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in CONTEXT: COPY numbers_hash, line 1: "1,1" -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in -CONTEXT: COPY numbers_hash, line 2: "2,2" -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in -CONTEXT: COPY numbers_hash, line 3: "3,3" -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in -CONTEXT: COPY numbers_hash, line 6: "6,6" --- verify shards in the first worker as marked invalid +-- verify that shards in none of the workers are marked invalid SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport; shardid | shardstate | nodename | nodeport --------------------------------------------------------------------- - 560169 | 3 | localhost | 57637 + 560169 | 1 | localhost | 57637 560169 | 1 | localhost | 57638 - 560170 | 3 | localhost | 57637 + 560170 | 1 | localhost | 57637 560170 | 1 | localhost | 57638 - 560171 | 3 | localhost | 57637 + 560171 | 1 | localhost | 57637 560171 | 1 | localhost | 57638 - 560172 | 3 | localhost | 57637 + 560172 | 1 | localhost | 57637 560172 | 1 | localhost | 57638 (8 rows) @@ -828,12 +822,8 @@ SELECT shardid, shardstate, nodename, nodeport -- since it can not insert into either copies of a shard. shards are expected to -- stay valid since the operation is rolled back.
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv'); -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in +ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in CONTEXT: COPY numbers_hash_other, line 1: "1,1" -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in -CONTEXT: COPY numbers_hash_other, line 2: "2,2" -WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in -CONTEXT: COPY numbers_hash_other, line 3: "3,3" -- verify shards for numbers_hash_other are still valid -- since copy has failed altogether SELECT shardid, shardstate, nodename, nodeport @@ -841,13 +831,13 @@ SELECT shardid, shardstate, nodename, nodeport WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport; shardid | shardstate | nodename | nodeport --------------------------------------------------------------------- - 560174 | 3 | localhost | 57637 + 560174 | 1 | localhost | 57637 560174 | 1 | localhost | 57638 - 560175 | 3 | localhost | 57637 + 560175 | 1 | localhost | 57637 560175 | 1 | localhost | 57638 560176 | 1 | localhost | 57637 560176 | 1 | localhost | 57638 - 560177 | 3 | localhost | 57637 + 560177 | 1 | localhost | 57637 560177 | 1 | localhost | 57638 (8 rows) diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 4c46011be..4caac28ba 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -310,6 +310,29 @@ DELETE FROM researchers WHERE lab_id = 6; 30, 6, 'Dennis Ritchie' \. COMMIT; + +-- single-row and multi-row INSERTs should also fail, +-- with or without transaction blocks, at COMMIT time +INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'); +INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup'); + +BEGIN; + INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'); +COMMIT; + +BEGIN; + INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup'); +COMMIT; + +-- and rollback should be fine +BEGIN; + INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'); +ROLLBACK; + +BEGIN; + INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup'); +ROLLBACK; + \unset VERBOSITY -- verify everyhing including delete is rolled back SELECT * FROM researchers WHERE lab_id = 6; @@ -935,20 +958,20 @@ AND s.logicalrelid = 'reference_failure_test'::regclass GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; +-- any failure rolls back the transaction BEGIN; COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv'); 1,1 2,2 \.
+ABORT; --- some placements are invalid before abort +-- none of the placements are invalid after abort SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) WHERE logicalrelid = 'numbers_hash_failure_test'::regclass ORDER BY shardid, nodeport; -ABORT; - -- verify nothing is inserted SELECT count(*) FROM numbers_hash_failure_test; @@ -958,27 +981,21 @@ FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) WHERE logicalrelid = 'numbers_hash_failure_test'::regclass ORDER BY shardid, nodeport; +-- all failures roll back the transaction BEGIN; COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv'); 1,1 2,2 \. - --- check shard states before commit -SELECT shardid, shardstate, nodename, nodeport -FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) -WHERE logicalrelid = 'numbers_hash_failure_test'::regclass -ORDER BY shardid, nodeport; - COMMIT; --- expect some placements to be market invalid after commit +-- expect none of the placements to be marked invalid after commit SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) WHERE logicalrelid = 'numbers_hash_failure_test'::regclass ORDER BY shardid, nodeport; --- verify data is inserted +-- verify no data is inserted SELECT count(*) FROM numbers_hash_failure_test; -- break the other node as well
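Taken together, the test updates pin down the new invariant: a failed COPY or DML command aborts the whole distributed transaction, and shard placements are never demoted to the legacy inactive state (shardstate 3). A minimal sketch of checking that invariant, reusing the catalog tables queried throughout the tests above, might look as follows:

-- after any failed COPY or INSERT, every placement should still be healthy;
-- with placement invalidation removed, this is expected to return 0
SELECT count(*)
FROM pg_dist_shard_placement
WHERE shardstate = 3;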