From 21b0500ad2e8851ff522d06bca6e04425b1fd8a8 Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Fri, 11 Nov 2022 15:58:10 +0100
Subject: [PATCH] Add a UDF to use a distributed snapshot

---
 .../connection/connection_management.c        |  14 +
 .../distributed/connection/remote_commands.c  |  29 +-
 .../operations/citus_create_restore_point.c   |   2 +-
 .../distributed/sql/citus--11.1-1--11.2-1.sql |   1 +
 .../sql/downgrades/citus--11.2-1--11.1-1.sql  |   1 +
 .../sql/udfs/citus_use_snapshot/11.2-1.sql    |   9 +
 .../sql/udfs/citus_use_snapshot/latest.sql    |   9 +
 .../transaction/remote_transaction.c          |  19 +-
 .../distributed/transaction/snapshot.c        | 302 ++++++++++++++++++
 .../transaction/transaction_management.c      |   2 +
 .../transaction/worker_transaction.c          |  62 +++-
 .../distributed/connection_management.h       |   1 +
 src/include/distributed/remote_commands.h     |   2 +
 .../distributed/transaction/snapshot.h        |  20 ++
 src/include/distributed/worker_transaction.h  |  13 +-
 src/test/regress/expected/multi_extension.out |   3 +-
 .../expected/upgrade_list_citus_objects.out   |   3 +-
 17 files changed, 462 insertions(+), 30 deletions(-)
 create mode 100644 src/backend/distributed/sql/udfs/citus_use_snapshot/11.2-1.sql
 create mode 100644 src/backend/distributed/sql/udfs/citus_use_snapshot/latest.sql
 create mode 100644 src/backend/distributed/transaction/snapshot.c
 create mode 100644 src/include/distributed/transaction/snapshot.h

diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c
index c5b300fd4..9b08c42aa 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -678,6 +678,20 @@ CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
 }
 
 
+/*
+ * CloseConnectionList closes all connections in the given list.
+ */
+void
+CloseConnectionList(List *connectionList)
+{
+	MultiConnection *connection = NULL;
+	foreach_ptr(connection, connectionList)
+	{
+		CloseConnection(connection);
+	}
+}
+
+
 /*
  * Close a previously established connection.
  */
diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c
index 906d78e42..12744ff31 100644
--- a/src/backend/distributed/connection/remote_commands.c
+++ b/src/backend/distributed/connection/remote_commands.c
@@ -424,6 +424,25 @@ ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command)
 }
 
 
+/*
+ * SendRemoteCommandToConnectionList sends the given command over all connections
+ * in the list in parallel.
+ */
+void
+SendRemoteCommandToConnectionList(List *connectionList, const char *command)
+{
+	MultiConnection *connection = NULL;
+	foreach_ptr(connection, connectionList)
+	{
+		int querySent = SendRemoteCommand(connection, command);
+		if (querySent == 0)
+		{
+			ReportConnectionError(connection, ERROR);
+		}
+	}
+}
+
+
 /*
  * ExecuteRemoteCommandInConnectionList executes a remote command, on all connections
  * given in the list, that is critical to the transaction. If the command fails then
@@ -434,15 +453,7 @@ ExecuteRemoteCommandInConnectionList(List *nodeConnectionList, const char *comma
 {
 	MultiConnection *connection = NULL;
 
-	foreach_ptr(connection, nodeConnectionList)
-	{
-		int querySent = SendRemoteCommand(connection, command);
-
-		if (querySent == 0)
-		{
-			ReportConnectionError(connection, ERROR);
-		}
-	}
+	SendRemoteCommandToConnectionList(nodeConnectionList, command);
 
 	/* Process the result */
 	foreach_ptr(connection, nodeConnectionList)
diff --git a/src/backend/distributed/operations/citus_create_restore_point.c b/src/backend/distributed/operations/citus_create_restore_point.c
index 42fc5311f..9ed2f4c02 100644
--- a/src/backend/distributed/operations/citus_create_restore_point.c
+++ b/src/backend/distributed/operations/citus_create_restore_point.c
@@ -111,7 +111,7 @@ citus_create_restore_point(PG_FUNCTION_ARGS)
  * OpenConnectionsToAllWorkerNodes opens connections to all worker nodes and
  * returns the list of connections.
  */
-static List *
+List *
 OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode)
 {
 	List *connectionList = NIL;
diff --git a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql
index d40336103..1c072eeab 100644
--- a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql
+++ b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql
@@ -9,3 +9,4 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
 #include "udfs/citus_get_transaction_clock/11.2-1.sql"
 #include "udfs/citus_is_clock_after/11.2-1.sql"
 #include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql"
+#include "udfs/citus_use_snapshot/11.2-1.sql"
diff --git a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql
index 7cdd51140..f8aa55517 100644
--- a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql
+++ b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql
@@ -1,6 +1,7 @@
 -- citus--11.2-1--11.1-1
 #include "../udfs/get_rebalance_progress/11.1-1.sql"
 #include "../udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
+DROP FUNCTION pg_catalog.citus_use_snapshot();
 DROP FUNCTION pg_catalog.citus_get_node_clock();
 DROP FUNCTION pg_catalog.citus_get_transaction_clock();
 DROP FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(cluster_clock);
diff --git a/src/backend/distributed/sql/udfs/citus_use_snapshot/11.2-1.sql b/src/backend/distributed/sql/udfs/citus_use_snapshot/11.2-1.sql
new file mode 100644
index 000000000..74a96b1f5
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_use_snapshot/11.2-1.sql
@@ -0,0 +1,9 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_use_snapshot()
+ RETURNS void
+ LANGUAGE c
+ STRICT
+AS '$libdir/citus', $function$citus_use_snapshot$function$;
+COMMENT ON FUNCTION pg_catalog.citus_use_snapshot()
+ IS 'use a consistent distributed snapshot for the remainder of the transaction';
+
+GRANT EXECUTE ON FUNCTION pg_catalog.citus_use_snapshot() TO PUBLIC;
diff --git a/src/backend/distributed/sql/udfs/citus_use_snapshot/latest.sql b/src/backend/distributed/sql/udfs/citus_use_snapshot/latest.sql
new file mode 100644
index 000000000..74a96b1f5
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_use_snapshot/latest.sql
@@ -0,0 +1,9 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_use_snapshot()
+ RETURNS void
+ LANGUAGE c
+ STRICT
+AS '$libdir/citus', $function$citus_use_snapshot$function$;
+COMMENT ON FUNCTION pg_catalog.citus_use_snapshot()
+ IS 'use a consistent distributed snapshot for the remainder of the transaction';
+
+GRANT EXECUTE ON FUNCTION pg_catalog.citus_use_snapshot() TO PUBLIC;
diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c
index aff357fb3..5521375e1 100644
--- a/src/backend/distributed/transaction/remote_transaction.c
+++ b/src/backend/distributed/transaction/remote_transaction.c
@@ -28,6 +28,7 @@
 #include "distributed/placement_connection.h"
 #include "distributed/remote_commands.h"
 #include "distributed/remote_transaction.h"
+#include "distributed/transaction/snapshot.h"
 #include "distributed/transaction_identifier.h"
 #include "distributed/transaction_management.h"
 #include "distributed/transaction_recovery.h"
@@ -85,14 +86,22 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
 	StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
 
-	/*
-	 * Explicitly specify READ COMMITTED, the default on the remote
-	 * side might have been changed, and that would cause problematic
-	 * behaviour.
-	 */
+	/* append the BEGIN command */
 	appendStringInfoString(beginAndSetDistributedTransactionId,
 						   BeginTransactionCommand());
 
+	/* when using a distributed snapshot, append SET TRANSACTION SNAPSHOT .. */
+	char *exportedSnapshotName = GetSnapshotNameForNode(connection->hostname,
+														connection->port,
+														connection->user,
+														connection->database);
+	if (exportedSnapshotName != NULL)
+	{
+		appendStringInfo(beginAndSetDistributedTransactionId,
+						 "SET TRANSACTION SNAPSHOT %s;",
+						 quote_literal_cstr(exportedSnapshotName));
+	}
+
 	/* append context for in-progress SAVEPOINTs for this transaction */
 	List *activeSubXacts = ActiveSubXactContexts();
 
 	transaction->lastSuccessfulSubXact = TopSubTransactionId;
diff --git a/src/backend/distributed/transaction/snapshot.c b/src/backend/distributed/transaction/snapshot.c
new file mode 100644
index 000000000..38baa2c60
--- /dev/null
+++ b/src/backend/distributed/transaction/snapshot.c
@@ -0,0 +1,302 @@
+/*-------------------------------------------------------------------------
+ *
+ * snapshot.c
+ *   Basic distributed snapshot isolation implementation
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "libpq-fe.h"
+#include "miscadmin.h"
+
+#include "distributed/citus_safe_lib.h"
+#include "distributed/connection_management.h"
+#include "distributed/listutils.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/remote_commands.h"
+#include "distributed/transaction/snapshot.h"
+#include "distributed/transaction_management.h"
+#include "distributed/worker_manager.h"
+#include "distributed/worker_transaction.h"
+#include "storage/lockdefs.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+
+
+/* locking in share mode conflicts with 2PC commit, but not with other snapshots */
+#define BLOCK_2PC_COMMAND "LOCK pg_catalog.pg_dist_transaction IN SHARE MODE"
+#define EXPORT_SNAPSHOT_COMMAND "SELECT pg_catalog.pg_export_snapshot()"
+
+/*
+ * ExportedSnapshotEntry represents an entry in ExportSnapshotsHash.
+ */
+typedef struct ExportedSnapshotEntry
+{
+	/*
+	 * We reuse the full connection key instead of node ID primarily for
+	 * ease-of-integration with connection and transaction management APIs.
+	 */
+	ConnectionHashKey key;
+
+	/* name of the exported snapshot (file) */
+	char snapshotName[MAXPGPATH];
+} ExportedSnapshotEntry;
+
+
+PG_FUNCTION_INFO_V1(citus_use_snapshot);
+
+static void UseDistributedSnapshot(void);
+static HTAB * CreateExportSnapshotsHash(void);
+
+
+/*
+ * ExportSnapshotsHash contains the connection key to snapshot name mapping
+ * for the current transaction.
+ */
+static HTAB *ExportSnapshotsHash = NULL;
+
+/*
+ * citus_use_snapshot creates a distributed snapshot and uses it for the
+ * remainder of the transaction.
+ */
+Datum
+citus_use_snapshot(PG_FUNCTION_ARGS)
+{
+	UseDistributedSnapshot();
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * UseDistributedSnapshot briefly blocks 2PC commits across the cluster, then
+ * exports snapshots on all nodes, and releases the locks. The exported
+ * snapshots are subsequently used in all worker node transactions for the
+ * remainder of the transaction.
+ *
+ * The net effect is that ongoing 2PCs are either visible in all snapshots
+ * or not yet visible in any.
+ */
+static void
+UseDistributedSnapshot(void)
+{
+	if (ExportSnapshotsHash != NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("citus_use_snapshot can only be called once per "
+							   "transaction")));
+	}
+
+	if (!IsolationUsesXactSnapshot())
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("citus_use_snapshot can only be used in a transaction "
+							   "with isolation level SERIALIZABLE or REPEATABLE READ")));
+	}
+
+	if (GetTopTransactionIdIfAny() != InvalidTransactionId || IsSubTransaction() ||
+		!dlist_is_empty(&InProgressTransactions))
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("citus_use_snapshot must be called before any query")));
+	}
+
+	HTAB *snapshotsHash = CreateExportSnapshotsHash();
+
+	/* we definitely want a distributed transaction when asking for a snapshot */
+	UseCoordinatedTransaction();
+
+	/*
+	 * Prepare a set of regular connections for exporting the snapshot. These can
+	 * be reused for other purposes by the current user.
+	 */
+	int connectionFlags = 0;
+	List *snapshotConnections = GetConnectionsToTargetNodeSet(connectionFlags,
+															  OTHER_PRIMARY_NODES,
+															  NoLock,
+															  CurrentUserName());
+
+	/*
+	 * Prepare a set of new connections to lock pg_dist_transaction as superuser.
+	 *
+	 * We open connections to other nodes using superuser to have lock privileges
+	 * on pg_dist_transaction. We close these connections as soon as the snapshots
+	 * have been exported, to release the locks and free up the resources.
+	 *
+	 * We do not take specific locks on pg_dist_node to avoid deadlock opportunities.
+	 * If a new node gets added concurrently, it seems unlikely that shards will arrive
+	 * there before we get our snapshot.
+	 */
+	List *blockingConnections = GetConnectionsToTargetNodeSet(FORCE_NEW_CONNECTION,
+															  OTHER_PRIMARY_NODES,
+															  NoLock,
+															  CitusExtensionOwnerName());
+
+
+	/* lock pg_dist_transaction locally to prevent concurrent 2PC commits */
+	LockRelationOid(DistTransactionRelationId(), ShareLock);
+
+	/* lock pg_dist_transaction on all other nodes to prevent concurrent 2PC commits */
+	RemoteTransactionListBegin(blockingConnections);
+	ExecuteRemoteCommandInConnectionList(blockingConnections, BLOCK_2PC_COMMAND);
+
+	/* export a snapshot on the current node */
+	char *localSnapshotName = ExportSnapshot(GetActiveSnapshot());
+
+	/* create the hash key for the local node */
+	WorkerNode *localNode = LookupNodeByNodeIdOrError(GetLocalNodeId());
+
+	ConnectionHashKey localNodeKey;
+	memset_struct_0(localNodeKey);
+
+	strlcpy(localNodeKey.hostname, localNode->workerName, MAX_NODE_LENGTH);
+	localNodeKey.port = localNode->workerPort;
+	strlcpy(localNodeKey.user, CurrentUserName(), NAMEDATALEN);
+	strlcpy(localNodeKey.database, CurrentDatabaseName(), NAMEDATALEN);
+	localNodeKey.replicationConnParam = false;
+
+	/* add the snapshot name of the current node to the hash */
+	bool isFound = false;
+	ExportedSnapshotEntry *localSnapshotEntry =
+		hash_search(snapshotsHash, &localNodeKey, HASH_ENTER, &isFound);
+	strlcpy(localSnapshotEntry->snapshotName, localSnapshotName, MAXPGPATH);
+
+	/*
+	 * Now export a snapshot on other nodes. The current isolation level
+	 * is at least REPEATABLE READ (enforced above), which will be propagated
+	 * to other nodes as part of the BEGIN.
+	 */
+	RemoteTransactionListBegin(snapshotConnections);
+	SendRemoteCommandToConnectionList(snapshotConnections, EXPORT_SNAPSHOT_COMMAND);
+
+	MultiConnection *connection = NULL;
+	foreach_ptr(connection, snapshotConnections)
+	{
+		/* create the hash key for the remote node */
+		ConnectionHashKey remoteNodeKey;
+		memset_struct_0(remoteNodeKey);
+
+		strlcpy(remoteNodeKey.hostname, connection->hostname, MAX_NODE_LENGTH);
+		remoteNodeKey.port = connection->port;
+		strlcpy(remoteNodeKey.user, connection->user, NAMEDATALEN);
+		strlcpy(remoteNodeKey.database, connection->database, NAMEDATALEN);
+		remoteNodeKey.replicationConnParam = false;
+
+		/*
+		 * Create the hash entry before GetRemoteCommandResult to avoid allocations
+		 * (OOM possibilities) before PQclear.
+		 */
+		ExportedSnapshotEntry *remoteSnapshotEntry =
+			hash_search(snapshotsHash, &remoteNodeKey, HASH_ENTER, &isFound);
+		Assert(!isFound);
+
+		bool raiseErrors = true;
+		PGresult *queryResult = GetRemoteCommandResult(connection, raiseErrors);
+		if (!IsResponseOK(queryResult))
+		{
+			ReportResultError(connection, queryResult, ERROR);
+		}
+
+		if (PQntuples(queryResult) != 1 || PQnfields(queryResult) != 1)
+		{
+			PQclear(queryResult);
+			ereport(ERROR, (errmsg("unexpected result for: %s",
+								   EXPORT_SNAPSHOT_COMMAND)));
+		}
+
+		char *exportedSnapshotName = PQgetvalue(queryResult, 0, 0);
+
+		/* copy into the pre-allocated hash entry (avoid allocations / OOM risk) */
+		strlcpy(remoteSnapshotEntry->snapshotName, exportedSnapshotName, MAXPGPATH);
+
+		PQclear(queryResult);
+		ClearResults(connection, raiseErrors);
+	}
+
+	/* unblock 2PCs by closing the connections holding locks */
+	CloseConnectionList(blockingConnections);
+
+	/* start using the snapshot */
+	ExportSnapshotsHash = snapshotsHash;
+}
+
+
+/*
+ * CreateExportSnapshotsHash creates the connection key to exported snapshot
+ * name mapping.
+ */
+static HTAB *
+CreateExportSnapshotsHash(void)
+{
+	HASHCTL info;
+	memset_struct_0(info);
+
+	info.keysize = sizeof(ConnectionHashKey);
+	info.entrysize = sizeof(ExportedSnapshotEntry);
+
+	/*
+	 * We may use the snapshot throughout the lifetime of the transaction,
+	 * hence we use TopTransactionContext.
+	 *
+	 * (CreateSimpleHash et al. use CurrentMemoryContext)
+	 */
+	info.hcxt = TopTransactionContext;
+
+	int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+
+	HTAB *exportedSnapshotsHash = hash_create("exported snapshot names", 32, &info,
+											  hashFlags);
+
+	return exportedSnapshotsHash;
+}
+
+
+/*
+ * GetSnapshotNameForNode returns the snapshot name for the given node or NULL
+ * if no snapshot was exported.
+ */
+char *
+GetSnapshotNameForNode(char *hostname, int port, char *userName, char *databaseName)
+{
+	if (ExportSnapshotsHash == NULL)
+	{
+		return NULL;
+	}
+
+	ConnectionHashKey nodeKey;
+	memset_struct_0(nodeKey);
+	strlcpy(nodeKey.hostname, hostname, MAX_NODE_LENGTH);
+	nodeKey.port = port;
+	strlcpy(nodeKey.user, userName, NAMEDATALEN);
+	strlcpy(nodeKey.database, databaseName, NAMEDATALEN);
+	nodeKey.replicationConnParam = false;
+
+	bool isFound = false;
+	ExportedSnapshotEntry *remoteSnapshotEntry =
+		hash_search(ExportSnapshotsHash, &nodeKey, HASH_FIND, &isFound);
+	if (!isFound)
+	{
+		return NULL;
+	}
+
+	/* safe to return since it is allocated until the end of the transaction */
+	return remoteSnapshotEntry->snapshotName;
+}
+
+
+/*
+ * ResetExportedSnapshots resets the hash of exported snapshots.
+ *
+ * We do not bother freeing memory, since it is allocated in
+ * TopTransactionContext and will get reset at the end of the
+ * transaction.
+ */
+void
+ResetExportedSnapshots(void)
+{
+	ExportSnapshotsHash = NULL;
+}
diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c
index 0f4c3c80a..eba299515 100644
--- a/src/backend/distributed/transaction/transaction_management.c
+++ b/src/backend/distributed/transaction/transaction_management.c
@@ -34,6 +34,7 @@
 #include "distributed/multi_logical_replication.h"
 #include "distributed/multi_explain.h"
 #include "distributed/repartition_join_execution.h"
+#include "distributed/transaction/snapshot.h"
 #include "distributed/transaction_management.h"
 #include "distributed/placement_connection.h"
 #include "distributed/relation_access_tracking.h"
@@ -618,6 +619,7 @@ ResetGlobalVariables()
 	BeginXactReadOnly = BeginXactReadOnly_NotSet;
 	BeginXactDeferrable = BeginXactDeferrable_NotSet;
 	ResetWorkerErrorIndication();
+	ResetExportedSnapshots();
 	memset(&AllowedDistributionColumnValue, 0,
 		   sizeof(AllowedDistributionColumn));
 }
diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c
index 486dd7280..496b9246f 100644
--- a/src/backend/distributed/transaction/worker_transaction.c
+++ b/src/backend/distributed/transaction/worker_transaction.c
@@ -142,27 +142,32 @@ SendCommandToWorkersWithMetadataViaSuperUser(const char *command)
 List *
 TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
 {
-	List *workerNodeList = NIL;
-	if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES)
-	{
-		workerNodeList = ActivePrimaryNodeList(lockMode);
-	}
-	else
-	{
-		workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
-	}
+	List *workerNodeList = ActivePrimaryNodeList(lockMode);
 
 	List *result = NIL;
 	WorkerNode *workerNode = NULL;
 
 	foreach_ptr(workerNode, workerNodeList)
 	{
-		if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES || targetWorkerSet ==
-			 METADATA_NODES) &&
+		if ((targetWorkerSet == NON_COORDINATOR_NODES ||
+			 targetWorkerSet == NON_COORDINATOR_METADATA_NODES) &&
+			workerNode->groupId == COORDINATOR_GROUP_ID)
+		{
+			continue;
+		}
+
+		if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES ||
+			 targetWorkerSet == METADATA_NODES) &&
 			!workerNode->hasMetadata)
 		{
 			continue;
 		}
 
+		if (targetWorkerSet == OTHER_PRIMARY_NODES &&
+			workerNode->groupId == GetLocalGroupId())
+		{
+			continue;
+		}
+
 		result = lappend(result, workerNode);
 	}
@@ -563,6 +568,41 @@ SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const char *node
 }
 
 
+/*
+ * GetConnectionsToTargetNodeSet opens a list of connections with the following properties:
+ * - connectionFlags - see StartNodeUserDatabaseConnection
+ * - targetNodeSet - the target node set (e.g. OTHER_PRIMARY_NODES)
+ * - nodesLockMode - the lock on pg_dist_node to acquire
+ * - userName - connect as this user
+ *
+ * Connections are opened to the current database.
+ */
+List *
+GetConnectionsToTargetNodeSet(uint32 connectionFlags, TargetWorkerSet targetNodeSet,
+							  LOCKMODE nodesLockMode, const char *userName)
+{
+	List *connectionList = NIL;
+	List *nodeList = TargetWorkerSetNodeList(targetNodeSet, nodesLockMode);
+
+	/* use current database */
+	const char *databaseName = NULL;
+
+	WorkerNode *node = NULL;
+	foreach_ptr(node, nodeList)
+	{
+		MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
+																	   node->workerName,
+																	   node->workerPort,
+																	   userName,
+																	   databaseName);
+		connectionList = lappend(connectionList, connection);
+	}
+
+	FinishConnectionListEstablishment(connectionList);
+
+	return connectionList;
+}
+
+
 /*
  * ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given
  * metadata nodes are out of sync. It is safer to avoid metadata changing
diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h
index 9a4e8a134..22e712549 100644
--- a/src/include/distributed/connection_management.h
+++ b/src/include/distributed/connection_management.h
@@ -301,6 +301,7 @@ extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
 extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
 												   const char *userName,
 												   const char *database);
+extern void CloseConnectionList(List *connectionList);
 extern void CloseConnection(MultiConnection *connection);
 extern void ShutdownAllConnections(void);
 extern void ShutdownConnection(MultiConnection *connection);
diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h
index f903ebe66..acfe929c1 100644
--- a/src/include/distributed/remote_commands.h
+++ b/src/include/distributed/remote_commands.h
@@ -51,6 +51,8 @@ extern void ExecuteRemoteCommandInConnectionList(List *nodeConnectionList,
 extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,
 										const char *command,
 										PGresult **result);
+extern void SendRemoteCommandToConnectionList(List *connectionList,
+											  const char *command);
 extern int SendRemoteCommand(MultiConnection *connection, const char *command);
 extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
 								   int parameterCount, const Oid *parameterTypes,
diff --git a/src/include/distributed/transaction/snapshot.h b/src/include/distributed/transaction/snapshot.h
new file mode 100644
index 000000000..2912ce87d
--- /dev/null
+++ b/src/include/distributed/transaction/snapshot.h
@@ -0,0 +1,20 @@
+/*-------------------------------------------------------------------------
+ *
+ * snapshot.h
+ *
+ * Functions for managing distributed snapshots.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef DISTRIBUTED_SNAPSHOT_H
+#define DISTRIBUTED_SNAPSHOT_H
+
+
+extern char * GetSnapshotNameForNode(char *hostname, int port, char *userName,
+									 char *databaseName);
+extern void ResetExportedSnapshots(void);
+
+
+#endif /* DISTRIBUTED_SNAPSHOT_H */
diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h
index aa137b76b..d29d57f30 100644
--- a/src/include/distributed/worker_transaction.h
+++ b/src/include/distributed/worker_transaction.h
@@ -43,7 +43,12 @@ typedef enum TargetWorkerSet
 	 * All the active primary nodes in the metadata which have metadata
 	 * (includes the coordinator if it is added)
 	 */
-	METADATA_NODES
+	METADATA_NODES,
+
+	/*
+	 * All primary nodes in the cluster other than the current node
+	 */
+	OTHER_PRIMARY_NODES
 } TargetWorkerSet;
 
 
@@ -87,7 +92,11 @@ extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
 															const char * nodeUser,
 															List *commandList);
-extern void RemoveWorkerTransaction(const char *nodeName, int32 nodePort);
+
+extern List * GetConnectionsToTargetNodeSet(uint32 connectionFlags,
+											TargetWorkerSet targetNodeSet,
+											LOCKMODE nodesLockMode,
+											const char *userName);
 
 /* helper functions for worker transactions */
 extern bool IsWorkerTransactionActive(void);
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index 5d9b3ea5f..fc1d2c26e 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -1207,6 +1207,7 @@ SELECT * FROM multi_extension.print_extension_changes();
  | function citus_get_transaction_clock() cluster_clock
  | function citus_internal_adjust_local_clock_to_remote(cluster_clock) void
  | function citus_is_clock_after(cluster_clock,cluster_clock) boolean
+ | function citus_use_snapshot() void
  | function cluster_clock_cmp(cluster_clock,cluster_clock) integer
  | function cluster_clock_eq(cluster_clock,cluster_clock) boolean
  | function cluster_clock_ge(cluster_clock,cluster_clock) boolean
@@ -1230,7 +1231,7 @@ SELECT * FROM multi_extension.print_extension_changes();
  | operator family cluster_clock_ops for access method btree
  | sequence pg_dist_clock_logical_seq
  | type cluster_clock
-(29 rows)
+(30 rows)
 
 DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
 -- show running version
diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out
index 14578a976..91ef94bf7 100644
--- a/src/test/regress/expected/upgrade_list_citus_objects.out
+++ b/src/test/regress/expected/upgrade_list_citus_objects.out
@@ -124,6 +124,7 @@ ORDER BY 1;
 function citus_update_node(integer,text,integer,boolean,integer)
 function citus_update_shard_statistics(bigint)
 function citus_update_table_statistics(regclass)
+ function citus_use_snapshot()
 function citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
 function citus_version()
 function cluster_clock_cmp(cluster_clock,cluster_clock)
@@ -311,5 +312,5 @@ ORDER BY 1;
 view citus_stat_statements
 view pg_dist_shard_placement
 view time_partitions
-(303 rows)
+(304 rows)
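
Usage sketch, following the checks in UseDistributedSnapshot: the UDF has to run
as the first statement of a REPEATABLE READ or SERIALIZABLE transaction. The
distributed table name (events) is illustrative only:

    BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
    -- export and adopt a cluster-wide consistent snapshot
    SELECT citus_use_snapshot();
    -- reads now see each 2PC either committed on all nodes or on none
    SELECT count(*) FROM events;
    COMMIT;

For every worker connection opened afterwards, StartRemoteTransactionBegin
appends SET TRANSACTION SNAPSHOT '<exported snapshot name>' to the BEGIN, so
each remote transaction adopts the snapshot that was exported for that node.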