Add a UDF to use a distributed snapshot

marcocitus/distributed-snapshot-isolation-v0
Marco Slot 2022-11-11 15:58:10 +01:00
parent 7358b826ef
commit 21b0500ad2
17 changed files with 462 additions and 30 deletions

View File

@@ -678,6 +678,20 @@ CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
}
/*
* CloseConnectionList closes all connections in the given list.
*/
void
CloseConnectionList(List *connectionList)
{
MultiConnection *connection = NULL;
foreach_ptr(connection, connectionList)
{
CloseConnection(connection);
}
}
/*
* Close a previously established connection.
*/

View File

@@ -424,6 +424,25 @@ ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command)
}
/*
* SendRemoteCommandToConnectionList sends a given command over all connections
* in the list in parallel.
*/
void
SendRemoteCommandToConnectionList(List *connectionList, const char *command)
{
MultiConnection *connection = NULL;
foreach_ptr(connection, connectionList)
{
int querySent = SendRemoteCommand(connection, command);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
}
/*
* ExecuteRemoteCommandInConnectionList executes a remote command, on all connections
* given in the list, that is critical to the transaction. If the command fails then
@@ -434,15 +453,7 @@ ExecuteRemoteCommandInConnectionList(List *nodeConnectionList, const char *comma
{
MultiConnection *connection = NULL;
-foreach_ptr(connection, nodeConnectionList)
-{
-int querySent = SendRemoteCommand(connection, command);
-if (querySent == 0)
-{
-ReportConnectionError(connection, ERROR);
-}
-}
+SendRemoteCommandToConnectionList(nodeConnectionList, command);
/* Process the result */
foreach_ptr(connection, nodeConnectionList)

View File

@@ -111,7 +111,7 @@ citus_create_restore_point(PG_FUNCTION_ARGS)
* OpenConnectionsToAllNodes opens connections to all nodes and returns the list
* of connections.
*/
-static List *
+List *
OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode)
{
List *connectionList = NIL;

View File

@@ -9,3 +9,4 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
#include "udfs/citus_get_transaction_clock/11.2-1.sql"
#include "udfs/citus_is_clock_after/11.2-1.sql"
#include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql"
#include "udfs/citus_use_snapshot/11.1-1.sql"

View File

@@ -1,6 +1,7 @@
-- citus--11.2-1--11.1-1
#include "../udfs/get_rebalance_progress/11.1-1.sql"
#include "../udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
DROP FUNCTION pg_catalog.citus_use_snapshot();
DROP FUNCTION pg_catalog.citus_get_node_clock();
DROP FUNCTION pg_catalog.citus_get_transaction_clock();
DROP FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(cluster_clock);

View File

@@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_use_snapshot()
RETURNS void
LANGUAGE c
STRICT
AS '$libdir/citus', $function$citus_use_snapshot$function$;
COMMENT ON FUNCTION pg_catalog.citus_use_snapshot()
IS 'use a consistent distributed snapshot for the remainder of the transaction';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_use_snapshot() TO PUBLIC;

View File

@@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_use_snapshot()
RETURNS void
LANGUAGE c
STRICT
AS '$libdir/citus', $function$citus_use_snapshot$function$;
COMMENT ON FUNCTION pg_catalog.citus_use_snapshot()
IS 'use a consistent distributed snapshot for the remainder of the transaction';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_use_snapshot() TO PUBLIC;

View File

@@ -28,6 +28,7 @@
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/transaction/snapshot.h"
#include "distributed/transaction_identifier.h"
#include "distributed/transaction_management.h"
#include "distributed/transaction_recovery.h"
@@ -85,14 +86,22 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
-/*
- * Explicitly specify READ COMMITTED, the default on the remote
- * side might have been changed, and that would cause problematic
- * behaviour.
- */
+/* append the BEGIN command */
appendStringInfoString(beginAndSetDistributedTransactionId,
BeginTransactionCommand());
+/* when using a distributed snapshot, append SET TRANSACTION SNAPSHOT .. */
+char *exportedSnapshotName = GetSnapshotNameForNode(connection->hostname,
+connection->port,
+connection->user,
+connection->database);
+if (exportedSnapshotName != NULL)
+{
+appendStringInfo(beginAndSetDistributedTransactionId,
+"SET TRANSACTION SNAPSHOT %s;",
+quote_literal_cstr(exportedSnapshotName));
+}
/* append context for in-progress SAVEPOINTs for this transaction */
List *activeSubXacts = ActiveSubXactContexts();
transaction->lastSuccessfulSubXact = TopSubTransactionId;

View File

@@ -0,0 +1,302 @@
/*-------------------------------------------------------------------------
*
* snapshot.c
* Basic distributed snapshot isolation implementation
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/transaction/snapshot.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "storage/lockdefs.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
/* locking in share mode conflicts with 2PC commit, but not with other snapshots */
#define BLOCK_2PC_COMMAND "LOCK pg_catalog.pg_dist_transaction IN SHARE MODE"
#define EXPORT_SNAPSHOT_COMMAND "SELECT pg_catalog.pg_export_snapshot()"
/*
* ExportedSnapshotEntry represents an entry in the ExportSnapshotsHash hash.
*/
typedef struct ExportedSnapshotEntry
{
/*
* We reuse the full connection key instead of node ID primarily for
* ease-of-integration with connection and transaction management APIs.
*/
ConnectionHashKey key;
/* name of the exported snapshot (file) */
char snapshotName[MAXPGPATH];
} ExportedSnapshotEntry;
PG_FUNCTION_INFO_V1(citus_use_snapshot);
static void UseDistributedSnapshot(void);
static HTAB * CreateExportSnapshotsHash(void);
/*
* ExportSnapshotsHash contains the connection key to snapshot name mapping
* for the current transaction.
*/
static HTAB *ExportSnapshotsHash = NULL;
/*
* citus_use_snapshot creates a distributed snapshot and uses it for the
* remainder of the transaction.
*/
Datum
citus_use_snapshot(PG_FUNCTION_ARGS)
{
UseDistributedSnapshot();
PG_RETURN_VOID();
}
/*
* UseDistributedSnapshot briefly blocks 2PC commits across the cluster, then exports
* snapshots on all nodes, and releases the locks. The exported snapshots are
* subsequently used in all worker node transactions for the remainder of the
* transaction.
*
* The net effect is that ongoing 2PCs are either visible in all snapshots
* or not yet visible.
*/
static void
UseDistributedSnapshot(void)
{
if (ExportSnapshotsHash != NULL)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("citus_use_snapshot can only be called once per "
"transaction")));
}
if (!IsolationUsesXactSnapshot())
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("citus_use_snapshot can only be used in a transaction "
"with isolation level SERIALIZABLE or REPEATABLE READ")));
}
if (GetTopTransactionIdIfAny() != InvalidTransactionId || IsSubTransaction() ||
!dlist_is_empty(&InProgressTransactions))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("citus_use_snapshot must be called before any query")));
}
HTAB *snapshotsHash = CreateExportSnapshotsHash();
/* we definitely want a distributed transaction when asking for a snapshot */
UseCoordinatedTransaction();
/*
* Prepare a set of regular connections for exporting the snapshot. These can
* be reused for other purposes by the current user.
*/
int connectionFlags = 0;
List *snapshotConnections = GetConnectionsToTargetNodeSet(connectionFlags,
OTHER_PRIMARY_NODES,
NoLock,
CurrentUserName());
/*
* Prepare a set of new connections to lock pg_dist_transaction as superuser.
*
* We open connections to other nodes using superuser to have lock privileges
* on pg_dist_transaction. We immediately close these connections to release
* the lock and free up the resources.
*
* We do not take specific locks on pg_dist_node to avoid deadlock opportunities.
* If a new node gets added concurrently, it seems unlikely that shards will arrive
* there before we get our snapshot.
*/
List *blockingConnections = GetConnectionsToTargetNodeSet(FORCE_NEW_CONNECTION,
OTHER_PRIMARY_NODES,
NoLock,
CitusExtensionOwnerName());
/* lock pg_dist_transaction locally to prevent concurrent 2PC commits */
LockRelationOid(DistTransactionRelationId(), ShareLock);
/* lock pg_dist_transaction on all other nodes to prevent concurrent 2PC commits */
RemoteTransactionListBegin(blockingConnections);
ExecuteRemoteCommandInConnectionList(blockingConnections, BLOCK_2PC_COMMAND);
/* export a snapshot on the current node */
char *localSnapshotName = ExportSnapshot(GetActiveSnapshot());
/* create the hash key for the local node */
WorkerNode *localNode = LookupNodeByNodeIdOrError(GetLocalNodeId());
ConnectionHashKey localNodeKey;
memset_struct_0(localNodeKey);
strlcpy(localNodeKey.hostname, localNode->workerName, MAX_NODE_LENGTH);
localNodeKey.port = localNode->workerPort;
strlcpy(localNodeKey.user, CurrentUserName(), NAMEDATALEN);
strlcpy(localNodeKey.database, CurrentDatabaseName(), NAMEDATALEN);
localNodeKey.replicationConnParam = false;
/* add the snapshot name of the current node to the hash */
bool isFound = false;
ExportedSnapshotEntry *localSnapshotEntry =
hash_search(snapshotsHash, &localNodeKey, HASH_ENTER, &isFound);
strlcpy(localSnapshotEntry->snapshotName, localSnapshotName, MAXPGPATH);
/*
* Now export a snapshot on other nodes. The current isolation level
* is at least REPEATABLE READ (enforced above), which will be propagated
* to other nodes as part of the BEGIN.
*/
RemoteTransactionListBegin(snapshotConnections);
SendRemoteCommandToConnectionList(snapshotConnections, EXPORT_SNAPSHOT_COMMAND);
MultiConnection *connection = NULL;
foreach_ptr(connection, snapshotConnections)
{
/* create the hash key for the remote node */
ConnectionHashKey remoteNodeKey;
memset_struct_0(remoteNodeKey);
strlcpy(remoteNodeKey.hostname, connection->hostname, MAX_NODE_LENGTH);
remoteNodeKey.port = connection->port;
strlcpy(remoteNodeKey.user, connection->user, NAMEDATALEN);
strlcpy(remoteNodeKey.database, connection->database, NAMEDATALEN);
remoteNodeKey.replicationConnParam = false;
/*
* Create the hash entry before GetRemoteCommandResult so that we do not
* allocate memory (and risk an OOM error) between receiving the result and PQclear.
*/
ExportedSnapshotEntry *remoteSnapshotEntry =
hash_search(snapshotsHash, &remoteNodeKey, HASH_ENTER, &isFound);
Assert(!isFound);
bool raiseErrors = true;
PGresult *queryResult = GetRemoteCommandResult(connection, raiseErrors);
if (!IsResponseOK(queryResult))
{
ReportResultError(connection, queryResult, ERROR);
}
if (PQntuples(queryResult) != 1 || PQnfields(queryResult) != 1)
{
PQclear(queryResult);
ereport(ERROR, (errmsg("unexpected result for: %s",
EXPORT_SNAPSHOT_COMMAND)));
}
char *exportedSnapshotName = PQgetvalue(queryResult, 0, 0);
/* copy out of the PGresult before PQclear (no allocations / OOM risk) */
strlcpy(remoteSnapshotEntry->snapshotName, exportedSnapshotName, MAXPGPATH);
PQclear(queryResult);
ClearResults(connection, raiseErrors);
}
/* unblock 2PCs by closing the connections holding locks */
CloseConnectionList(blockingConnections);
/* start using the snapshot */
ExportSnapshotsHash = snapshotsHash;
}
/*
* CreateExportSnapshotsHash creates the connection key to exported snapshot name
* mapping.
*/
static HTAB *
CreateExportSnapshotsHash(void)
{
HASHCTL info;
memset_struct_0(info);
info.keysize = sizeof(ConnectionHashKey);
info.entrysize = sizeof(ExportedSnapshotEntry);
/*
* We may use the snapshot throughout the lifetime of the transaction,
* hence we use TopTransactionContext.
*
* (CreateSimpleHash et al. use CurrentMemoryContext)
*/
info.hcxt = TopTransactionContext;
int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
HTAB *exportedSnapshotsHash = hash_create("exported snapshot names", 32, &info,
hashFlags);
return exportedSnapshotsHash;
}
/*
* GetSnapshotNameForNode returns the snapshot name for the given node or NULL
* if no snapshot was exported.
*/
char *
GetSnapshotNameForNode(char *hostname, int port, char *userName, char *databaseName)
{
if (ExportSnapshotsHash == NULL)
{
return NULL;
}
ConnectionHashKey nodeKey;
memset_struct_0(nodeKey);
strlcpy(nodeKey.hostname, hostname, MAX_NODE_LENGTH);
nodeKey.port = port;
strlcpy(nodeKey.user, userName, NAMEDATALEN);
strlcpy(nodeKey.database, databaseName, NAMEDATALEN);
nodeKey.replicationConnParam = false;
bool isFound = false;
ExportedSnapshotEntry *remoteSnapshotEntry =
hash_search(ExportSnapshotsHash, &nodeKey, HASH_FIND, &isFound);
if (!isFound)
{
return NULL;
}
/* safe to return since it remains allocated until the end of the transaction */
return remoteSnapshotEntry->snapshotName;
}
/*
* ResetExportedSnapshots resets the hash of exported snapshots.
*
* We do not bother freeing memory, since it is allocated in
* TopTransactionContext and will get reset at the end of the
* transaction.
*/
void
ResetExportedSnapshots(void)
{
ExportSnapshotsHash = NULL;
}

View File

@@ -34,6 +34,7 @@
#include "distributed/multi_logical_replication.h"
#include "distributed/multi_explain.h"
#include "distributed/repartition_join_execution.h"
#include "distributed/transaction/snapshot.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
@@ -618,6 +619,7 @@ ResetGlobalVariables()
BeginXactReadOnly = BeginXactReadOnly_NotSet;
BeginXactDeferrable = BeginXactDeferrable_NotSet;
ResetWorkerErrorIndication();
ResetExportedSnapshots();
memset(&AllowedDistributionColumnValue, 0,
sizeof(AllowedDistributionColumn));
}

View File

@@ -142,27 +142,32 @@ SendCommandToWorkersWithMetadataViaSuperUser(const char *command)
List *
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{
-List *workerNodeList = NIL;
-if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES)
-{
-workerNodeList = ActivePrimaryNodeList(lockMode);
-}
-else
-{
-workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
-}
+List *workerNodeList = ActivePrimaryNodeList(lockMode);
List *result = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
-if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES || targetWorkerSet ==
-METADATA_NODES) &&
+if ((targetWorkerSet == NON_COORDINATOR_NODES ||
+targetWorkerSet == NON_COORDINATOR_METADATA_NODES) &&
+workerNode->groupId == COORDINATOR_GROUP_ID)
+{
+continue;
+}
+if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES ||
+targetWorkerSet == METADATA_NODES) &&
!workerNode->hasMetadata)
{
continue;
}
+if (targetWorkerSet == OTHER_PRIMARY_NODES &&
+workerNode->groupId == GetLocalGroupId())
+{
+continue;
+}
result = lappend(result, workerNode);
}
@@ -563,6 +568,41 @@ SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const char *node
}
/*
* GetConnectionsToTargetNodeSet opens a list of connections with the following properties:
* - connectionFlags - see StartNodeUserDatabaseConnection
* - targetNodeSet - the target node set (e.g. OTHER_PRIMARY_NODES)
* - nodesLockMode - the lock on pg_dist_node to acquire
* - userName - connect as this user
*
* Connections are opened to the current database.
*/
List *
GetConnectionsToTargetNodeSet(uint32 connectionFlags, TargetWorkerSet targetNodeSet,
LOCKMODE nodesLockMode, const char *userName)
{
List *connectionList = NIL;
List *nodeList = TargetWorkerSetNodeList(targetNodeSet, nodesLockMode);
/* use current database */
const char *databaseName = NULL;
WorkerNode *node = NULL;
foreach_ptr(node, nodeList)
{
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
node->workerName,
node->workerPort,
userName,
databaseName);
connectionList = lappend(connectionList, connection);
}
FinishConnectionListEstablishment(connectionList);
return connectionList;
}
/*
* ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given
* metadata nodes are out of sync. It is safer to avoid metadata changing

View File

@@ -301,6 +301,7 @@ extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
const char *userName,
const char *database);
extern void CloseConnectionList(List *connectionList);
extern void CloseConnection(MultiConnection *connection);
extern void ShutdownAllConnections(void);
extern void ShutdownConnection(MultiConnection *connection);

View File

@@ -51,6 +51,8 @@ extern void ExecuteRemoteCommandInConnectionList(List *nodeConnectionList,
extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,
const char *command,
PGresult **result);
extern void SendRemoteCommandToConnectionList(List *connectionList,
const char *command);
extern int SendRemoteCommand(MultiConnection *connection, const char *command);
extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
int parameterCount, const Oid *parameterTypes,

View File

@@ -0,0 +1,20 @@
/*-------------------------------------------------------------------------
*
* snapshot.h
*
* Functions for managing distributed snapshots.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef DISTRIBUTED_SNAPSHOT_H
#define DISTRIBUTED_SNAPSHOT_H
extern char * GetSnapshotNameForNode(char *hostname, int port, char *userName,
char *databaseName);
extern void ResetExportedSnapshots(void);
#endif /* DISTRIBUTED_SNAPSHOT_H */

View File

@@ -43,7 +43,12 @@ typedef enum TargetWorkerSet
* All the active primary nodes in the metadata which have metadata
* (includes the coordinator if it is added)
*/
-METADATA_NODES
+METADATA_NODES,
+/*
+ * All other primary nodes in the cluster
+ */
+OTHER_PRIMARY_NODES
} TargetWorkerSet;
@@ -87,7 +92,11 @@ extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
const char *
nodeUser,
List *commandList);
extern void RemoveWorkerTransaction(const char *nodeName, int32 nodePort);
extern List * GetConnectionsToTargetNodeSet(uint32 connectionFlags,
TargetWorkerSet targetNodeSet,
LOCKMODE nodesLockMode,
const char *userName);
/* helper functions for worker transactions */
extern bool IsWorkerTransactionActive(void);

View File

@@ -1207,6 +1207,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_get_transaction_clock() cluster_clock
| function citus_internal_adjust_local_clock_to_remote(cluster_clock) void
| function citus_is_clock_after(cluster_clock,cluster_clock) boolean
| function citus_use_snapshot() void
| function cluster_clock_cmp(cluster_clock,cluster_clock) integer
| function cluster_clock_eq(cluster_clock,cluster_clock) boolean
| function cluster_clock_ge(cluster_clock,cluster_clock) boolean
@@ -1230,7 +1231,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| operator family cluster_clock_ops for access method btree
| sequence pg_dist_clock_logical_seq
| type cluster_clock
-(29 rows)
+(30 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@@ -124,6 +124,7 @@ ORDER BY 1;
function citus_update_node(integer,text,integer,boolean,integer)
function citus_update_shard_statistics(bigint)
function citus_update_table_statistics(regclass)
function citus_use_snapshot()
function citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
function citus_version()
function cluster_clock_cmp(cluster_clock,cluster_clock)
@@ -311,5 +312,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
-(303 rows)
+(304 rows)