mirror of https://github.com/citusdata/citus.git
Fix dropping replication slot (#6359)
DESCRIPTION: Fixes dropping replication slots
As detected by a flaky test, Citus sometimes fails to drop replication
slots, possibly due to a race condition, at the end of a shard split.
With this PR, we retry to drop them in case of an `OBJECT_IN_USE` error,
consistently for 20 seconds.
fixes: #6326
(cherry picked from commit bae4b47c2f
)
revert-shard-rebalancer-changes
parent
a8e7c2cb09
commit
ecaa0cda6d
|
@ -66,6 +66,7 @@
|
|||
#include "utils/syscache.h"
|
||||
|
||||
#define STR_ERRCODE_UNDEFINED_OBJECT "42704"
|
||||
#define STR_ERRCODE_OBJECT_IN_USE "55006"
|
||||
|
||||
|
||||
#define REPLICATION_SLOT_CATALOG_TABLE_NAME "pg_replication_slots"
|
||||
|
@ -1281,18 +1282,64 @@ DropPublications(MultiConnection *sourceConnection, HTAB *publicationInfoHash)
|
|||
|
||||
/*
|
||||
* DropReplicationSlot drops the replication slot with the given name
|
||||
* if it exists.
|
||||
* if it exists. It retries if the command fails with an OBJECT_IN_USE error.
|
||||
*/
|
||||
static void
|
||||
DropReplicationSlot(MultiConnection *connection, char *replicationSlotName)
|
||||
{
|
||||
ExecuteCriticalRemoteCommand(
|
||||
connection,
|
||||
psprintf(
|
||||
"select pg_drop_replication_slot(slot_name) from "
|
||||
REPLICATION_SLOT_CATALOG_TABLE_NAME
|
||||
" where slot_name = %s",
|
||||
quote_literal_cstr(replicationSlotName)));
|
||||
int maxSecondsToTryDropping = 20;
|
||||
bool raiseInterrupts = true;
|
||||
PGresult *result = NULL;
|
||||
|
||||
/* we'll retry in case of an OBJECT_IN_USE error */
|
||||
while (maxSecondsToTryDropping >= 0)
|
||||
{
|
||||
int querySent = SendRemoteCommand(
|
||||
connection,
|
||||
psprintf(
|
||||
"select pg_drop_replication_slot(slot_name) from "
|
||||
REPLICATION_SLOT_CATALOG_TABLE_NAME
|
||||
" where slot_name = %s",
|
||||
quote_literal_cstr(replicationSlotName))
|
||||
);
|
||||
|
||||
if (querySent == 0)
|
||||
{
|
||||
ReportConnectionError(connection, ERROR);
|
||||
}
|
||||
|
||||
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||
|
||||
if (IsResponseOK(result))
|
||||
{
|
||||
/* no error, we are good to go */
|
||||
break;
|
||||
}
|
||||
|
||||
char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE);
|
||||
if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_OBJECT_IN_USE) == 0 &&
|
||||
maxSecondsToTryDropping > 0)
|
||||
{
|
||||
/* retry dropping the replication slot after sleeping for one sec */
|
||||
maxSecondsToTryDropping--;
|
||||
pg_usleep(1000);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Report error if:
|
||||
* - Error code is not 55006 (Object In Use)
|
||||
* - Or, we have made enough number of retries (currently 20), but didn't work
|
||||
*/
|
||||
ReportResultError(connection, result, ERROR);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
ForgetResults(connection);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
ForgetResults(connection);
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue