Merge pull request #5381 from citusdata/do_not_mark_placements_invalid

Drop support for Inactive Shard States
pull/5386/head
Önder Kalacı 2021-10-23 09:04:50 +02:00 committed by GitHub
commit 8664e6873f
20 changed files with 164 additions and 425 deletions


@ -1692,13 +1692,11 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
ExprContext *econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot;
bool stopOnFailure = true;
DestReceiver *copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
columnNameList,
partitionColumnIndex,
estate, stopOnFailure,
NULL);
estate, NULL);
/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor);


@ -20,9 +20,9 @@
* in parallel.
*
* For hash-partitioned tables, if it fails to connect to a worker, the master
* marks the placement for which it was trying to open a connection as inactive,
* similar to the way DML statements are handled. If a failure occurs after
* connecting, the transaction is rolled back on all the workers. Note that,
* rolls back the distributed transaction, similar to the way DML statements
* are handled. If a failure occurs after connecting, the transaction
* is rolled back on all the workers. Note that,
* in the case of append-partitioned tables, if a failure occurs, metadata
* changes are immediately rolled back on the master node, but shard placements
* are left on the worker nodes.
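As an illustration of the behavior described in the comment above, a minimal sketch; the table name, replication factor, and the unreachable worker are assumptions, not part of this change:

SET citus.shard_replication_factor TO 2;
CREATE TABLE copy_demo (key int, value text);
SELECT create_distributed_table('copy_demo', 'key');
-- with one worker unreachable, the COPY now errors out and the distributed
-- transaction rolls back; no placement gets marked inactive (state 3)
\COPY copy_demo FROM stdin WITH (FORMAT 'csv')
-- (input rows omitted)
SELECT shardid, shardstate
FROM pg_dist_shard_placement
WHERE shardid IN (SELECT shardid FROM pg_dist_shard
                  WHERE logicalrelid = 'copy_demo'::regclass);
-- expected: every placement still reports shardstate 1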
@ -242,8 +242,7 @@ static void CopyToExistingShards(CopyStmt *copyStatement,
static void CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag,
Oid relationId);
static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
ShardConnections *shardConnections, bool
stopOnFailure,
ShardConnections *shardConnections,
bool useBinaryCopyFormat);
static List * RemoveOptionFromList(List *optionList, char *optionName);
static bool BinaryOutputFunctionDefined(Oid typeId);
@ -281,12 +280,11 @@ static HTAB * CreateShardStateHash(MemoryContext memoryContext);
static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
MultiConnection *connection);
static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool stopOnFailure,
HTAB *connectionStateHash,
bool *found, bool shouldUseLocalCopy, CopyOutState
copyOutState, bool isColocatedIntermediateResult);
static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash,
ShardPlacement *placement,
bool stopOnFailure,
bool colocatedIntermediateResult);
static bool HasReachedAdaptiveExecutorPoolSize(List *connectionStateHash);
static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateList,
@ -296,7 +294,7 @@ static List * ConnectionStateListToNode(HTAB *connectionStateHash,
const char *hostname, int32 port);
static void InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash,
uint64 shardId, bool stopOnFailure,
uint64 shardId,
bool canUseLocalCopy,
CopyOutState copyOutState,
bool colocatedIntermediateResult);
@ -461,9 +459,6 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
List *columnNameList = NIL;
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
bool stopOnFailure = false;
uint64 processedRowCount = 0;
ErrorContextCallback errorCallback;
@ -509,16 +504,10 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
if (IsCitusTableType(tableId, REFERENCE_TABLE))
{
stopOnFailure = true;
}
/* set up the destination for the COPY */
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList,
partitionColumnIndex,
executorState,
stopOnFailure, NULL);
executorState, NULL);
DestReceiver *dest = (DestReceiver *) copyDest;
dest->rStartup(dest, 0, tupleDescriptor);
@ -611,9 +600,6 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
FreeExecutorState(executorState);
table_close(distributedRelation, NoLock);
/* mark failed placements as inactive */
MarkFailedShardPlacements();
CHECK_FOR_INTERRUPTS();
if (completionTag != NULL)
@ -856,13 +842,12 @@ RemoveOptionFromList(List *optionList, char *optionName)
/*
* OpenCopyConnectionsForNewShards opens a connection for each placement of a shard and
* starts a COPY transaction if necessary. If a connection cannot be opened,
* then the shard placement is marked as inactive and the COPY continues with the remaining
* shard placements.
* then the transaction is rolled back.
*/
static void
OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
ShardConnections *shardConnections,
bool stopOnFailure, bool useBinaryCopyFormat)
bool useBinaryCopyFormat)
{
int failedPlacementCount = 0;
ListCell *placementCell = NULL;
@ -907,19 +892,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
if (stopOnFailure)
{
ReportConnectionError(connection, ERROR);
}
else
{
const bool raiseErrors = true;
HandleRemoteTransactionConnectionError(connection, raiseErrors);
failedPlacementCount++;
continue;
}
ReportConnectionError(connection, ERROR);
}
/*
@ -954,10 +927,10 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
}
/*
* If stopOnFailure is true, we just error out and code execution should
* never reach to this point. This is the case for reference tables.
* We error out on any failure, so code execution should
* never reach this point. This is the case for all tables.
*/
Assert(!stopOnFailure || failedPlacementCount == 0);
Assert(failedPlacementCount == 0);
shardConnections->connectionList = connectionList;
@ -1869,14 +1842,13 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
char *schemaName = copyStatement->relation->schemaname;
char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
int64 shardId = CreateEmptyShard(qualifiedName);
bool stopOnFailure = true;
shardConnections->shardId = shardId;
shardConnections->connectionList = NIL;
/* connect to shards placements and start transactions */
OpenCopyConnectionsForNewShards(copyStatement, shardConnections, stopOnFailure,
OpenCopyConnectionsForNewShards(copyStatement, shardConnections,
useBinaryCopyFormat);
return shardId;
@ -2166,7 +2138,7 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
*/
CitusCopyDestReceiver *
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
EState *executorState, bool stopOnFailure,
EState *executorState,
char *intermediateResultIdPrefix)
{
CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0(
@ -2184,7 +2156,6 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
copyDest->columnNameList = columnNameList;
copyDest->partitionColumnIndex = partitionColumnIndex;
copyDest->executorState = executorState;
copyDest->stopOnFailure = stopOnFailure;
copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix;
copyDest->memoryContext = CurrentMemoryContext;
@ -2527,8 +2498,6 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
bool cachedShardStateFound = false;
bool firstTupleInShard = false;
bool stopOnFailure = copyDest->stopOnFailure;
EState *executorState = copyDest->executorState;
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
@ -2548,7 +2517,6 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash,
copyDest->connectionStateHash,
stopOnFailure,
&cachedShardStateFound,
copyDest->shouldUseLocalCopy,
copyDest->copyOutState,
@ -3234,7 +3202,7 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag)
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
ReportConnectionError(connection, ERROR);
continue;
}
@ -3242,7 +3210,7 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag)
if (!SendRemoteCommand(connection, copyCommand->data))
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
ReportConnectionError(connection, ERROR);
continue;
}
@ -3610,7 +3578,7 @@ ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32
*/
static CopyShardState *
GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool
HTAB *connectionStateHash, bool *found, bool
shouldUseLocalCopy, CopyOutState copyOutState,
bool isColocatedIntermediateResult)
{
@ -3619,7 +3587,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
if (!*found)
{
InitializeCopyShardState(shardState, connectionStateHash,
shardId, stopOnFailure, shouldUseLocalCopy,
shardId, shouldUseLocalCopy,
copyOutState, isColocatedIntermediateResult);
}
@ -3635,7 +3603,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
static void
InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash, uint64 shardId,
bool stopOnFailure, bool shouldUseLocalCopy,
bool shouldUseLocalCopy,
CopyOutState copyOutState,
bool colocatedIntermediateResult)
{
@ -3685,7 +3653,7 @@ InitializeCopyShardState(CopyShardState *shardState,
}
MultiConnection *connection =
CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure,
CopyGetPlacementConnection(connectionStateHash, placement,
colocatedIntermediateResult);
if (connection == NULL)
{
@ -3729,10 +3697,10 @@ InitializeCopyShardState(CopyShardState *shardState,
}
/*
* If stopOnFailure is true, we just error out and code execution should
* never reach to this point. This is the case for reference tables.
* We error out on any failure, so code execution should never reach
* this point. This is the case for all tables.
*/
Assert(!stopOnFailure || failedPlacementCount == 0);
Assert(failedPlacementCount == 0);
MemoryContextReset(localContext);
}
@ -3798,7 +3766,7 @@ LogLocalCopyToFileExecution(uint64 shardId)
*/
static MultiConnection *
CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
bool stopOnFailure, bool colocatedIntermediateResult)
bool colocatedIntermediateResult)
{
if (colocatedIntermediateResult)
{
@ -3984,18 +3952,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
if (stopOnFailure)
{
ReportConnectionError(connection, ERROR);
}
else
{
const bool raiseErrors = true;
HandleRemoteTransactionConnectionError(connection, raiseErrors);
return NULL;
}
ReportConnectionError(connection, ERROR);
}
/*


@ -200,7 +200,7 @@ static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection,
ShardPlacement *placement);
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
ShardPlacement *placement);
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry);
static bool HasModificationFailedForShard(ConnectionShardHashEntry *shardEntry);
static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize);
static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize);
@ -981,18 +981,19 @@ ResetPlacementConnectionManagement(void)
/*
* MarkFailedShardPlacements looks through every connection in the connection shard hash
* and marks the placements associated with failed connections invalid.
* ErrorIfPostCommitFailedShardPlacements throws an error if any of the placements
* that were modified within the transaction has failed.
*
* Every shard must have at least one placement connection which did not fail. If all
* modifying connections for a shard failed then the transaction will be aborted.
* Note that Citus already fails queries/commands when any failure happens during
* query processing. However, certain failures can only be detected at COMMIT
* time, and this check mainly catches errors that happen on the placements at
* COMMIT time.
*
* This will be called just before commit, so we can abort before executing remote
* commits. It should also be called after modification statements, to ensure that we
* don't run future statements against placements which are not up to date.
* The most common example is deferred errors that are thrown by triggers or
* constraints at COMMIT time.
*/
void
MarkFailedShardPlacements()
ErrorIfPostCommitFailedShardPlacements(void)
{
HASH_SEQ_STATUS status;
ConnectionShardHashEntry *shardEntry = NULL;
@ -1000,82 +1001,27 @@ MarkFailedShardPlacements()
hash_seq_init(&status, ConnectionShardHash);
while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
{
if (!CheckShardPlacements(shardEntry))
if (HasModificationFailedForShard(shardEntry))
{
ereport(ERROR,
(errmsg("could not make changes to shard " INT64_FORMAT
" on any node",
shardEntry->key.shardId)));
}
}
}
/*
* PostCommitMarkFailedShardPlacements marks placements invalid and checks whether
* sufficiently many placements have failed to abort the entire coordinated
* transaction.
*
* This will be called just after a coordinated commit so we can handle remote
* transactions which failed during commit.
*
* When using2PC is set as least one placement must succeed per shard. If all placements
* fail for a shard the entire transaction is aborted. If using2PC is not set then a only
* a warning will be emitted; we cannot abort because some remote transactions might have
* already been committed.
*/
void
PostCommitMarkFailedShardPlacements(bool using2PC)
{
HASH_SEQ_STATUS status;
ConnectionShardHashEntry *shardEntry = NULL;
int successes = 0;
int attempts = 0;
int elevel = using2PC ? ERROR : WARNING;
hash_seq_init(&status, ConnectionShardHash);
while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
{
attempts++;
if (CheckShardPlacements(shardEntry))
{
successes++;
}
else
{
/*
* Only error out if we're using 2PC. If we're not using 2PC we can't error
* out otherwise we can end up with a state where some shard modifications
* have already committed successfully.
*/
ereport(elevel,
(errmsg("could not commit transaction for shard " INT64_FORMAT
" on any active node",
shardEntry->key.shardId)));
" on at least one active node", shardEntry->key.shardId)));
}
}
/*
* If no shards could be modified at all, error out. Doesn't matter whether
* we're post-commit - there's nothing to invalidate.
*/
if (attempts > 0 && successes == 0)
{
ereport(ERROR, (errmsg("could not commit transaction on any active node")));
}
}
/*
* CheckShardPlacements is a helper function for CheckForFailedPlacements that
* performs the per-shard work.
* HasModificationFailedForShard is a helper function for
* ErrorIfPostCommitFailedShardPlacements that performs the per-shard work.
*
* The function returns true if any placement of the input shard is modified
* and any failure has happened (either a connection failure or a transaction
* failure).
*/
static bool
CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
HasModificationFailedForShard(ConnectionShardHashEntry *shardEntry)
{
int failures = 0;
int successes = 0;
dlist_iter placementIter;
dlist_foreach(placementIter, &shardEntry->placementConnections)
@ -1095,51 +1041,11 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
if (!connection || connection->remoteTransaction.transactionFailed)
{
placementEntry->failed = true;
failures++;
}
else
{
successes++;
return true;
}
}
/*
* If there were any failures we want to bail on a commit in two situations:
* there were no successes, or there was a failure with a reference table shard.
* Ideally issues with a reference table will've errored out earlier,
* but if not, we abort now to avoid an unhealthy reference table shard.
*/
if (failures > 0 &&
(successes == 0 || ReferenceTableShardId(shardEntry->key.shardId)))
{
return false;
}
/* mark all failed placements invalid */
dlist_foreach(placementIter, &shardEntry->placementConnections)
{
ConnectionPlacementHashEntry *placementEntry =
dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
if (placementEntry->failed)
{
uint64 shardId = shardEntry->key.shardId;
uint64 placementId = placementEntry->key.placementId;
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
/*
* We only set shard state if it currently is SHARD_STATE_ACTIVE, which
* prevents overwriting shard state if it was already set somewhere else.
*/
if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
{
MarkShardPlacementInactive(shardPlacement);
}
}
}
return true;
return false;
}


@ -690,7 +690,6 @@ static void ScheduleNextPlacementExecution(TaskPlacementExecution *placementExec
bool succeeded);
static bool CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *
placementExecution);
static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution);
static void PlacementExecutionReady(TaskPlacementExecution *placementExecution);
static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution *
shardCommandExecution);
@ -4683,20 +4682,6 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
}
else
{
if (ShouldMarkPlacementsInvalidOnFailure(execution))
{
ShardPlacement *shardPlacement = placementExecution->shardPlacement;
/*
* We only set shard state if it currently is SHARD_STATE_ACTIVE, which
* prevents overwriting shard state if it was already set somewhere else.
*/
if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
{
MarkShardPlacementInactive(shardPlacement);
}
}
if (placementExecution->executionState == PLACEMENT_EXECUTION_NOT_READY)
{
/*
@ -4877,30 +4862,6 @@ ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution, bool
}
/*
* ShouldMarkPlacementsInvalidOnFailure returns true if the failure
* should trigger marking placements invalid.
*/
static bool
ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution)
{
if (!DistributedExecutionModifiesDatabase(execution) ||
execution->transactionProperties->errorOnAnyFailure)
{
/*
* Failures that do not modify the database (e.g., mainly SELECTs) should
* never lead to invalid placement.
*
* Failures that lead throwing error, no need to mark any placement
* invalid.
*/
return false;
}
return true;
}
/*
* PlacementExecutionReady adds a placement execution to the ready queue when
* its dependent placement executions have finished.


@ -496,12 +496,6 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
char *intermediateResultIdPrefix)
{
ParamListInfo paramListInfo = executorState->es_param_list_info;
bool stopOnFailure = false;
if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
{
stopOnFailure = true;
}
/* Get column name list and partition column index for the target table */
List *columnNameList = BuildColumnNameListFromTargetList(targetRelationId,
@ -514,7 +508,6 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
columnNameList,
partitionColumnIndex,
executorState,
stopOnFailure,
intermediateResultIdPrefix);
ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);
@ -537,12 +530,6 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
PlannedStmt *selectPlan, EState *executorState)
{
ParamListInfo paramListInfo = executorState->es_param_list_info;
bool stopOnFailure = false;
if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
{
stopOnFailure = true;
}
/* Get column name list and partition column index for the target table */
List *columnNameList = BuildColumnNameListFromTargetList(targetRelationId,
@ -554,8 +541,7 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
columnNameList,
partitionColumnIndex,
executorState,
stopOnFailure, NULL);
executorState, NULL);
ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);


@ -1957,63 +1957,6 @@ DeleteShardPlacementRow(uint64 placementId)
}
/*
* UpdatePartitionShardPlacementStates gets a shard placement which is asserted to belong
* to partitioned table. The function goes over the corresponding placements of its
* partitions, and sets their state to the input shardState.
*/
void
UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char shardState)
{
ShardInterval *parentShardInterval =
LoadShardInterval(parentShardPlacement->shardId);
Oid partitionedTableOid = parentShardInterval->relationId;
/* this function should only be called for partitioned tables */
Assert(PartitionedTable(partitionedTableOid));
List *partitionList = PartitionList(partitionedTableOid);
Oid partitionOid = InvalidOid;
foreach_oid(partitionOid, partitionList)
{
uint64 partitionShardId =
ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex);
ShardPlacement *partitionPlacement =
ShardPlacementOnGroupIncludingOrphanedPlacements(
parentShardPlacement->groupId, partitionShardId);
/* the partition should have a placement with the same group */
Assert(partitionPlacement != NULL);
UpdateShardPlacementState(partitionPlacement->placementId, shardState);
}
}
/*
* MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where
* the state is set to SHARD_STATE_INACTIVE. It also marks partitions of the
* shard placements as inactive if shardPlacement belongs to a partitioned table.
*/
void
MarkShardPlacementInactive(ShardPlacement *shardPlacement)
{
UpdateShardPlacementState(shardPlacement->placementId, SHARD_STATE_INACTIVE);
/*
* In case the shard belongs to a partitioned table, we make sure to update
* the states of its partitions. Repairing shards already ensures to recreate
* all the partitions.
*/
ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
if (PartitionedTable(shardInterval->relationId))
{
UpdatePartitionShardPlacementStates(shardPlacement, SHARD_STATE_INACTIVE);
}
}
/*
* UpdateShardPlacementState sets the shardState for the placement identified
* by placementId.


@ -1081,7 +1081,17 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source
shardPlacementList,
targetNodeName,
targetNodePort);
if (targetPlacement->shardState != SHARD_STATE_INACTIVE)
/*
* shardStateInactive is a legacy state for a placement. As of Citus 11,
* we never mark any placement as INACTIVE.
*
* Still, we prefer to keep this code here, as users may need to recover
* placements that were marked as inactive before Citus 11.
*/
int shardStateInactive = 3;
if (targetPlacement->shardState != shardStateInactive)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("target placement must be in inactive state")));


@ -302,6 +302,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
}
UseCoordinatedTransaction();
Use2PCForCoordinatedTransaction();
/* issue command to append table to each shard placement */
ShardPlacement *shardPlacement = NULL;
@ -327,20 +328,11 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
RemoteTransactionBeginIfNecessary(connection);
int executeResult = ExecuteOptionalRemoteCommand(connection,
workerAppendQuery->data,
&queryResult);
ExecuteCriticalRemoteCommand(connection, workerAppendQuery->data);
PQclear(queryResult);
ForgetResults(connection);
if (executeResult != 0)
{
MarkRemoteTransactionFailed(connection, false);
}
}
MarkFailedShardPlacements();
/* update shard statistics and get new shard size */
uint64 newShardSize = UpdateShardStatistics(shardId);
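With the command now executed as critical and the transaction using 2PC, a failure on any placement aborts the whole append instead of marking that placement failed. A usage sketch; the append-distributed table, staging table, and host name are assumptions:

-- append a local staging table into a fresh shard of an append-distributed table;
-- any placement failure now aborts the whole call
SELECT master_create_empty_shard('append_events') AS shardid \gset
SELECT master_append_table_to_shard(:shardid, 'staging_events', 'coordinator-host', 5432);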


@ -424,14 +424,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* fails, which can lead to divergence when not using 2PC.
*/
/*
* Check whether the coordinated transaction is in a state we want
* to persist, or whether we want to error out. This handles the
* case where iteratively executed commands marked all placements
* as invalid.
*/
MarkFailedShardPlacements();
if (ShouldCoordinatedTransactionUse2PC)
{
CoordinatedRemoteTransactionsPrepare();
@ -458,9 +450,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
/*
* Check again whether shards/placement successfully
* committed. This handles failure at COMMIT/PREPARE time.
* committed. This handles failure at COMMIT time.
*/
PostCommitMarkFailedShardPlacements(ShouldCoordinatedTransactionUse2PC);
ErrorIfPostCommitFailedShardPlacements();
break;
}


@ -108,8 +108,6 @@ typedef struct CitusCopyDestReceiver
/* template for COPY statement to send to workers */
CopyStmt *copyStatement;
bool stopOnFailure;
/*
* shardId to CopyShardState map. Also used in insert_select_executor.c for
* task pruning.
@ -154,7 +152,6 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
List *columnNameList,
int partitionColumnIndex,
EState *executorState,
bool stopOnFailure,
char *intermediateResultPrefix);
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);


@ -237,9 +237,6 @@ extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
char replicationModel);
extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardRow(uint64 shardId);
extern void UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement,
char shardState);
extern void MarkShardPlacementInactive(ShardPlacement *shardPlacement);
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
extern void DeleteShardPlacementRow(uint64 placementId);


@ -32,8 +32,7 @@ extern void AssignPlacementListToConnection(List *placementAccessList,
MultiConnection *connection);
extern void ResetPlacementConnectionManagement(void);
extern void MarkFailedShardPlacements(void);
extern void PostCommitMarkFailedShardPlacements(bool using2PC);
extern void ErrorIfPostCommitFailedShardPlacements(void);
extern void CloseShardPlacementAssociation(struct MultiConnection *connection);
extern void ResetShardPlacementAssociation(struct MultiConnection *connection);


@ -27,12 +27,14 @@
/*
* ShardState represents the last known state of a shard on a given node.
*
* The numbers assigned to each state are kept for historical reasons and should
* not be changed, since they correspond to shardstate in pg_dist_placement.
*/
typedef enum
{
SHARD_STATE_INVALID_FIRST = 0,
SHARD_STATE_ACTIVE = 1,
SHARD_STATE_INACTIVE = 3,
SHARD_STATE_TO_DELETE = 4,
} ShardState;
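Since these values map one-to-one onto pg_dist_placement.shardstate, a quick read-only check that no legacy inactive (3) placements remain could look like the following (query is illustrative):

SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardstate <> 1
ORDER BY shardid, nodeport;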


@ -36,12 +36,10 @@ SELECT citus.mitmproxy('conn.kill()');
(1 row)
\COPY test_table FROM stdin delimiter ',';
WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: COPY test_table, line 1: "1,2"
ERROR: could not connect to any active placements
CONTEXT: COPY test_table, line 1: "1,2"
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
@ -281,22 +279,10 @@ SELECT citus.mitmproxy('conn.kill()');
(1 row)
\COPY test_table_2 FROM stdin delimiter ',';
WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: COPY test_table_2, line 1: "1,2"
WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: COPY test_table_2, line 2: "3,4"
WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: COPY test_table_2, line 3: "6,7"
WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: COPY test_table_2, line 5: "9,10"
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
@ -311,13 +297,13 @@ SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate
ORDER BY shardid, nodeport;
logicalrelid | shardid | shardstate
---------------------------------------------------------------------
test_table_2 | 1710004 | 3
test_table_2 | 1710004 | 1
test_table_2 | 1710005 | 3
test_table_2 | 1710004 | 1
test_table_2 | 1710005 | 1
test_table_2 | 1710005 | 1
test_table_2 | 1710006 | 3
test_table_2 | 1710006 | 1
test_table_2 | 1710007 | 3
test_table_2 | 1710006 | 1
test_table_2 | 1710007 | 1
test_table_2 | 1710007 | 1
(8 rows)


@ -165,7 +165,7 @@ connection not open
connection not open
CONTEXT: while executing command on localhost:xxxxx
COMMIT;
ERROR: could not make changes to shard xxxxx on any node
ERROR: failure on connection marked as essential: localhost:xxxxx
SELECT * FROM artists WHERE id IN (4, 5);
id | name
---------------------------------------------------------------------
@ -255,7 +255,7 @@ connection not open
connection not open
CONTEXT: while executing command on localhost:xxxxx
COMMIT;
ERROR: could not make changes to shard xxxxx on any node
ERROR: failure on connection marked as essential: localhost:xxxxx
SELECT * FROM artists WHERE id IN (4, 5);
id | name
---------------------------------------------------------------------
@ -340,7 +340,7 @@ connection not open
connection not open
connection not open
COMMIT;
ERROR: could not make changes to shard xxxxx on any node
ERROR: failure on connection marked as essential: localhost:xxxxx
SELECT * FROM artists WHERE id=6;
id | name
---------------------------------------------------------------------


@ -377,6 +377,35 @@ COMMIT;
WARNING: illegal value
WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
ERROR: illegal value
-- single-row and multi-row INSERTs should also fail
-- with or without transaction blocks, at COMMIT time
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
WARNING: illegal value
WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
ERROR: illegal value
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
WARNING: illegal value
WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
ERROR: illegal value
BEGIN;
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
COMMIT;
WARNING: illegal value
WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
ERROR: illegal value
BEGIN;
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
COMMIT;
WARNING: illegal value
WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
ERROR: illegal value
-- and, rollback should be fine
BEGIN;
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
ROLLBACK;
BEGIN;
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
ROLLBACK;
\unset VERBOSITY
-- verify everything including delete is rolled back
SELECT * FROM researchers WHERE lab_id = 6;
@ -1195,28 +1224,28 @@ ORDER BY s.logicalrelid, sp.shardstate;
reference_failure_test | 1 | 2
(1 row)
-- any failure rolls back the transaction
BEGIN;
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
-- some placements are invalid before abort
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
ABORT;
-- none of the placements are invalid after abort
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------------------------------------------------------------------
1200016 | 3 | localhost | 57637
1200016 | 1 | localhost | 57637
1200016 | 1 | localhost | 57638
1200017 | 1 | localhost | 57637
1200017 | 1 | localhost | 57638
1200018 | 1 | localhost | 57637
1200018 | 1 | localhost | 57638
1200019 | 3 | localhost | 57637
1200019 | 1 | localhost | 57637
1200019 | 1 | localhost | 57638
(8 rows)
ABORT;
-- verify nothing is inserted
SELECT count(*) FROM numbers_hash_failure_test;
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
@ -1243,52 +1272,35 @@ ORDER BY shardid, nodeport;
1200019 | 1 | localhost | 57638
(8 rows)
-- all failures roll back the transaction
BEGIN;
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
-- check shard states before commit
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------------------------------------------------------------------
1200016 | 3 | localhost | 57637
1200016 | 1 | localhost | 57638
1200017 | 1 | localhost | 57637
1200017 | 1 | localhost | 57638
1200018 | 1 | localhost | 57637
1200018 | 1 | localhost | 57638
1200019 | 3 | localhost | 57637
1200019 | 1 | localhost | 57638
(8 rows)
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
COMMIT;
-- expect some placements to be market invalid after commit
-- expect none of the placements to be marked invalid after commit
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------------------------------------------------------------------
1200016 | 3 | localhost | 57637
1200016 | 1 | localhost | 57637
1200016 | 1 | localhost | 57638
1200017 | 1 | localhost | 57637
1200017 | 1 | localhost | 57638
1200018 | 1 | localhost | 57637
1200018 | 1 | localhost | 57638
1200019 | 3 | localhost | 57637
1200019 | 1 | localhost | 57637
1200019 | 1 | localhost | 57638
(8 rows)
-- verify data is inserted
-- verify no data is inserted
SELECT count(*) FROM numbers_hash_failure_test;
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
count
---------------------------------------------------------------------
2
0
(1 row)
-- break the other node as well


@ -343,9 +343,7 @@ INSERT INTO labs_mx VALUES (9, 'Umbrella Corporation');
COMMIT;
WARNING: illegal value
WARNING: failed to commit transaction on localhost:xxxxx
WARNING: could not commit transaction for shard xxxxx on any active node
WARNING: could not commit transaction for shard xxxxx on any active node
ERROR: could not commit transaction on any active node
ERROR: could not commit transaction for shard xxxxx on at least one active node
-- data should NOT be persisted
SELECT * FROM objects_mx WHERE id = 2;
id | name
@ -371,9 +369,7 @@ INSERT INTO labs_mx VALUES (9, 'BAD');
COMMIT;
WARNING: illegal value
WARNING: failed to commit transaction on localhost:xxxxx
WARNING: could not commit transaction for shard xxxxx on any active node
WARNING: could not commit transaction for shard xxxxx on any active node
ERROR: could not commit transaction on any active node
ERROR: could not commit transaction for shard xxxxx on at least one active node
-- data should NOT be persisted
SELECT * FROM objects_mx WHERE id = 1;
id | name
@ -396,9 +392,7 @@ INSERT INTO labs_mx VALUES (9, 'BAD');
COMMIT;
WARNING: illegal value
WARNING: failed to commit transaction on localhost:xxxxx
WARNING: could not commit transaction for shard xxxxx on any active node
WARNING: could not commit transaction for shard xxxxx on any active node
ERROR: could not commit transaction on any active node
ERROR: could not commit transaction for shard xxxxx on at least one active node
-- no data should persists
SELECT * FROM objects_mx WHERE id = 1;
id | name


@ -577,7 +577,7 @@ SET session_replication_role = DEFAULT;
ALTER USER test_user WITH nologin;
\c - test_user - :master_port
-- reissue copy
-- reissue copy, and it should fail
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
@ -589,7 +589,7 @@ COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
8,8
\.
-- verify shards in the first worker as marked invalid
-- verify shards in none of the workers are marked invalid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;


@ -784,29 +784,23 @@ SET session_replication_role = DEFAULT;
\c - :default_user - :worker_1_port
ALTER USER test_user WITH nologin;
\c - test_user - :master_port
-- reissue copy
-- reissue copy, and it should fail
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 1: "1,1"
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 2: "2,2"
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 3: "3,3"
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 6: "6,6"
-- verify shards in the first worker as marked invalid
-- verify shards in none of the workers are marked invalid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------------------------------------------------------------------
560169 | 3 | localhost | 57637
560169 | 1 | localhost | 57637
560169 | 1 | localhost | 57638
560170 | 3 | localhost | 57637
560170 | 1 | localhost | 57637
560170 | 1 | localhost | 57638
560171 | 3 | localhost | 57637
560171 | 1 | localhost | 57637
560171 | 1 | localhost | 57638
560172 | 3 | localhost | 57637
560172 | 1 | localhost | 57637
560172 | 1 | localhost | 57638
(8 rows)
@ -828,12 +822,8 @@ SELECT shardid, shardstate, nodename, nodeport
-- since it can not insert into either copies of a shard. shards are expected to
-- stay valid since the operation is rolled back.
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 2: "2,2"
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 3: "3,3"
-- verify shards for numbers_hash_other are still valid
-- since copy has failed altogether
SELECT shardid, shardstate, nodename, nodeport
@ -841,13 +831,13 @@ SELECT shardid, shardstate, nodename, nodeport
WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------------------------------------------------------------------
560174 | 3 | localhost | 57637
560174 | 1 | localhost | 57637
560174 | 1 | localhost | 57638
560175 | 3 | localhost | 57637
560175 | 1 | localhost | 57637
560175 | 1 | localhost | 57638
560176 | 1 | localhost | 57637
560176 | 1 | localhost | 57638
560177 | 3 | localhost | 57637
560177 | 1 | localhost | 57637
560177 | 1 | localhost | 57638
(8 rows)


@ -310,6 +310,29 @@ DELETE FROM researchers WHERE lab_id = 6;
30, 6, 'Dennis Ritchie'
\.
COMMIT;
-- single-row and multi-row INSERTs should also fail
-- with or without transaction blocks, at COMMIT time
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
BEGIN;
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
COMMIT;
BEGIN;
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
COMMIT;
-- and, rollback should be fine
BEGIN;
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup');
ROLLBACK;
BEGIN;
INSERT INTO researchers VALUES (31, 6, 'Bjarne Stroustrup'), (32, 7, 'Bjarne Stroustrup');
ROLLBACK;
\unset VERBOSITY
-- verify everything including delete is rolled back
@ -935,20 +958,20 @@ AND s.logicalrelid = 'reference_failure_test'::regclass
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- any failure rolls back the transaction
BEGIN;
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
\.
ABORT;
-- some placements are invalid before abort
-- none of the placements are invalid after abort
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
ABORT;
-- verify nothing is inserted
SELECT count(*) FROM numbers_hash_failure_test;
@ -958,27 +981,21 @@ FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
-- all failures roll back the transaction
BEGIN;
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
\.
-- check shard states before commit
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
COMMIT;
-- expect some placements to be market invalid after commit
-- expect none of the placements to be marked invalid after commit
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
-- verify data is inserted
-- verify no data is inserted
SELECT count(*) FROM numbers_hash_failure_test;
-- break the other node as well