citus/src/backend/distributed/master/citus_create_restore_point.c

206 lines
5.6 KiB
C

/*-------------------------------------------------------------------------
*
* citus_create_restore_point.c
*
* UDF for creating a consistent restore point across all nodes.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
#include "distributed/connection_management.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "nodes/pg_list.h"
#include "storage/lmgr.h"
#include "storage/lock.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
/*
 * Query sent to every node to create its restore point. The restore point
 * name is supplied as the single text parameter $1 (see
 * CreateRemoteRestorePoints) instead of being interpolated into the query.
 */
#define CREATE_RESTORE_POINT_COMMAND "SELECT pg_catalog.pg_create_restore_point($1::text)"

/* local functions forward declarations */
static List * OpenConnectionsToAllNodes(void);
static void BlockAllDistributedWrites(void);
static void CreateRemoteRestorePoints(char *restoreName, List *connectionList);

/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(citus_create_restore_point);
/*
 * citus_create_restore_point is the SQL-callable entry point that creates a
 * named restore point on the coordinator and on every active primary node.
 *
 * Writes to distributed tables are blocked while the restore points are
 * created. The result is therefore consistent under the assumption that the
 * coordinator is the only writer. Returns the LSN of the coordinator's own
 * (local) restore point.
 */
Datum
citus_create_restore_point(PG_FUNCTION_ARGS)
{
	text *nameText = PG_GETARG_TEXT_P(0);
	char *name = NULL;
	List *nodeConnections = NIL;
	XLogRecPtr coordinatorRestorePoint = InvalidXLogRecPtr;

	/* preconditions: extension version, privileges, and node role */
	CheckCitusVersion(ERROR);
	EnsureSuperUser();
	EnsureCoordinator();

	/* restore points cannot be written while this server is in recovery */
	if (RecoveryInProgress())
	{
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is in progress"),
				 errhint("WAL control functions cannot be executed during recovery.")));
	}

	/* a restore point is a WAL record, so wal_level must be high enough */
	if (!XLogIsNeeded())
	{
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("WAL level not sufficient for creating a restore point"),
				 errhint("wal_level must be set to \"replica\" or \"logical\" at server "
						 "start.")));
	}

	/* reject names that would not fit, using the same limit as PostgreSQL */
	name = text_to_cstring(nameText);
	if (strlen(name) >= MAXFNAMELEN)
	{
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("value too long for restore point (maximum %d characters)",
						MAXFNAMELEN - 1)));
	}

	/* connect to every node up front, while no locks are held yet */
	nodeConnections = OpenConnectionsToAllNodes();

	/*
	 * A BEGIN is sent to get through pgbouncer. The remote transactions are
	 * never committed, because committing takes time. The connections are
	 * simply closed, and the rollback does not undo pg_create_restore_point.
	 */
	RemoteTransactionListBegin(nodeConnections);

	/* DANGER: everything from here on must finish as quickly as possible */
	BlockAllDistributedWrites();

	/* the local restore point goes first so that a failure aborts early */
	coordinatorRestorePoint = XLogRestorePoint(name);

	/* then create the same restore point on every node */
	CreateRemoteRestorePoints(name, nodeConnections);

	PG_RETURN_LSN(coordinatorRestorePoint);
}
/*
 * OpenConnectionsToAllNodes starts a new (FORCE_NEW_CONNECTION) connection to
 * every node returned by ActivePrimaryNodeList and marks each connection's
 * remote transaction as critical. It then calls FinishConnectionListEstablishment
 * on the whole list and returns the list of connections.
 */
static List *
OpenConnectionsToAllNodes(void)
{
	List *nodeList = ActivePrimaryNodeList();
	List *connections = NIL;
	ListCell *nodeCell = NULL;

	foreach(nodeCell, nodeList)
	{
		WorkerNode *node = (WorkerNode *) lfirst(nodeCell);
		MultiConnection *nodeConnection =
			StartNodeConnection(FORCE_NEW_CONNECTION, node->workerName,
								node->workerPort);

		MarkRemoteTransactionCritical(nodeConnection);
		connections = lappend(connections, nodeConnection);
	}

	/* complete establishment of all the connections started above */
	FinishConnectionListEstablishment(connections);

	return connections;
}
/*
* BlockAllDistributedWrites blocks all modifications to distributed tables
* by taking ShareRowExclusive locks on all distributed tables.
*/
static void
BlockAllDistributedWrites(void)
{
ListCell *distributedTableCell = NULL;
List *distributedTableList = DistributedTableList();
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
LockRelationOid(DistPartitionRelationId(), ExclusiveLock);
foreach(distributedTableCell, distributedTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
/* block all modifications */
LockRelationOid(cacheEntry->relationId, ShareRowExclusiveLock);
}
}
/*
 * CreateRemoteRestorePoints runs pg_create_restore_point(restoreName) over
 * every connection in connectionList.
 *
 * The command is sent on all connections before any result is read, so the
 * nodes work on it in parallel. A failure to send, or a result that is not
 * OK, raises an ERROR. Each connection is closed once its result has been
 * consumed.
 */
static void
CreateRemoteRestorePoints(char *restoreName, List *connectionList)
{
	const int paramCount = 1;
	Oid paramTypes[] = { TEXTOID };
	const char *paramValues[] = { restoreName };
	ListCell *cell = NULL;

	/* phase 1: dispatch the command on every connection */
	foreach(cell, connectionList)
	{
		MultiConnection *connection = (MultiConnection *) lfirst(cell);

		if (SendRemoteCommandParams(connection, CREATE_RESTORE_POINT_COMMAND,
									paramCount, paramTypes, paramValues) == 0)
		{
			ReportConnectionError(connection, ERROR);
		}
	}

	/* phase 2: collect each result, then release the connection */
	foreach(cell, connectionList)
	{
		MultiConnection *connection = (MultiConnection *) lfirst(cell);
		PGresult *result = GetRemoteCommandResult(connection, true);

		if (!IsResponseOK(result))
		{
			ReportResultError(connection, result, ERROR);
		}

		PQclear(result);
		ForgetResults(connection);
		CloseConnection(connection);
	}
}