Drop support for Inactive Shard placements

Given that we do all operations via 2PC, there is no way
for any placement to be marked as INACTIVE.
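
To make the rationale concrete (an illustrative sketch, not part of the commit): under 2PC a write that fails on any placement aborts the whole distributed transaction, so pg_dist_placement should no longer contain the legacy INACTIVE value. A quick sanity check one could run on the coordinator:

-- hypothetical check: shardstate 3 was SHARD_STATE_INACTIVE; expect zero rows
SELECT shardid, placementid, shardstate
FROM pg_dist_placement
WHERE shardstate = 3;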
pull/5381/head
Onder Kalaci 2021-10-15 15:02:20 +02:00
parent b3299de81c
commit 575bb6dde9
20 changed files with 164 additions and 425 deletions

View File

@@ -1692,13 +1692,11 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
 	ExprContext *econtext = GetPerTupleExprContext(estate);
 	econtext->ecxt_scantuple = slot;
-	bool stopOnFailure = true;
 	DestReceiver *copyDest =
 		(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
 		                                             columnNameList,
 		                                             partitionColumnIndex,
-		                                             estate, stopOnFailure,
-		                                             NULL);
+		                                             estate, NULL);
 	/* initialise state for writing to shards, we'll open connections on demand */
 	copyDest->rStartup(copyDest, 0, tupleDescriptor);

View File

@@ -20,9 +20,9 @@
  * in parallel.
  *
  * For hash-partitioned tables, if it fails to connect to a worker, the master
- * marks the placement for which it was trying to open a connection as inactive,
- * similar to the way DML statements are handled. If a failure occurs after
- * connecting, the transaction is rolled back on all the workers. Note that,
+ * rolls back the distributed transaction, similar to the way DML statements
+ * are handled. If a failure occurs after connecting, the transaction
+ * is rolled back on all the workers. Note that,
  * in the case of append-partitioned tables, if a fail occurs, immediately
  * metadata changes are rolled back on the master node, but shard placements
  * are left on the worker nodes.
@@ -242,8 +242,7 @@ static void CopyToExistingShards(CopyStmt *copyStatement,
 static void CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag,
                             Oid relationId);
 static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
-                                            ShardConnections *shardConnections, bool
-                                            stopOnFailure,
+                                            ShardConnections *shardConnections,
                                             bool useBinaryCopyFormat);
 static List * RemoveOptionFromList(List *optionList, char *optionName);
 static bool BinaryOutputFunctionDefined(Oid typeId);
@@ -281,12 +280,11 @@ static HTAB * CreateShardStateHash(MemoryContext memoryContext);
 static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
                                                 MultiConnection *connection);
 static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
-                                      HTAB *connectionStateHash, bool stopOnFailure,
+                                      HTAB *connectionStateHash,
                                       bool *found, bool shouldUseLocalCopy, CopyOutState
                                       copyOutState, bool isColocatedIntermediateResult);
 static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash,
                                                     ShardPlacement *placement,
-                                                    bool stopOnFailure,
                                                     bool colocatedIntermediateResult);
 static bool HasReachedAdaptiveExecutorPoolSize(List *connectionStateHash);
 static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateList,
@@ -296,7 +294,7 @@ static List * ConnectionStateListToNode(HTAB *connectionStateHash,
                                         const char *hostname, int32 port);
 static void InitializeCopyShardState(CopyShardState *shardState,
                                      HTAB *connectionStateHash,
-                                     uint64 shardId, bool stopOnFailure,
+                                     uint64 shardId,
                                      bool canUseLocalCopy,
                                      CopyOutState copyOutState,
                                      bool colocatedIntermediateResult);
@@ -461,9 +459,6 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
 	List *columnNameList = NIL;
 	int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
-	bool stopOnFailure = false;
 	uint64 processedRowCount = 0;
 	ErrorContextCallback errorCallback;
@@ -509,16 +504,10 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
 	MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
 	ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
-	if (IsCitusTableType(tableId, REFERENCE_TABLE))
-	{
-		stopOnFailure = true;
-	}
 	/* set up the destination for the COPY */
 	CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList,
 	                                                              partitionColumnIndex,
-	                                                              executorState,
-	                                                              stopOnFailure, NULL);
+	                                                              executorState, NULL);
 	DestReceiver *dest = (DestReceiver *) copyDest;
 	dest->rStartup(dest, 0, tupleDescriptor);
@@ -611,9 +600,6 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
 	FreeExecutorState(executorState);
 	table_close(distributedRelation, NoLock);
-	/* mark failed placements as inactive */
-	MarkFailedShardPlacements();
 	CHECK_FOR_INTERRUPTS();
 	if (completionTag != NULL)
@@ -856,13 +842,12 @@ RemoveOptionFromList(List *optionList, char *optionName)
 /*
  * OpenCopyConnectionsForNewShards opens a connection for each placement of a shard and
  * starts a COPY transaction if necessary. If a connection cannot be opened,
- * then the shard placement is marked as inactive and the COPY continues with the remaining
- * shard placements.
+ * then the transaction is rolled back.
  */
 static void
 OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
                                 ShardConnections *shardConnections,
-                                bool stopOnFailure, bool useBinaryCopyFormat)
+                                bool useBinaryCopyFormat)
 {
 	int failedPlacementCount = 0;
 	ListCell *placementCell = NULL;
@@ -907,19 +892,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
 		if (PQstatus(connection->pgConn) != CONNECTION_OK)
 		{
-			if (stopOnFailure)
-			{
-				ReportConnectionError(connection, ERROR);
-			}
-			else
-			{
-				const bool raiseErrors = true;
-				HandleRemoteTransactionConnectionError(connection, raiseErrors);
-				failedPlacementCount++;
-				continue;
-			}
+			ReportConnectionError(connection, ERROR);
 		}
 		/*
@@ -954,10 +927,10 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
 	}
 	/*
-	 * If stopOnFailure is true, we just error out and code execution should
-	 * never reach to this point. This is the case for reference tables.
+	 * We error out on any failure, so code execution should never reach
+	 * this point with failed placements. This holds for all tables.
 	 */
-	Assert(!stopOnFailure || failedPlacementCount == 0);
+	Assert(failedPlacementCount == 0);
 	shardConnections->connectionList = connectionList;
@@ -1869,14 +1842,13 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
 	char *schemaName = copyStatement->relation->schemaname;
 	char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
 	int64 shardId = CreateEmptyShard(qualifiedName);
-	bool stopOnFailure = true;
 	shardConnections->shardId = shardId;
 	shardConnections->connectionList = NIL;
 	/* connect to shards placements and start transactions */
-	OpenCopyConnectionsForNewShards(copyStatement, shardConnections, stopOnFailure,
+	OpenCopyConnectionsForNewShards(copyStatement, shardConnections,
 	                                useBinaryCopyFormat);
 	return shardId;
@@ -2166,7 +2138,7 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
  */
 CitusCopyDestReceiver *
 CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
-                            EState *executorState, bool stopOnFailure,
+                            EState *executorState,
                             char *intermediateResultIdPrefix)
 {
 	CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0(
@@ -2184,7 +2156,6 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
 	copyDest->columnNameList = columnNameList;
 	copyDest->partitionColumnIndex = partitionColumnIndex;
 	copyDest->executorState = executorState;
-	copyDest->stopOnFailure = stopOnFailure;
 	copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix;
 	copyDest->memoryContext = CurrentMemoryContext;
@@ -2527,8 +2498,6 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
 	bool cachedShardStateFound = false;
 	bool firstTupleInShard = false;
-	bool stopOnFailure = copyDest->stopOnFailure;
 	EState *executorState = copyDest->executorState;
 	MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
@@ -2548,7 +2517,6 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
 	CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash,
 	                                           copyDest->connectionStateHash,
-	                                           stopOnFailure,
 	                                           &cachedShardStateFound,
 	                                           copyDest->shouldUseLocalCopy,
 	                                           copyDest->copyOutState,
@@ -3234,7 +3202,7 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag)
 		if (PQstatus(connection->pgConn) != CONNECTION_OK)
 		{
-			HandleRemoteTransactionConnectionError(connection, raiseErrors);
+			ReportConnectionError(connection, ERROR);
 			continue;
 		}
@@ -3242,7 +3210,7 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag)
 		if (!SendRemoteCommand(connection, copyCommand->data))
 		{
-			HandleRemoteTransactionConnectionError(connection, raiseErrors);
+			ReportConnectionError(connection, ERROR);
 			continue;
 		}
@@ -3610,7 +3578,7 @@ ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32
  */
 static CopyShardState *
 GetShardState(uint64 shardId, HTAB *shardStateHash,
-              HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool
+              HTAB *connectionStateHash, bool *found, bool
               shouldUseLocalCopy, CopyOutState copyOutState,
               bool isColocatedIntermediateResult)
 {
@@ -3619,7 +3587,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
 	if (!*found)
 	{
 		InitializeCopyShardState(shardState, connectionStateHash,
-		                         shardId, stopOnFailure, shouldUseLocalCopy,
+		                         shardId, shouldUseLocalCopy,
 		                         copyOutState, isColocatedIntermediateResult);
 	}
@@ -3635,7 +3603,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
 static void
 InitializeCopyShardState(CopyShardState *shardState,
                          HTAB *connectionStateHash, uint64 shardId,
-                         bool stopOnFailure, bool shouldUseLocalCopy,
+                         bool shouldUseLocalCopy,
                          CopyOutState copyOutState,
                          bool colocatedIntermediateResult)
 {
@@ -3685,7 +3653,7 @@ InitializeCopyShardState(CopyShardState *shardState,
 	}
 	MultiConnection *connection =
-		CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure,
+		CopyGetPlacementConnection(connectionStateHash, placement,
 		                           colocatedIntermediateResult);
 	if (connection == NULL)
 	{
@@ -3729,10 +3697,10 @@ InitializeCopyShardState(CopyShardState *shardState,
 	}
 	/*
-	 * If stopOnFailure is true, we just error out and code execution should
-	 * never reach to this point. This is the case for reference tables.
+	 * We just error out on failure, so code execution should never reach
+	 * this point. This holds for all tables.
 	 */
-	Assert(!stopOnFailure || failedPlacementCount == 0);
+	Assert(failedPlacementCount == 0);
 	MemoryContextReset(localContext);
 }
@@ -3798,7 +3766,7 @@ LogLocalCopyToFileExecution(uint64 shardId)
  */
 static MultiConnection *
 CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
-                           bool stopOnFailure, bool colocatedIntermediateResult)
+                           bool colocatedIntermediateResult)
 {
 	if (colocatedIntermediateResult)
 	{
@@ -3984,18 +3952,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
 	if (PQstatus(connection->pgConn) != CONNECTION_OK)
 	{
-		if (stopOnFailure)
-		{
-			ReportConnectionError(connection, ERROR);
-		}
-		else
-		{
-			const bool raiseErrors = true;
-			HandleRemoteTransactionConnectionError(connection, raiseErrors);
-			return NULL;
-		}
+		ReportConnectionError(connection, ERROR);
 	}
 	/*
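
As the regression outputs further down show, the user-visible effect for COPY is that a single failed placement connection now aborts the whole COPY instead of invalidating that placement and continuing. A schematic illustration (table and file names are placeholders):

-- hypothetical session; any placement connection failure now fails the COPY as a whole
\COPY orders_hash FROM 'orders.csv' WITH (FORMAT 'csv');
-- ERROR: connection to the remote node localhost:xxxxx failed ...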

View File

@@ -200,7 +200,7 @@ static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection,
                                                  ShardPlacement *placement);
 static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
                                         ShardPlacement *placement);
-static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry);
+static bool HasModificationFailedForShard(ConnectionShardHashEntry *shardEntry);
 static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize);
 static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize);
@@ -981,18 +981,19 @@ ResetPlacementConnectionManagement(void)
 /*
- * MarkFailedShardPlacements looks through every connection in the connection shard hash
- * and marks the placements associated with failed connections invalid.
+ * ErrorIfPostCommitFailedShardPlacements throws an error if any placement that
+ * was modified within the transaction has failed.
  *
- * Every shard must have at least one placement connection which did not fail. If all
- * modifying connections for a shard failed then the transaction will be aborted.
+ * Note that Citus already fails queries/commands on any failure during query
+ * processing. However, certain failures can only be detected at COMMIT time,
+ * and this check catches the errors that happen on the placements at COMMIT
+ * time.
  *
- * This will be called just before commit, so we can abort before executing remote
- * commits. It should also be called after modification statements, to ensure that we
- * don't run future statements against placements which are not up to date.
+ * The most common example is deferred errors thrown by triggers or constraints
+ * at COMMIT time.
  */
 void
-MarkFailedShardPlacements()
+ErrorIfPostCommitFailedShardPlacements(void)
 {
 	HASH_SEQ_STATUS status;
 	ConnectionShardHashEntry *shardEntry = NULL;
@@ -1000,82 +1001,27 @@ MarkFailedShardPlacements()
 	hash_seq_init(&status, ConnectionShardHash);
 	while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
 	{
-		if (!CheckShardPlacements(shardEntry))
+		if (HasModificationFailedForShard(shardEntry))
 		{
 			ereport(ERROR,
-					(errmsg("could not make changes to shard " INT64_FORMAT
-							" on any node",
-							shardEntry->key.shardId)));
+					(errmsg("could not commit transaction for shard " INT64_FORMAT
+							" on at least one active node", shardEntry->key.shardId)));
 		}
 	}
 }
 
-/*
- * PostCommitMarkFailedShardPlacements marks placements invalid and checks whether
- * sufficiently many placements have failed to abort the entire coordinated
- * transaction.
- *
- * This will be called just after a coordinated commit so we can handle remote
- * transactions which failed during commit.
- *
- * When using2PC is set as least one placement must succeed per shard. If all placements
- * fail for a shard the entire transaction is aborted. If using2PC is not set then a only
- * a warning will be emitted; we cannot abort because some remote transactions might have
- * already been committed.
- */
-void
-PostCommitMarkFailedShardPlacements(bool using2PC)
-{
-	HASH_SEQ_STATUS status;
-	ConnectionShardHashEntry *shardEntry = NULL;
-	int successes = 0;
-	int attempts = 0;
-	int elevel = using2PC ? ERROR : WARNING;
-
-	hash_seq_init(&status, ConnectionShardHash);
-	while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
-	{
-		attempts++;
-		if (CheckShardPlacements(shardEntry))
-		{
-			successes++;
-		}
-		else
-		{
-			/*
-			 * Only error out if we're using 2PC. If we're not using 2PC we can't error
-			 * out otherwise we can end up with a state where some shard modifications
-			 * have already committed successfully.
-			 */
-			ereport(elevel,
-					(errmsg("could not commit transaction for shard " INT64_FORMAT
-							" on any active node",
-							shardEntry->key.shardId)));
-		}
-	}
-
-	/*
-	 * If no shards could be modified at all, error out. Doesn't matter whether
-	 * we're post-commit - there's nothing to invalidate.
-	 */
-	if (attempts > 0 && successes == 0)
-	{
-		ereport(ERROR, (errmsg("could not commit transaction on any active node")));
-	}
-}
 
 /*
- * CheckShardPlacements is a helper function for CheckForFailedPlacements that
- * performs the per-shard work.
+ * HasModificationFailedForShard is a helper function for
+ * ErrorIfPostCommitFailedShardPlacements that performs the per-shard work.
+ *
+ * The function returns true if any placement of the input shard is modified
+ * and any failure has happened (either a connection failure or a transaction
+ * failure).
  */
 static bool
-CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
+HasModificationFailedForShard(ConnectionShardHashEntry *shardEntry)
 {
-	int failures = 0;
-	int successes = 0;
 	dlist_iter placementIter;
 
 	dlist_foreach(placementIter, &shardEntry->placementConnections)
@@ -1095,51 +1041,11 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
 		if (!connection || connection->remoteTransaction.transactionFailed)
 		{
-			placementEntry->failed = true;
-			failures++;
-		}
-		else
-		{
-			successes++;
+			return true;
 		}
 	}
 
-	/*
-	 * If there were any failures we want to bail on a commit in two situations:
-	 * there were no successes, or there was a failure with a reference table shard.
-	 * Ideally issues with a reference table will've errored out earlier,
-	 * but if not, we abort now to avoid an unhealthy reference table shard.
-	 */
-	if (failures > 0 &&
-		(successes == 0 || ReferenceTableShardId(shardEntry->key.shardId)))
-	{
-		return false;
-	}
-
-	/* mark all failed placements invalid */
-	dlist_foreach(placementIter, &shardEntry->placementConnections)
-	{
-		ConnectionPlacementHashEntry *placementEntry =
-			dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
-
-		if (placementEntry->failed)
-		{
-			uint64 shardId = shardEntry->key.shardId;
-			uint64 placementId = placementEntry->key.placementId;
-			ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
-
-			/*
-			 * We only set shard state if it currently is SHARD_STATE_ACTIVE, which
-			 * prevents overwriting shard state if it was already set somewhere else.
-			 */
-			if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
-			{
-				MarkShardPlacementInactive(shardPlacement);
-			}
-		}
-	}
-
-	return true;
+	return false;
 }
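
The rewritten comment above mentions failures that only surface at COMMIT time; a hedged sketch of the kind of deferred-constraint error ErrorIfPostCommitFailedShardPlacements is meant to turn into a rollback (table and constraint names are made up, and it assumes a deferrable unique constraint is accepted on the distributed table):

-- deferred constraint: the violation is only checked when the transaction commits
CREATE TABLE items (key int, value int);
SELECT create_distributed_table('items', 'key');
ALTER TABLE items ADD CONSTRAINT items_key_unique UNIQUE (key) DEFERRABLE INITIALLY DEFERRED;
BEGIN;
INSERT INTO items VALUES (1, 10);
INSERT INTO items VALUES (1, 20);  -- duplicate key, not checked yet
COMMIT;  -- the worker reports the violation here and the whole transaction errors out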

View File

@@ -690,7 +690,6 @@ static void ScheduleNextPlacementExecution(TaskPlacementExecution *placementExec
                                            bool succeeded);
 static bool CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *
                                                           placementExecution);
-static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution);
 static void PlacementExecutionReady(TaskPlacementExecution *placementExecution);
 static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution *
                                                     shardCommandExecution);
@@ -4683,20 +4682,6 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
 	}
 	else
 	{
-		if (ShouldMarkPlacementsInvalidOnFailure(execution))
-		{
-			ShardPlacement *shardPlacement = placementExecution->shardPlacement;
-
-			/*
-			 * We only set shard state if it currently is SHARD_STATE_ACTIVE, which
-			 * prevents overwriting shard state if it was already set somewhere else.
-			 */
-			if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
-			{
-				MarkShardPlacementInactive(shardPlacement);
-			}
-		}
-
 		if (placementExecution->executionState == PLACEMENT_EXECUTION_NOT_READY)
 		{
 			/*
@@ -4877,30 +4862,6 @@ ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution, bool
 }
 
-/*
- * ShouldMarkPlacementsInvalidOnFailure returns true if the failure
- * should trigger marking placements invalid.
- */
-static bool
-ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution)
-{
-	if (!DistributedExecutionModifiesDatabase(execution) ||
-		execution->transactionProperties->errorOnAnyFailure)
-	{
-		/*
-		 * Failures that do not modify the database (e.g., mainly SELECTs) should
-		 * never lead to invalid placement.
-		 *
-		 * Failures that lead throwing error, no need to mark any placement
-		 * invalid.
-		 */
-		return false;
-	}
-
-	return true;
-}
-
 /*
  * PlacementExecutionReady adds a placement execution to the ready queue when
  * its dependent placement executions have finished.

View File

@@ -496,12 +496,6 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
                                             char *intermediateResultIdPrefix)
 {
 	ParamListInfo paramListInfo = executorState->es_param_list_info;
-	bool stopOnFailure = false;
-
-	if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
-	{
-		stopOnFailure = true;
-	}
 
 	/* Get column name list and partition column index for the target table */
 	List *columnNameList = BuildColumnNameListFromTargetList(targetRelationId,
@@ -514,7 +508,6 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
                                                                   columnNameList,
                                                                   partitionColumnIndex,
                                                                   executorState,
-                                                                  stopOnFailure,
                                                                   intermediateResultIdPrefix);
 
 	ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);
@@ -537,12 +530,6 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
                         PlannedStmt *selectPlan, EState *executorState)
 {
 	ParamListInfo paramListInfo = executorState->es_param_list_info;
-	bool stopOnFailure = false;
-
-	if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
-	{
-		stopOnFailure = true;
-	}
 
 	/* Get column name list and partition column index for the target table */
 	List *columnNameList = BuildColumnNameListFromTargetList(targetRelationId,
@@ -554,8 +541,7 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
 	CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
                                                                   columnNameList,
                                                                   partitionColumnIndex,
-                                                                  executorState,
-                                                                  stopOnFailure, NULL);
+                                                                  executorState, NULL);
 
 	ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

View File

@@ -1957,63 +1957,6 @@ DeleteShardPlacementRow(uint64 placementId)
 }
 
-/*
- * UpdatePartitionShardPlacementStates gets a shard placement which is asserted to belong
- * to partitioned table. The function goes over the corresponding placements of its
- * partitions, and sets their state to the input shardState.
- */
-void
-UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char shardState)
-{
-	ShardInterval *parentShardInterval =
-		LoadShardInterval(parentShardPlacement->shardId);
-	Oid partitionedTableOid = parentShardInterval->relationId;
-
-	/* this function should only be called for partitioned tables */
-	Assert(PartitionedTable(partitionedTableOid));
-
-	List *partitionList = PartitionList(partitionedTableOid);
-	Oid partitionOid = InvalidOid;
-	foreach_oid(partitionOid, partitionList)
-	{
-		uint64 partitionShardId =
-			ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex);
-		ShardPlacement *partitionPlacement =
-			ShardPlacementOnGroupIncludingOrphanedPlacements(
-				parentShardPlacement->groupId, partitionShardId);
-
-		/* the partition should have a placement with the same group */
-		Assert(partitionPlacement != NULL);
-
-		UpdateShardPlacementState(partitionPlacement->placementId, shardState);
-	}
-}
-
-/*
- * MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where
- * the state is set to SHARD_STATE_INACTIVE. It also marks partitions of the
- * shard placements as inactive if shardPlacement belongs to a partitioned table.
- */
-void
-MarkShardPlacementInactive(ShardPlacement *shardPlacement)
-{
-	UpdateShardPlacementState(shardPlacement->placementId, SHARD_STATE_INACTIVE);
-
-	/*
-	 * In case the shard belongs to a partitioned table, we make sure to update
-	 * the states of its partitions. Repairing shards already ensures to recreate
-	 * all the partitions.
-	 */
-	ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
-	if (PartitionedTable(shardInterval->relationId))
-	{
-		UpdatePartitionShardPlacementStates(shardPlacement, SHARD_STATE_INACTIVE);
-	}
-}
-
 /*
  * UpdateShardPlacementState sets the shardState for the placement identified
  * by placementId.

View File

@@ -1081,7 +1081,17 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source
                                                         shardPlacementList,
                                                         targetNodeName,
                                                         targetNodePort);
-	if (targetPlacement->shardState != SHARD_STATE_INACTIVE)
+
+	/*
+	 * shardStateInactive is a legacy state for a placement. As of Citus 11,
+	 * we never mark any placement as INACTIVE.
+	 *
+	 * Still, we prefer to keep this function/code here, as users may need
+	 * to recover placements that were marked as inactive before Citus 11.
+	 */
+	int shardStateInactive = 3;
+	if (targetPlacement->shardState != shardStateInactive)
 	{
 		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("target placement must be in inactive state")));
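
For context (not part of the diff): a placement that an older cluster left in the legacy inactive state can still be recovered by copying it from a healthy placement, which is the path this check guards. A hedged usage sketch with made-up shard id and ports, assuming the standard repair UDF of this Citus generation:

-- repair a placement that a pre-Citus 11 cluster marked inactive (shardstate 3)
SELECT master_copy_shard_placement(1710004, 'localhost', 57638, 'localhost', 57637);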

View File

@@ -302,6 +302,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 	}
 
 	UseCoordinatedTransaction();
+	Use2PCForCoordinatedTransaction();
 
 	/* issue command to append table to each shard placement */
 	ShardPlacement *shardPlacement = NULL;
@@ -327,20 +328,11 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 		RemoteTransactionBeginIfNecessary(connection);
 
-		int executeResult = ExecuteOptionalRemoteCommand(connection,
-		                                                 workerAppendQuery->data,
-		                                                 &queryResult);
+		ExecuteCriticalRemoteCommand(connection, workerAppendQuery->data);
 		PQclear(queryResult);
 		ForgetResults(connection);
-
-		if (executeResult != 0)
-		{
-			MarkRemoteTransactionFailed(connection, false);
-		}
 	}
 
-	MarkFailedShardPlacements();
-
 	/* update shard statistics and get new shard size */
 	uint64 newShardSize = UpdateShardStatistics(shardId);
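
For reference, a hedged usage sketch of the UDF whose failure handling changes here (shard id, table name, and node values are placeholders); with Use2PCForCoordinatedTransaction() in place, a failing placement now aborts the append instead of being marked failed and skipped:

-- append a local table's rows to a shard of an append-distributed table
SELECT master_append_table_to_shard(102008, 'staging_events', 'localhost', 5432);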

View File

@@ -424,14 +424,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 			 * fails, which can lead to divergence when not using 2PC.
 			 */
 
-			/*
-			 * Check whether the coordinated transaction is in a state we want
-			 * to persist, or whether we want to error out. This handles the
-			 * case where iteratively executed commands marked all placements
-			 * as invalid.
-			 */
-			MarkFailedShardPlacements();
-
 			if (ShouldCoordinatedTransactionUse2PC)
 			{
 				CoordinatedRemoteTransactionsPrepare();
@@ -458,9 +450,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 			/*
 			 * Check again whether shards/placement successfully
-			 * committed. This handles failure at COMMIT/PREPARE time.
+			 * committed. This handles failure at COMMIT time.
 			 */
-			PostCommitMarkFailedShardPlacements(ShouldCoordinatedTransactionUse2PC);
+			ErrorIfPostCommitFailedShardPlacements();
 			break;
 		}

View File

@@ -108,8 +108,6 @@ typedef struct CitusCopyDestReceiver
 	/* template for COPY statement to send to workers */
 	CopyStmt *copyStatement;
 
-	bool stopOnFailure;
-
 	/*
 	 * shardId to CopyShardState map. Also used in insert_select_executor.c for
 	 * task pruning.
@@ -154,7 +152,6 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
                                                            List *columnNameList,
                                                            int partitionColumnIndex,
                                                            EState *executorState,
-                                                           bool stopOnFailure,
                                                            char *intermediateResultPrefix);
 extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
 extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);

View File

@@ -237,9 +237,6 @@ extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
                                       char replicationModel);
 extern void DeletePartitionRow(Oid distributedRelationId);
 extern void DeleteShardRow(uint64 shardId);
-extern void UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement,
-                                                char shardState);
-extern void MarkShardPlacementInactive(ShardPlacement *shardPlacement);
 extern void UpdateShardPlacementState(uint64 placementId, char shardState);
 extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
 extern void DeleteShardPlacementRow(uint64 placementId);

View File

@@ -32,8 +32,7 @@ extern void AssignPlacementListToConnection(List *placementAccessList,
                                             MultiConnection *connection);
 extern void ResetPlacementConnectionManagement(void);
-extern void MarkFailedShardPlacements(void);
-extern void PostCommitMarkFailedShardPlacements(bool using2PC);
+extern void ErrorIfPostCommitFailedShardPlacements(void);
 extern void CloseShardPlacementAssociation(struct MultiConnection *connection);
 extern void ResetShardPlacementAssociation(struct MultiConnection *connection);

View File

@@ -27,12 +27,14 @@
 /*
  * ShardState represents last known states of shards on a given node.
+ *
+ * The numbers assigned to each state are kept for historical reasons and
+ * should not be changed, since they correspond to shardstate in
+ * pg_dist_placement.
  */
 typedef enum
 {
 	SHARD_STATE_INVALID_FIRST = 0,
 	SHARD_STATE_ACTIVE = 1,
-	SHARD_STATE_INACTIVE = 3,
 	SHARD_STATE_TO_DELETE = 4,
 } ShardState;
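
These enum values map directly onto pg_dist_placement.shardstate, which is why they must not be renumbered. A small illustrative query (not from the patch) to see which states remain in use after this change:

-- 1 = SHARD_STATE_ACTIVE, 4 = SHARD_STATE_TO_DELETE; 3 (inactive) is no longer produced
SELECT shardstate, count(*)
FROM pg_dist_placement
GROUP BY shardstate
ORDER BY shardstate;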

View File

@@ -36,12 +36,10 @@ SELECT citus.mitmproxy('conn.kill()');
 (1 row)
 
 \COPY test_table FROM stdin delimiter ',';
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
+ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
 This probably means the server terminated abnormally
 before or while processing the request.
 CONTEXT: COPY test_table, line 1: "1,2"
-ERROR: could not connect to any active placements
-CONTEXT: COPY test_table, line 1: "1,2"
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
@@ -281,22 +279,10 @@ SELECT citus.mitmproxy('conn.kill()');
 (1 row)
 
 \COPY test_table_2 FROM stdin delimiter ',';
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
+ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
 This probably means the server terminated abnormally
 before or while processing the request.
 CONTEXT: COPY test_table_2, line 1: "1,2"
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
-This probably means the server terminated abnormally
-before or while processing the request.
-CONTEXT: COPY test_table_2, line 2: "3,4"
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
-This probably means the server terminated abnormally
-before or while processing the request.
-CONTEXT: COPY test_table_2, line 3: "6,7"
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
-This probably means the server terminated abnormally
-before or while processing the request.
-CONTEXT: COPY test_table_2, line 5: "9,10"
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
@@ -311,13 +297,13 @@ SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate
 ORDER BY shardid, nodeport;
  logicalrelid | shardid | shardstate
 ---------------------------------------------------------------------
- test_table_2 | 1710004 | 3
+ test_table_2 | 1710004 | 1
  test_table_2 | 1710004 | 1
- test_table_2 | 1710005 | 3
+ test_table_2 | 1710005 | 1
  test_table_2 | 1710005 | 1
- test_table_2 | 1710006 | 3
+ test_table_2 | 1710006 | 1
  test_table_2 | 1710006 | 1
- test_table_2 | 1710007 | 3
+ test_table_2 | 1710007 | 1
  test_table_2 | 1710007 | 1
 (8 rows)
View File

@@ -165,7 +165,7 @@ connection not open
 connection not open
 CONTEXT: while executing command on localhost:xxxxx
 COMMIT;
-ERROR: could not make changes to shard xxxxx on any node
+ERROR: failure on connection marked as essential: localhost:xxxxx
 SELECT * FROM artists WHERE id IN (4, 5);
  id | name
 ---------------------------------------------------------------------
@@ -255,7 +255,7 @@ connection not open
 connection not open
 CONTEXT: while executing command on localhost:xxxxx
 COMMIT;
-ERROR: could not make changes to shard xxxxx on any node
+ERROR: failure on connection marked as essential: localhost:xxxxx
 SELECT * FROM artists WHERE id IN (4, 5);
  id | name
 ---------------------------------------------------------------------
@@ -340,7 +340,7 @@ connection not open
 connection not open
 connection not open
 COMMIT;
-ERROR: could not make changes to shard xxxxx on any node
+ERROR: failure on connection marked as essential: localhost:xxxxx
 SELECT * FROM artists WHERE id=6;
  id | name
 ---------------------------------------------------------------------

View File

@@ -377,6 +377,35 @@ COMMIT;
 WARNING: illegal value
 WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
 ERROR: illegal value
+-- single-row and multi-row INSERTs should also fail,
+-- with or without transaction blocks, at COMMIT time
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
+WARNING: illegal value
+WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
+ERROR: illegal value
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
+WARNING: illegal value
+WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
+ERROR: illegal value
+BEGIN;
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
+COMMIT;
+WARNING: illegal value
+WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
+ERROR: illegal value
+BEGIN;
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
+COMMIT;
+WARNING: illegal value
+WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
+ERROR: illegal value
+-- and, rollback should be fine
+BEGIN;
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
+ROLLBACK;
+BEGIN;
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
+ROLLBACK;
 \unset VERBOSITY
 -- verify everyhing including delete is rolled back
 SELECT * FROM researchers WHERE lab_id = 6;
@@ -1195,28 +1224,28 @@ ORDER BY s.logicalrelid, sp.shardstate;
  reference_failure_test | 1 | 2
 (1 row)
 
+-- any failure rolls back the transaction
 BEGIN;
 COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
--- some placements are invalid before abort
+ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
+ABORT;
+-- none of the placements are invalid after abort
 SELECT shardid, shardstate, nodename, nodeport
 FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
 WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
 ORDER BY shardid, nodeport;
  shardid | shardstate | nodename | nodeport
 ---------------------------------------------------------------------
- 1200016 | 3 | localhost | 57637
+ 1200016 | 1 | localhost | 57637
  1200016 | 1 | localhost | 57638
  1200017 | 1 | localhost | 57637
  1200017 | 1 | localhost | 57638
  1200018 | 1 | localhost | 57637
  1200018 | 1 | localhost | 57638
- 1200019 | 3 | localhost | 57637
+ 1200019 | 1 | localhost | 57637
  1200019 | 1 | localhost | 57638
 (8 rows)
 
-ABORT;
 -- verify nothing is inserted
 SELECT count(*) FROM numbers_hash_failure_test;
 WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
@@ -1243,52 +1272,35 @@ ORDER BY shardid, nodeport;
  1200019 | 1 | localhost | 57638
 (8 rows)
 
+-- all failures roll back the transaction
 BEGIN;
 COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
--- check shard states before commit
-SELECT shardid, shardstate, nodename, nodeport
-FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
-WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
-ORDER BY shardid, nodeport;
- shardid | shardstate | nodename | nodeport
----------------------------------------------------------------------
- 1200016 | 3 | localhost | 57637
- 1200016 | 1 | localhost | 57638
- 1200017 | 1 | localhost | 57637
- 1200017 | 1 | localhost | 57638
- 1200018 | 1 | localhost | 57637
- 1200018 | 1 | localhost | 57638
- 1200019 | 3 | localhost | 57637
- 1200019 | 1 | localhost | 57638
-(8 rows)
-
+ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
 COMMIT;
--- expect some placements to be marked invalid after commit
+-- expect none of the placements to be marked invalid after commit
 SELECT shardid, shardstate, nodename, nodeport
 FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
 WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
 ORDER BY shardid, nodeport;
  shardid | shardstate | nodename | nodeport
 ---------------------------------------------------------------------
- 1200016 | 3 | localhost | 57637
+ 1200016 | 1 | localhost | 57637
  1200016 | 1 | localhost | 57638
  1200017 | 1 | localhost | 57637
  1200017 | 1 | localhost | 57638
  1200018 | 1 | localhost | 57637
  1200018 | 1 | localhost | 57638
- 1200019 | 3 | localhost | 57637
+ 1200019 | 1 | localhost | 57637
  1200019 | 1 | localhost | 57638
 (8 rows)
 
--- verify data is inserted
+-- verify no data is inserted
 SELECT count(*) FROM numbers_hash_failure_test;
 WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
 WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
  count
 ---------------------------------------------------------------------
-     2
+     0
 (1 row)
 
 -- break the other node as well

View File

@@ -343,9 +343,7 @@ INSERT INTO labs_mx VALUES (9, 'Umbrella Corporation');
 COMMIT;
 WARNING: illegal value
 WARNING: failed to commit transaction on localhost:xxxxx
-WARNING: could not commit transaction for shard xxxxx on any active node
-WARNING: could not commit transaction for shard xxxxx on any active node
-ERROR: could not commit transaction on any active node
+ERROR: could not commit transaction for shard xxxxx on at least one active node
 -- data should NOT be persisted
 SELECT * FROM objects_mx WHERE id = 2;
  id | name
@@ -371,9 +369,7 @@ INSERT INTO labs_mx VALUES (9, 'BAD');
 COMMIT;
 WARNING: illegal value
 WARNING: failed to commit transaction on localhost:xxxxx
-WARNING: could not commit transaction for shard xxxxx on any active node
-WARNING: could not commit transaction for shard xxxxx on any active node
-ERROR: could not commit transaction on any active node
+ERROR: could not commit transaction for shard xxxxx on at least one active node
 -- data should NOT be persisted
 SELECT * FROM objects_mx WHERE id = 1;
  id | name
@@ -396,9 +392,7 @@ INSERT INTO labs_mx VALUES (9, 'BAD');
 COMMIT;
 WARNING: illegal value
 WARNING: failed to commit transaction on localhost:xxxxx
-WARNING: could not commit transaction for shard xxxxx on any active node
-WARNING: could not commit transaction for shard xxxxx on any active node
-ERROR: could not commit transaction on any active node
+ERROR: could not commit transaction for shard xxxxx on at least one active node
 -- no data should persists
 SELECT * FROM objects_mx WHERE id = 1;
  id | name

View File

@@ -577,7 +577,7 @@ SET session_replication_role = DEFAULT;
 ALTER USER test_user WITH nologin;
 \c - test_user - :master_port
--- reissue copy
+-- reissue copy, and it should fail
 COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
 1,1
 2,2
@@ -589,7 +589,7 @@ COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
 8,8
 \.
--- verify shards in the first worker as marked invalid
+-- verify that shards on none of the workers are marked invalid
 SELECT shardid, shardstate, nodename, nodeport
 FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
 WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;

View File

@@ -784,29 +784,23 @@ SET session_replication_role = DEFAULT;
 \c - :default_user - :worker_1_port
 ALTER USER test_user WITH nologin;
 \c - test_user - :master_port
--- reissue copy
+-- reissue copy, and it should fail
 COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
+ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
 CONTEXT: COPY numbers_hash, line 1: "1,1"
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
-CONTEXT: COPY numbers_hash, line 2: "2,2"
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
-CONTEXT: COPY numbers_hash, line 3: "3,3"
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
-CONTEXT: COPY numbers_hash, line 6: "6,6"
--- verify shards in the first worker as marked invalid
+-- verify that shards on none of the workers are marked invalid
 SELECT shardid, shardstate, nodename, nodeport
 FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
 WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
  shardid | shardstate | nodename | nodeport
 ---------------------------------------------------------------------
- 560169 | 3 | localhost | 57637
+ 560169 | 1 | localhost | 57637
  560169 | 1 | localhost | 57638
- 560170 | 3 | localhost | 57637
+ 560170 | 1 | localhost | 57637
  560170 | 1 | localhost | 57638
- 560171 | 3 | localhost | 57637
+ 560171 | 1 | localhost | 57637
  560171 | 1 | localhost | 57638
- 560172 | 3 | localhost | 57637
+ 560172 | 1 | localhost | 57637
  560172 | 1 | localhost | 57638
 (8 rows)
@@ -828,12 +822,8 @@ SELECT shardid, shardstate, nodename, nodeport
 -- since it can not insert into either copies of a shard. shards are expected to
 -- stay valid since the operation is rolled back.
 COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
+ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
 CONTEXT: COPY numbers_hash_other, line 1: "1,1"
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
-CONTEXT: COPY numbers_hash_other, line 2: "2,2"
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
-CONTEXT: COPY numbers_hash_other, line 3: "3,3"
 -- verify shards for numbers_hash_other are still valid
 -- since copy has failed altogether
 SELECT shardid, shardstate, nodename, nodeport
@@ -841,13 +831,13 @@ SELECT shardid, shardstate, nodename, nodeport
 WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport;
  shardid | shardstate | nodename | nodeport
 ---------------------------------------------------------------------
- 560174 | 3 | localhost | 57637
+ 560174 | 1 | localhost | 57637
  560174 | 1 | localhost | 57638
- 560175 | 3 | localhost | 57637
+ 560175 | 1 | localhost | 57637
  560175 | 1 | localhost | 57638
  560176 | 1 | localhost | 57637
  560176 | 1 | localhost | 57638
- 560177 | 3 | localhost | 57637
+ 560177 | 1 | localhost | 57637
  560177 | 1 | localhost | 57638
 (8 rows)

View File

@@ -310,6 +310,29 @@ DELETE FROM researchers WHERE lab_id = 6;
 30, 6, 'Dennis Ritchie'
 \.
 COMMIT;
+-- single-row and multi-row INSERTs should also fail,
+-- with or without transaction blocks, at COMMIT time
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
+BEGIN;
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
+COMMIT;
+BEGIN;
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
+COMMIT;
+-- and, rollback should be fine
+BEGIN;
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
+ROLLBACK;
+BEGIN;
+INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
+ROLLBACK;
 \unset VERBOSITY
 -- verify everyhing including delete is rolled back
@@ -935,20 +958,20 @@ AND s.logicalrelid = 'reference_failure_test'::regclass
 GROUP BY s.logicalrelid, sp.shardstate
 ORDER BY s.logicalrelid, sp.shardstate;
 
+-- any failure rolls back the transaction
 BEGIN;
 COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
 1,1
 2,2
 \.
+ABORT;
--- some placements are invalid before abort
+-- none of the placements are invalid after abort
 SELECT shardid, shardstate, nodename, nodeport
 FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
 WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
 ORDER BY shardid, nodeport;
-ABORT;
 -- verify nothing is inserted
 SELECT count(*) FROM numbers_hash_failure_test;
@@ -958,27 +981,21 @@ FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
 WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
 ORDER BY shardid, nodeport;
 
+-- all failures roll back the transaction
 BEGIN;
 COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
 1,1
 2,2
 \.
--- check shard states before commit
-SELECT shardid, shardstate, nodename, nodeport
-FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
-WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
-ORDER BY shardid, nodeport;
 COMMIT;
--- expect some placements to be marked invalid after commit
+-- expect none of the placements to be marked invalid after commit
 SELECT shardid, shardstate, nodename, nodeport
 FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
 WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
 ORDER BY shardid, nodeport;
--- verify data is inserted
+-- verify no data is inserted
 SELECT count(*) FROM numbers_hash_failure_test;
 
 -- break the other node as well