citus/src/include/distributed/multi_logical_replication.h

187 lines
6.6 KiB
C

/*-------------------------------------------------------------------------
*
* multi_logical_replication.h
*
* Declarations for public functions and variables used in logical replication
* on the distributed tables while moving shards.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef MULTI_LOGICAL_REPLICATION_H_
#define MULTI_LOGICAL_REPLICATION_H_
#include "c.h"
#include "nodes/pg_list.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
/*
* Config variables managed via guc.c. They are only declared here (extern);
* their definitions are not part of this header.
*/
/* NOTE(review): presumably a timeout for logical replication; the unit is not visible from this header — confirm at the GUC definition. */
extern int LogicalReplicationTimeout;
/* NOTE(review): the name suggests a per-transaction flag recording that a placement was moved using logical replication — confirm where it is set. */
extern bool PlacementMovedUsingLogicalReplicationInTX;
/*
* NodeAndOwner should be used as a key for structs that should be hashed by a
* combination of node and owner.
*/
typedef struct NodeAndOwner
{
/* Node half of the key. Note this uses uint32_t while other structs in this header use uint32 for node ids. */
uint32_t nodeId;
/* Table owner half of the key. */
Oid tableOwnerId;
} NodeAndOwner;
/*
* NOTE(review): assert_valid_hash_key2 is not defined in this header;
* presumably it comes from distributed/hash_helpers.h and verifies at compile
* time that a struct made of exactly these two fields can be used as a hash
* key. Confirm there before adding, removing or reordering fields.
*/
assert_valid_hash_key2(NodeAndOwner, nodeId, tableOwnerId);
/*
* ReplicationSlotInfo stores the info that defines a replication slot. For
* shard splits this information is built by parsing the result of the
* 'worker_split_shard_replication_setup' UDF.
*/
typedef struct ReplicationSlotInfo
{
/* NOTE(review): presumably the id of the node that is the target of the replication through this slot — confirm against the code that fills it in. */
uint32 targetNodeId;
/* Oid of the table owner this slot information belongs to. */
Oid tableOwnerId;
/* Name of the replication slot. */
char *name;
} ReplicationSlotInfo;
/*
* PublicationInfo stores the information that defines a publication.
*/
typedef struct PublicationInfo
{
/*
* Node and owner combination identifying this publication. It is the first
* member and NodeAndOwner is meant to be used as a hash key, which matches
* the HTAB *publicationInfoHash parameters taken by functions declared
* in this header.
*/
NodeAndOwner key;
/* Name of the publication. */
char *name;
/* List of shard intervals belonging to this publication. */
List *shardIntervals;
/*
* The logical replication target linked to this publication. The struct
* tag is used because the LogicalRepTarget typedef only appears further
* down in this header.
*/
struct LogicalRepTarget *target;
} PublicationInfo;
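/*
* LogicalRepTarget stores the information necessary to create all the objects
* needed for one logical replication target: the subscription and its owner,
* the replication slot, the publication and the new shards.
*/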
typedef struct LogicalRepTarget
{
/*
* The Oid of the user that owns the shards in newShards. This Oid is the
* Oid of the user on the coordinator, this Oid is likely different than
* the Oid of the user on the logical replication source or target.
*/
Oid tableOwnerId;
/* Name of the subscription that belongs to this target. */
char *subscriptionName;
/*
* The name of the user that's used as the owner of the subscription. This
* is not the same as the name of the user that matches tableOwnerId.
* Instead we create a temporary user with the same permissions as that
* user, with its only purpose being owning the subscription.
*/
char *subscriptionOwnerName;
/* Replication slot information for this target, see ReplicationSlotInfo. */
ReplicationSlotInfo *replicationSlot;
/*
* Publication linked to this target. PublicationInfo has a matching
* "target" pointer back to a LogicalRepTarget.
*/
PublicationInfo *publication;
/*
* The shardIntervals that we want to create on this logical replication
* target. This can be different from the shard intervals that are part of
* the publication for two reasons:
* 1. The publication does not contain partitioned tables, only their
* children. The partition parent tables ARE part of newShards.
* 2. For shard splits the publication also contains dummy shards, these
* ARE NOT part of newShards.
*/
List *newShards;
/*
* The superuserConnection is shared between all LogicalRepTargets that have
* the same node. This can be initialized easily by using
* CreateGroupedLogicalRepTargetsConnections.
*/
MultiConnection *superuserConnection;
} LogicalRepTarget;
/*
* GroupedLogicalRepTargets groups LogicalRepTargets by node. This allows us to
* create a hashmap that can be searched by nodeId, which is useful because
* these targets can all use the same superuserConnection for management.
* Sharing that connection allows us to batch certain operations, such as
* getting the state of the subscriptions.
*/
typedef struct GroupedLogicalRepTargets
{
/* Id of the node that all targets in logicalRepTargetList have in common. */
uint32 nodeId;
/* NOTE(review): presumably a List of LogicalRepTarget pointers for this node — element type is not visible here, confirm. */
List *logicalRepTargetList;
/* Connection used for management of the targets grouped under this node. */
MultiConnection *superuserConnection;
} GroupedLogicalRepTargets;
/*
* LogicalRepType is used for various functions to do something different for
* shard moves than for shard splits. Such as using a different prefix for a
* subscription name.
*/
typedef enum LogicalRepType
{
/* The logical replication is done for a shard move. */
SHARD_MOVE,
/* The logical replication is done for a shard split. */
SHARD_SPLIT,
} LogicalRepType;
/*
* Prototypes only; the implementations are not part of this header, so the
* notes below describe signatures and hedge anything about behavior.
*/
/*
* Takes a list of shards plus the name and port of a source node and of a
* target node.
* NOTE(review): presumably replicates the given shards from source to target
* using logical replication — confirm in the implementation.
*/
extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
int sourceNodePort, char *targetNodeName,
int targetNodePort);
/* NOTE(review): the name suggests a hook that only matters for isolation tests; beforeCopy presumably selects the phase — confirm. */
extern void ConflictOnlyWithIsolationTesting(bool beforeCopy);
/*
* NOTE(review): the parameter is called subscriptionInfoList, while other
* declarations in this header use logicalRepTargetList; the element type is
* not visible here — confirm whether this is a List of LogicalRepTarget.
*/
extern void CreateReplicaIdentities(List *subscriptionInfoList);
/* Variant that takes a shard list and a single node, given by name and port. */
extern void CreateReplicaIdentitiesOnNode(List *shardList,
char *nodeName,
int32 nodePort);
/* Returns an XLogRecPtr obtained through the given connection. */
extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection);
/* NOTE(review): presumably runs query over connection and returns the result rows as a List of strings — confirm element type and ownership. */
extern List * GetQueryResultStringList(MultiConnection *connection, char *query);
/* Returns a MultiConnection for the node given by name and port; NOTE(review): presumably a replication-mode connection — confirm. */
extern MultiConnection * GetReplicationConnection(char *nodeName, int nodePort);
/*
* Create/enable/drop prototypes for publications, subscriptions and
* replication slots. Functions working on publications take the hash of
* PublicationInfo (publicationInfoHash); the others take a List named
* subscriptionInfoList.
* NOTE(review): the element type of subscriptionInfoList is not visible in
* this header (presumably LogicalRepTarget) — confirm in the implementation.
*/
extern void CreatePublications(MultiConnection *sourceConnection,
HTAB *publicationInfoHash);
extern void CreateSubscriptions(MultiConnection *sourceConnection,
char *databaseName, List *subscriptionInfoList);
/*
* Takes both a regular source connection and a source replication connection,
* plus the name of the output plugin to use. Returns a char *.
* NOTE(review): the meaning and ownership of the returned string are not
* visible here (possibly a snapshot name) — confirm.
*/
extern char * CreateReplicationSlots(MultiConnection *sourceConnection,
MultiConnection *sourceReplicationConnection,
List *subscriptionInfoList,
char *outputPlugin);
extern void EnableSubscriptions(List *subscriptionInfoList);
extern void DropSubscriptions(List *subscriptionInfoList);
extern void DropReplicationSlots(MultiConnection *sourceConnection,
List *subscriptionInfoList);
extern void DropPublications(MultiConnection *sourceConnection,
HTAB *publicationInfoHash);
/* NOTE(review): presumably cleans up objects left behind by earlier operations of the given type — scope (which nodes/objects) not visible here, confirm. */
extern void DropAllLogicalReplicationLeftovers(LogicalRepType type);
/*
* Naming helpers. Each one takes the LogicalRepType and an owner Oid and
* returns a char *; the publication and replication slot variants take a
* node id as well. The LogicalRepType comment in this header mentions that the
* type is used, for example, to pick a different prefix for a subscription
* name.
* NOTE(review): the exact name format and the ownership/lifetime of the
* returned strings are not visible from this header — confirm.
*/
extern char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId);
extern char * ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid
ownerId);
extern char * SubscriptionName(LogicalRepType type, Oid ownerId);
extern char * SubscriptionRoleName(LogicalRepType type, Oid ownerId);
/*
* Waiting helpers. The first two operate on the hash of
* GroupedLogicalRepTargets; the second one also takes the connection to the
* source.
* NOTE(review): timeout/error behavior while waiting is not visible from this
* header (LogicalReplicationTimeout is presumably involved) — confirm.
*/
extern void WaitForAllSubscriptionsToBecomeReady(HTAB *groupedLogicalRepTargetsHash);
extern void WaitForAllSubscriptionsToCatchUp(MultiConnection *sourceConnection,
HTAB *groupedLogicalRepTargetsHash);
/*
* Single-connection variant: takes the target connection, a source position
* (XLogRecPtr), a Bitmapset of table owner ids and an operation prefix string.
*/
extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection,
XLogRecPtr sourcePosition,
Bitmapset *tableOwnerIds,
char *operationPrefix);
/*
* Returns an HTAB built from the given list. Per the GroupedLogicalRepTargets
* comment in this header, the grouping is by node.
*/
extern HTAB * CreateGroupedLogicalRepTargetsHash(List *subscriptionInfoList);
/*
* Per the comment on LogicalRepTarget.superuserConnection, this can be used
* to initialize the superuserConnection shared by the targets of a node. It
* takes the user and database name to use.
*/
extern void CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName);
/* Counterpart taking the same hash; NOTE(review): presumably closes the connections opened by the function above — confirm. */
extern void CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash);
/*
* Takes the shard list, the source connection, the publication hash, the list
* of logical replication targets, the grouped-targets hash and the
* LogicalRepType (shard move or shard split).
* NOTE(review): presumably runs the remaining steps of a non-blocking shard
* transfer once the initial setup is done — the exact steps are not visible
* from this header, confirm in the implementation.
*/
extern void CompleteNonBlockingShardTransfer(List *shardList,
MultiConnection *sourceConnection,
HTAB *publicationInfoHash,
List *logicalRepTargetList,
HTAB *groupedLogicalRepTargetsHash,
LogicalRepType type);
/* NOTE(review): "Unchecked" presumably means the constraints are created without validating existing rows — confirm. */
extern void CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList);
/*
* Takes the list of logical replication targets. The LogicalRepTarget.newShards
* comment notes that partition parent tables are part of newShards.
* NOTE(review): presumably attaches partitions to their parents on the targets — confirm.
*/
extern void CreatePartitioningHierarchy(List *logicalRepTargetList);
#endif /* MULTI_LOGICAL_REPLICATION_H_ */