mirror of https://github.com/citusdata/citus.git
Share more code between splits and moves (#6152)
The non-blocking shard split functionality was based heavily on the non-blocking shard moves. However, the differences in usage were slightly too big to be able to reuse the existing functions easily, so most logical replication code was simply copied into dedicated shard split functions and modified for that purpose. This PR creates a more generic logical replication infrastructure that can be used by both shard splits and shard moves. There's probably more code sharing possible in the future, but I believe this is at least a good start and addresses the lowest-hanging fruit. This also adds a CreateSimpleHash function that makes creating the most common type of hashmap simple.
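The CreateSimpleHash helper itself lives in hash_helpers.h, whose diff is not shown in this excerpt, so the following is only a minimal sketch of what such a helper might look like; the macro name and its two arguments are taken from the call sites in the diff below, everything else is an assumption:

#include "postgres.h"
#include "utils/hsearch.h"

/*
 * Hypothetical sketch, not the actual Citus definition: derive key and
 * entry sizes from the given types and rely on HASH_BLOBS, so callers no
 * longer fill in HASHCTL boilerplate or register custom hash/compare
 * functions for simple struct keys such as NodeAndOwner.
 */
#define CreateSimpleHash(keyType, entryType) \
	CreateSimpleHashImpl(sizeof(keyType), sizeof(entryType), #entryType "Hash")

static inline HTAB *
CreateSimpleHashImpl(Size keySize, Size entrySize, const char *name)
{
	HASHCTL info = { 0 };
	info.keysize = keySize;
	info.entrysize = entrySize;
	info.hcxt = CurrentMemoryContext;

	/* HASH_BLOBS hashes the raw key bytes with the built-in tag_hash */
	int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

	return hash_create(name, 32, &info, hashFlags);
}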
parent b491d87931
commit 43c2a1e88b
@ -427,16 +427,24 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
parsetree = ProcessCreateSubscriptionStmt(createSubStmt);
|
||||
}
|
||||
|
||||
/*
|
||||
* For security and reliability reasons we disallow altering and dropping
|
||||
* subscriptions created by citus by non superusers. We could probably
|
||||
* disallow this for all subscriptions without issues. But out of an
|
||||
* abundance of caution about breaking subscription logic created by users
|
||||
* for other purposes, we only disallow it for the subscriptions that we
|
||||
* create i.e. ones that start with "citus_".
|
||||
*/
|
||||
if (IsA(parsetree, AlterSubscriptionStmt))
|
||||
{
|
||||
AlterSubscriptionStmt *alterSubStmt = (AlterSubscriptionStmt *) parsetree;
|
||||
if (!superuser() &&
|
||||
StringStartsWith(alterSubStmt->subname,
|
||||
SHARD_MOVE_SUBSCRIPTION_PREFIX))
|
||||
StringStartsWith(alterSubStmt->subname, "citus_"))
|
||||
{
|
||||
ereport(ERROR, (
|
||||
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("Only superusers can alter shard move subscriptions")));
|
||||
errmsg(
|
||||
"Only superusers can alter subscriptions that are created by citus")));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -444,11 +452,12 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
{
|
||||
DropSubscriptionStmt *dropSubStmt = (DropSubscriptionStmt *) parsetree;
|
||||
if (!superuser() &&
|
||||
StringStartsWith(dropSubStmt->subname, SHARD_MOVE_SUBSCRIPTION_PREFIX))
|
||||
StringStartsWith(dropSubStmt->subname, "citus_"))
|
||||
{
|
||||
ereport(ERROR, (
|
||||
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("Only superusers can drop shard move subscriptions")));
|
||||
errmsg(
|
||||
"Only superusers can drop subscriptions that are created by citus")));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "distributed/shared_library_init.h"
|
||||
#include "distributed/adaptive_executor.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
|
@ -51,6 +52,17 @@ typedef struct ShardCreatedByWorkflowEntry
|
|||
WorkerNode *workerNodeValue;
|
||||
} ShardCreatedByWorkflowEntry;
|
||||
|
||||
/*
|
||||
* Entry for map that tracks dummy shards.
|
||||
* Key: node + owner
|
||||
* Value: List of dummy shards for that node + owner
|
||||
*/
|
||||
typedef struct GroupedDummyShards
|
||||
{
|
||||
NodeAndOwner key;
|
||||
List *shardIntervals;
|
||||
} GroupedDummyShards;
|
||||
|
||||
/* Function declarations */
|
||||
static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
|
||||
ShardInterval *shardIntervalToSplit,
|
||||
|
@ -74,9 +86,7 @@ static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList);
|
|||
static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
|
||||
List *workersForPlacementList,
|
||||
bool includeReplicaIdentity);
|
||||
static void CreateReplicaIdentities(HTAB *mapOfDummyShardToPlacement,
|
||||
List *shardGroupSplitIntervalListList,
|
||||
List *workersForPlacementList);
|
||||
static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfDummyShardToPlacement);
|
||||
static void CreateObjectOnPlacement(List *objectCreationCommandList,
|
||||
WorkerNode *workerNode);
|
||||
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
|
||||
|
@ -114,10 +124,6 @@ static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *work
|
|||
static StringInfo CreateSplitShardReplicationSetupUDF(
|
||||
List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList,
|
||||
List *destinationWorkerNodesList);
|
||||
static char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
|
||||
WorkerNode *sourceWorkerNode,
|
||||
MultiConnection **
|
||||
templateSlotConnection);
|
||||
static List * ParseReplicationSlotInfoFromResult(PGresult *result);
|
||||
|
||||
static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
|
||||
|
@ -1101,17 +1107,16 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
|
|||
/*
|
||||
* Iterate on split shards list for a given shard and create constraints.
|
||||
*/
|
||||
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
|
||||
workersForPlacementList)
|
||||
forboth_ptr(shardInterval, shardIntervalList,
|
||||
workerPlacementNode, workersForPlacementList)
|
||||
{
|
||||
List *shardForeignConstraintCommandList = NIL;
|
||||
List *referenceTableForeignConstraintList = NIL;
|
||||
|
||||
CopyShardForeignConstraintCommandListGrouped(shardInterval,
|
||||
&
|
||||
shardForeignConstraintCommandList,
|
||||
&
|
||||
referenceTableForeignConstraintList);
|
||||
CopyShardForeignConstraintCommandListGrouped(
|
||||
shardInterval,
|
||||
&shardForeignConstraintCommandList,
|
||||
&referenceTableForeignConstraintList);
|
||||
|
||||
List *constraintCommandList = NIL;
|
||||
constraintCommandList = list_concat(constraintCommandList,
|
||||
|
@ -1271,29 +1276,30 @@ NonBlockingShardSplit(SplitOperation splitOperation,
|
|||
shardIntervalToSplit->shardId);
|
||||
|
||||
/* Create hashmap to group shards for publication-subscription management */
|
||||
HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication(
|
||||
HTAB *publicationInfoHash = CreateShardSplitInfoMapForPublication(
|
||||
sourceColocatedShardIntervalList,
|
||||
shardGroupSplitIntervalListList,
|
||||
workersForPlacementList);
|
||||
|
||||
DropAllShardSplitLeftOvers(sourceShardToCopyNode, shardSplitHashMapForPublication);
|
||||
DropAllLogicalReplicationLeftovers(SHARD_SPLIT);
|
||||
|
||||
int connectionFlags = FORCE_NEW_CONNECTION;
|
||||
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
|
||||
sourceShardToCopyNode
|
||||
->
|
||||
workerName,
|
||||
sourceShardToCopyNode
|
||||
->
|
||||
workerPort,
|
||||
superUser,
|
||||
databaseName);
|
||||
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(
|
||||
connectionFlags,
|
||||
sourceShardToCopyNode->workerName,
|
||||
sourceShardToCopyNode->workerPort,
|
||||
superUser,
|
||||
databaseName);
|
||||
ClaimConnectionExclusively(sourceConnection);
|
||||
|
||||
HTAB *mapOfShardToPlacementCreatedByWorkflow =
|
||||
CreateEmptyMapForShardsCreatedByWorkflow();
|
||||
|
||||
HTAB *mapOfDummyShardToPlacement = SetupHashMapForShardInfo();
|
||||
HTAB *mapOfDummyShardToPlacement = CreateSimpleHash(NodeAndOwner,
|
||||
GroupedShardSplitInfos);
|
||||
MultiConnection *sourceReplicationConnection =
|
||||
GetReplicationConnection(sourceShardToCopyNode->workerName,
|
||||
sourceShardToCopyNode->workerPort);
|
||||
|
||||
/* Non-Blocking shard split workflow starts here */
|
||||
PG_TRY();
|
||||
|
@ -1315,27 +1321,22 @@ NonBlockingShardSplit(SplitOperation splitOperation,
|
|||
sourceShardToCopyNode,
|
||||
workersForPlacementList);
|
||||
|
||||
CreateReplicaIdentities(mapOfDummyShardToPlacement,
|
||||
shardGroupSplitIntervalListList, workersForPlacementList);
|
||||
|
||||
/* 3) Create Publications. */
|
||||
CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
|
||||
|
||||
/*
|
||||
* 4) Create template replication Slot. It returns a snapshot. The snapshot remains
|
||||
* valid till the lifetime of the session that creates it. The connection is closed
|
||||
* at the end of the workflow.
|
||||
* 3) Create replica identities on dummy shards. This needs to be done
|
||||
* before the subscriptions are created. Otherwise the subscription
|
||||
* creation will get stuck waiting for the publication to send a
|
||||
* replica identity. Since we never actually write data into these
|
||||
* dummy shards there's no point in creating these indexes after the
|
||||
* initial COPY phase, like we do for the replica identities on the
|
||||
* target shards.
|
||||
*/
|
||||
MultiConnection *templateSlotConnection = NULL;
|
||||
char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(
|
||||
shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection);
|
||||
CreateReplicaIdentitiesForDummyShards(mapOfDummyShardToPlacement);
|
||||
|
||||
/* 5) Do snapshotted Copy */
|
||||
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
|
||||
shardGroupSplitIntervalListList, workersForPlacementList,
|
||||
snapShotName);
|
||||
/* 4) Create Publications. */
|
||||
CreatePublications(sourceConnection, publicationInfoHash);
|
||||
|
||||
/* 6) Execute 'worker_split_shard_replication_setup UDF */
|
||||
|
||||
/* 5) Execute 'worker_split_shard_replication_setup UDF */
|
||||
List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
|
||||
sourceShardToCopyNode,
|
||||
sourceColocatedShardIntervalList,
|
||||
|
@ -1346,80 +1347,98 @@ NonBlockingShardSplit(SplitOperation splitOperation,
|
|||
* Subscriber flow starts from here.
|
||||
* Populate 'ShardSplitSubscriberMetadata' for subscription management.
|
||||
*/
|
||||
List *shardSplitSubscribersMetadataList =
|
||||
List *logicalRepTargetList =
|
||||
PopulateShardSplitSubscriptionsMetadataList(
|
||||
shardSplitHashMapForPublication, replicationSlotInfoList);
|
||||
publicationInfoHash, replicationSlotInfoList,
|
||||
shardGroupSplitIntervalListList, workersForPlacementList);
|
||||
|
||||
HTAB *groupedLogicalRepTargetsHash = CreateGroupedLogicalRepTargetsHash(
|
||||
logicalRepTargetList);
|
||||
|
||||
/* Create connections to the target nodes */
|
||||
List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit(
|
||||
shardSplitSubscribersMetadataList,
|
||||
connectionFlags,
|
||||
CreateGroupedLogicalRepTargetsConnections(
|
||||
groupedLogicalRepTargetsHash,
|
||||
superUser, databaseName);
|
||||
|
||||
/* 7) Create copies of template replication slot */
|
||||
char *templateSlotName = ShardSplitTemplateReplicationSlotName(
|
||||
shardIntervalToSplit->shardId);
|
||||
CreateReplicationSlots(sourceConnection, templateSlotName,
|
||||
shardSplitSubscribersMetadataList);
|
||||
char *logicalRepDecoderPlugin = "citus";
|
||||
|
||||
/*
|
||||
* 6) Create replication slots and keep track of their snapshot.
|
||||
*/
|
||||
char *snapshot = CreateReplicationSlots(
|
||||
sourceConnection,
|
||||
sourceReplicationConnection,
|
||||
logicalRepTargetList,
|
||||
logicalRepDecoderPlugin);
|
||||
|
||||
/*
|
||||
* 7) Create subscriptions. This isn't strictly needed yet at this
|
||||
* stage, but this way we error out quickly if it fails.
|
||||
*/
|
||||
CreateSubscriptions(
|
||||
sourceConnection,
|
||||
databaseName,
|
||||
logicalRepTargetList);
|
||||
|
||||
/* 8) Do snapshotted Copy */
|
||||
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
|
||||
shardGroupSplitIntervalListList, workersForPlacementList,
|
||||
snapshot);
|
||||
|
||||
/* 8) Create subscriptions on target nodes */
|
||||
CreateShardSplitSubscriptions(targetNodeConnectionList,
|
||||
shardSplitSubscribersMetadataList,
|
||||
sourceShardToCopyNode,
|
||||
superUser,
|
||||
databaseName);
|
||||
|
||||
/* Used for testing */
|
||||
ConflictOnlyWithIsolationTesting();
|
||||
|
||||
/* 9) Wait for subscriptions to be ready */
|
||||
WaitForShardSplitRelationSubscriptionsBecomeReady(
|
||||
shardSplitSubscribersMetadataList);
|
||||
/*
|
||||
* 9) Create replica identities, this needs to be done before enabling
|
||||
* the subscriptions.
|
||||
*/
|
||||
CreateReplicaIdentities(logicalRepTargetList);
|
||||
|
||||
/* 10) Wait for subscribers to catchup till source LSN */
|
||||
XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
|
||||
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
|
||||
shardSplitSubscribersMetadataList);
|
||||
/*
|
||||
* 10) Enable the subscriptions: Start the catchup phase
|
||||
*/
|
||||
EnableSubscriptions(logicalRepTargetList);
|
||||
|
||||
/* 11) Create Auxiliary structures */
|
||||
/* 11) Wait for subscriptions to be ready */
|
||||
WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash);
|
||||
|
||||
/* 12) Wait for subscribers to catchup till source LSN */
|
||||
WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
|
||||
|
||||
/* 13) Create Auxiliary structures */
|
||||
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
|
||||
workersForPlacementList,
|
||||
false /* includeReplicaIdentity*/);
|
||||
|
||||
/* 12) Wait for subscribers to catchup till source LSN */
|
||||
sourcePosition = GetRemoteLogPosition(sourceConnection);
|
||||
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
|
||||
shardSplitSubscribersMetadataList);
|
||||
/* 14) Wait for subscribers to catchup till source LSN */
|
||||
WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
|
||||
|
||||
/* 13) Block writes on source shards */
|
||||
/* 15) Block writes on source shards */
|
||||
BlockWritesToShardList(sourceColocatedShardIntervalList);
|
||||
|
||||
/* 14) Wait for subscribers to catchup till source LSN */
|
||||
sourcePosition = GetRemoteLogPosition(sourceConnection);
|
||||
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
|
||||
shardSplitSubscribersMetadataList);
|
||||
/* 16) Wait for subscribers to catchup till source LSN */
|
||||
WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
|
||||
|
||||
/* 15) Drop Subscribers */
|
||||
DropShardSplitSubsriptions(shardSplitSubscribersMetadataList);
|
||||
/* 17) Drop Subscribers */
|
||||
DropSubscriptions(logicalRepTargetList);
|
||||
|
||||
/* 16) Drop Publications */
|
||||
DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
|
||||
|
||||
/* 17) Drop replication slots
|
||||
* Drop template and subscriber replication slots
|
||||
/* 18) Drop replication slots
|
||||
*/
|
||||
DropShardReplicationSlot(sourceConnection, ShardSplitTemplateReplicationSlotName(
|
||||
shardIntervalToSplit->shardId));
|
||||
DropShardSplitReplicationSlots(sourceConnection, replicationSlotInfoList);
|
||||
DropReplicationSlots(sourceConnection, logicalRepTargetList);
|
||||
|
||||
/* 19) Drop Publications */
|
||||
DropPublications(sourceConnection, publicationInfoHash);
|
||||
|
||||
|
||||
/*
|
||||
* 18) Drop old shards and delete related metadata. Have to do that before
|
||||
* 20) Drop old shards and delete related metadata. Have to do that before
|
||||
* creating the new shard metadata, because there are cross-checks
|
||||
* preventing inconsistent metadata (like overlapping shards).
|
||||
*/
|
||||
DropShardList(sourceColocatedShardIntervalList);
|
||||
|
||||
/* 19) Insert new shard and placement metadata */
|
||||
/* 21) Insert new shard and placement metadata */
|
||||
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
|
||||
workersForPlacementList);
|
||||
|
||||
|
@ -1427,7 +1446,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
|
|||
workersForPlacementList);
|
||||
|
||||
/*
|
||||
* 20) Create foreign keys, if any exist, after the metadata changes happening in
|
||||
* 22) Create foreign keys, if any exist, after the metadata changes happening in
|
||||
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
|
||||
* key creation depends on the new metadata.
|
||||
*/
|
||||
|
@ -1435,18 +1454,18 @@ NonBlockingShardSplit(SplitOperation splitOperation,
|
|||
workersForPlacementList);
|
||||
|
||||
/*
|
||||
* 21) Drop dummy shards.
|
||||
* 23) Drop dummy shards.
|
||||
*/
|
||||
DropDummyShards(mapOfDummyShardToPlacement);
|
||||
|
||||
/* 22) Close source connection */
|
||||
/* 24) Close source connection */
|
||||
CloseConnection(sourceConnection);
|
||||
|
||||
/* 23) Close all subscriber connections */
|
||||
CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList);
|
||||
/* 25) Close all subscriber connections */
|
||||
CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
|
||||
|
||||
/* 24) Close connection of template replication slot */
|
||||
CloseConnection(templateSlotConnection);
|
||||
/* 26) Close connection of template replication slot */
|
||||
CloseConnection(sourceReplicationConnection);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
|
@ -1456,8 +1475,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
|
|||
/* Do a best effort cleanup of shards created on workers in the above block */
|
||||
TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
|
||||
|
||||
DropAllShardSplitLeftOvers(sourceShardToCopyNode,
|
||||
shardSplitHashMapForPublication);
|
||||
DropAllLogicalReplicationLeftovers(SHARD_SPLIT);
|
||||
|
||||
DropDummyShards(mapOfDummyShardToPlacement);
|
||||
|
||||
|
@ -1603,44 +1621,6 @@ CreateWorkerForPlacementSet(List *workersForPlacementList)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateTemplateReplicationSlotAndReturnSnapshot creates a replication slot
|
||||
* and returns its snapshot. This slot acts as a 'Template' for creating
|
||||
* replication slot copies used for logical replication.
|
||||
*
|
||||
* The snapshot remains valid till the lifetime of the session that creates it.
|
||||
*/
|
||||
char *
|
||||
CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
|
||||
WorkerNode *sourceWorkerNode,
|
||||
MultiConnection **templateSlotConnection)
|
||||
{
|
||||
/*Create Template replication slot */
|
||||
int connectionFlags = FORCE_NEW_CONNECTION;
|
||||
connectionFlags |= REQUIRE_REPLICATION_CONNECTION_PARAM;
|
||||
|
||||
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
|
||||
sourceWorkerNode->
|
||||
workerName,
|
||||
sourceWorkerNode->
|
||||
workerPort,
|
||||
CitusExtensionOwnerName(),
|
||||
get_database_name(
|
||||
MyDatabaseId));
|
||||
ClaimConnectionExclusively(sourceConnection);
|
||||
|
||||
/*
|
||||
* Try to drop leftover template replication slot if any from previous operation
|
||||
* and create new one.
|
||||
*/
|
||||
char *snapShotName = CreateTemplateReplicationSlot(shardInterval,
|
||||
sourceConnection);
|
||||
*templateSlotConnection = sourceConnection;
|
||||
|
||||
return snapShotName;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteSplitShardReplicationSetupUDF executes
|
||||
* 'worker_split_shard_replication_setup' UDF on source shard node
|
||||
|
@ -1796,22 +1776,23 @@ ParseReplicationSlotInfoFromResult(PGresult *result)
|
|||
List *replicationSlotInfoList = NIL;
|
||||
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
|
||||
{
|
||||
ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0(
|
||||
ReplicationSlotInfo *replicationSlot = (ReplicationSlotInfo *) palloc0(
|
||||
sizeof(ReplicationSlotInfo));
|
||||
|
||||
char *targeNodeIdString = PQgetvalue(result, rowIndex, 0 /* nodeId column*/);
|
||||
|
||||
replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10);
|
||||
replicationSlot->targetNodeId = strtoul(targeNodeIdString, NULL, 10);
|
||||
|
||||
/* We're using the pstrdup to copy the data into the current memory context */
|
||||
replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex,
|
||||
1 /* table owner name column */));
|
||||
bool missingOk = false;
|
||||
replicationSlot->tableOwnerId = get_role_oid(
|
||||
PQgetvalue(result, rowIndex, 1 /* table owner name column */),
|
||||
missingOk);
|
||||
|
||||
/* Replication slot name */
|
||||
replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex,
|
||||
2 /* slot name column */));
|
||||
replicationSlot->name = pstrdup(PQgetvalue(result, rowIndex,
|
||||
2 /* slot name column */));
|
||||
|
||||
replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo);
|
||||
replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlot);
|
||||
}
|
||||
|
||||
return replicationSlotInfoList;
|
||||
|
@ -1829,22 +1810,22 @@ static void
|
|||
AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId,
|
||||
ShardInterval *shardInterval)
|
||||
{
|
||||
NodeShardMappingKey key;
|
||||
NodeAndOwner key;
|
||||
key.nodeId = targetNodeId;
|
||||
key.tableOwnerId = TableOwnerOid(shardInterval->relationId);
|
||||
|
||||
bool found = false;
|
||||
NodeShardMappingEntry *nodeMappingEntry =
|
||||
(NodeShardMappingEntry *) hash_search(mapOfDummyShardToPlacement, &key,
|
||||
HASH_ENTER,
|
||||
&found);
|
||||
GroupedDummyShards *nodeMappingEntry =
|
||||
(GroupedDummyShards *) hash_search(mapOfDummyShardToPlacement, &key,
|
||||
HASH_ENTER,
|
||||
&found);
|
||||
if (!found)
|
||||
{
|
||||
nodeMappingEntry->shardSplitInfoList = NIL;
|
||||
nodeMappingEntry->shardIntervals = NIL;
|
||||
}
|
||||
|
||||
nodeMappingEntry->shardSplitInfoList =
|
||||
lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
|
||||
nodeMappingEntry->shardIntervals =
|
||||
lappend(nodeMappingEntry->shardIntervals, shardInterval);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1858,8 +1839,8 @@ DropDummyShards(HTAB *mapOfDummyShardToPlacement)
|
|||
HASH_SEQ_STATUS status;
|
||||
hash_seq_init(&status, mapOfDummyShardToPlacement);
|
||||
|
||||
NodeShardMappingEntry *entry = NULL;
|
||||
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
|
||||
GroupedDummyShards *entry = NULL;
|
||||
while ((entry = (GroupedDummyShards *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
uint32 nodeId = entry->key.nodeId;
|
||||
WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
|
||||
|
@ -1874,7 +1855,7 @@ DropDummyShards(HTAB *mapOfDummyShardToPlacement)
|
|||
CurrentUserName(),
|
||||
NULL /* databaseName */);
|
||||
|
||||
List *dummyShardIntervalList = entry->shardSplitInfoList;
|
||||
List *dummyShardIntervalList = entry->shardIntervals;
|
||||
ShardInterval *shardInterval = NULL;
|
||||
foreach_ptr(shardInterval, dummyShardIntervalList)
|
||||
{
|
||||
|
@ -1911,51 +1892,27 @@ DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval)
|
|||
|
||||
|
||||
/*
|
||||
* CreateReplicaIdentities creates replica identities for split children and dummy shards.
|
||||
* CreateReplicaIdentitiesForDummyShards creates replica identities for split
|
||||
* dummy shards.
|
||||
*/
|
||||
static void
|
||||
CreateReplicaIdentities(HTAB *mapOfDummyShardToPlacement,
|
||||
List *shardGroupSplitIntervalListList,
|
||||
List *workersForPlacementList)
|
||||
CreateReplicaIdentitiesForDummyShards(HTAB *mapOfDummyShardToPlacement)
|
||||
{
|
||||
/*
|
||||
* Create Replica Identities for actual child shards.
|
||||
*/
|
||||
List *shardIntervalList = NIL;
|
||||
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
|
||||
{
|
||||
ShardInterval *shardInterval = NULL;
|
||||
WorkerNode *workerPlacementNode = NULL;
|
||||
|
||||
/*
|
||||
* Iterate on split shard interval list for given shard and create tasks
|
||||
* for every single split shard in a shard group.
|
||||
*/
|
||||
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
|
||||
workersForPlacementList)
|
||||
{
|
||||
List *shardList = NIL;
|
||||
shardList = lappend(shardList, shardInterval);
|
||||
|
||||
CreateReplicaIdentity(shardList, workerPlacementNode->workerName,
|
||||
workerPlacementNode->workerPort);
|
||||
}
|
||||
}
|
||||
|
||||
/* Create Replica Identities for dummy shards */
|
||||
HASH_SEQ_STATUS status;
|
||||
hash_seq_init(&status, mapOfDummyShardToPlacement);
|
||||
|
||||
NodeShardMappingEntry *entry = NULL;
|
||||
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
|
||||
GroupedDummyShards *entry = NULL;
|
||||
while ((entry = (GroupedDummyShards *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
uint32 nodeId = entry->key.nodeId;
|
||||
WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
|
||||
false /* missingOk */);
|
||||
|
||||
List *dummyShardIntervalList = entry->shardSplitInfoList;
|
||||
CreateReplicaIdentity(dummyShardIntervalList, shardToBeDroppedNode->workerName,
|
||||
shardToBeDroppedNode->workerPort);
|
||||
List *dummyShardIntervalList = entry->shardIntervals;
|
||||
CreateReplicaIdentitiesOnNode(dummyShardIntervalList,
|
||||
shardToBeDroppedNode->workerName,
|
||||
shardToBeDroppedNode->workerPort);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
#include "postmaster/postmaster.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
#include "distributed/shard_utils.h"
|
||||
#include "distributed/shardsplit_shared_memory.h"
|
||||
|
@ -104,7 +105,7 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
/* SetupMap */
|
||||
ShardInfoHashMap = SetupHashMapForShardInfo();
|
||||
ShardInfoHashMap = CreateSimpleHash(NodeAndOwner, GroupedShardSplitInfos);
|
||||
|
||||
int shardSplitInfoCount = 0;
|
||||
|
||||
|
@ -154,30 +155,6 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetupHashMapForShardInfo initializes a hash map to store shard split
|
||||
* information by grouping it by node id. The key of the hash table
|
||||
* is 'nodeId' and value is a list of ShardSplitInfo that are placed on
|
||||
* this particular node.
|
||||
*/
|
||||
HTAB *
|
||||
SetupHashMapForShardInfo()
|
||||
{
|
||||
HASHCTL info;
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(NodeShardMappingKey);
|
||||
info.entrysize = sizeof(NodeShardMappingEntry);
|
||||
info.hash = NodeShardMappingHash;
|
||||
info.match = NodeShardMappingHashCompare;
|
||||
info.hcxt = CurrentMemoryContext;
|
||||
|
||||
int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION | HASH_COMPARE);
|
||||
|
||||
HTAB *shardInfoMap = hash_create("ShardInfoMap", 128, &info, hashFlags);
|
||||
return shardInfoMap;
|
||||
}
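For comparison, after this PR the same map is created at its call site with the new helper in a single line (this exact line appears earlier in this diff, in worker_split_shard_replication_setup):

	ShardInfoHashMap = CreateSimpleHash(NodeAndOwner, GroupedShardSplitInfos);

The custom NodeShardMappingHash/NodeShardMappingHashCompare functions that the boilerplate above needed are removed further down for the same reason.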
|
||||
|
||||
|
||||
/*
|
||||
* CreateShardSplitInfo function constructs ShardSplitInfo data structure
|
||||
* with appropriate OIDs for the source and destination relations.
|
||||
|
@ -267,21 +244,21 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
|
|||
static void
|
||||
AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
|
||||
{
|
||||
NodeShardMappingKey key;
|
||||
NodeAndOwner key;
|
||||
key.nodeId = shardSplitInfo->nodeId;
|
||||
key.tableOwnerId = TableOwnerOid(shardSplitInfo->distributedTableOid);
|
||||
|
||||
bool found = false;
|
||||
NodeShardMappingEntry *nodeMappingEntry =
|
||||
(NodeShardMappingEntry *) hash_search(ShardInfoHashMap, &key, HASH_ENTER,
|
||||
&found);
|
||||
GroupedShardSplitInfos *groupedInfos =
|
||||
(GroupedShardSplitInfos *) hash_search(ShardInfoHashMap, &key, HASH_ENTER,
|
||||
&found);
|
||||
if (!found)
|
||||
{
|
||||
nodeMappingEntry->shardSplitInfoList = NIL;
|
||||
groupedInfos->shardSplitInfoList = NIL;
|
||||
}
|
||||
|
||||
nodeMappingEntry->shardSplitInfoList =
|
||||
lappend(nodeMappingEntry->shardSplitInfoList, (ShardSplitInfo *) shardSplitInfo);
|
||||
groupedInfos->shardSplitInfoList =
|
||||
lappend(groupedInfos->shardSplitInfoList, (ShardSplitInfo *) shardSplitInfo);
|
||||
}
|
||||
|
||||
|
||||
|
@ -298,14 +275,13 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
|
|||
HASH_SEQ_STATUS status;
|
||||
hash_seq_init(&status, ShardInfoHashMap);
|
||||
|
||||
NodeShardMappingEntry *entry = NULL;
|
||||
GroupedShardSplitInfos *entry = NULL;
|
||||
int splitInfoIndex = 0;
|
||||
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
|
||||
while ((entry = (GroupedShardSplitInfos *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
uint32_t nodeId = entry->key.nodeId;
|
||||
uint32_t tableOwnerId = entry->key.tableOwnerId;
|
||||
char *derivedSlotName =
|
||||
EncodeReplicationSlot(nodeId, tableOwnerId);
|
||||
char *derivedSlotName = ReplicationSlotName(SHARD_SPLIT, nodeId, tableOwnerId);
|
||||
|
||||
List *shardSplitInfoList = entry->shardSplitInfoList;
|
||||
ShardSplitInfo *splitShardInfo = NULL;
|
||||
|
@ -321,41 +297,6 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* NodeShardMappingHash returns hash value by combining hash of node id
|
||||
* and tableowner Id.
|
||||
*/
|
||||
uint32
|
||||
NodeShardMappingHash(const void *key, Size keysize)
|
||||
{
|
||||
NodeShardMappingKey *entry = (NodeShardMappingKey *) key;
|
||||
uint32 hash = hash_uint32(entry->nodeId);
|
||||
hash = hash_combine(hash, hash_uint32(entry->tableOwnerId));
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Comparator function for hash keys
|
||||
*/
|
||||
int
|
||||
NodeShardMappingHashCompare(const void *left, const void *right, Size keysize)
|
||||
{
|
||||
NodeShardMappingKey *leftKey = (NodeShardMappingKey *) left;
|
||||
NodeShardMappingKey *rightKey = (NodeShardMappingKey *) right;
|
||||
|
||||
if (leftKey->nodeId != rightKey->nodeId ||
|
||||
leftKey->tableOwnerId != rightKey->tableOwnerId)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ParseShardSplitInfoFromDatum deserializes individual fields of 'pg_catalog.split_shard_info'
|
||||
* datatype.
|
||||
|
@ -434,8 +375,8 @@ ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc
|
|||
HASH_SEQ_STATUS status;
|
||||
hash_seq_init(&status, ShardInfoHashMap);
|
||||
|
||||
NodeShardMappingEntry *entry = NULL;
|
||||
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
|
||||
GroupedShardSplitInfos *entry = NULL;
|
||||
while ((entry = (GroupedShardSplitInfos *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
Datum values[3];
|
||||
bool nulls[3];
|
||||
|
@ -448,8 +389,8 @@ ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc
|
|||
char *tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false);
|
||||
values[1] = CStringGetTextDatum(tableOwnerName);
|
||||
|
||||
char *slotName = EncodeReplicationSlot(entry->key.nodeId,
|
||||
entry->key.tableOwnerId);
|
||||
char *slotName = ReplicationSlotName(SHARD_SPLIT, entry->key.nodeId,
|
||||
entry->key.tableOwnerId);
|
||||
values[2] = CStringGetTextDatum(slotName);
|
||||
|
||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls);
|
||||
|
|
File diff suppressed because it is too large
|
@ -13,6 +13,7 @@
|
|||
#include "miscadmin.h"
|
||||
#include "nodes/pg_list.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
|
@ -33,19 +34,9 @@ static HTAB *ShardInfoHashMapForPublications = NULL;
|
|||
static void AddPublishableShardEntryInMap(uint32 targetNodeId,
|
||||
ShardInterval *shardInterval, bool
|
||||
isChildShardInterval);
|
||||
ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32
|
||||
nodeId,
|
||||
List *
|
||||
replicationSlotInfoList);
|
||||
static void CreateShardSplitPublicationForNode(MultiConnection *connection,
|
||||
List *shardList,
|
||||
uint32_t publicationForTargetNodeId, Oid
|
||||
tableOwner);
|
||||
static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId);
|
||||
static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection);
|
||||
static void DropAllShardSplitPublications(MultiConnection *cleanupConnection);
|
||||
static void DropAllShardSplitUsers(MultiConnection *cleanupConnection);
|
||||
static void DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection);
|
||||
static LogicalRepTarget * CreateLogicalRepTarget(Oid tableOwnerId,
|
||||
uint32 nodeId,
|
||||
List *replicationSlotInfoList);
|
||||
|
||||
/*
|
||||
* CreateShardSplitInfoMapForPublication creates a hashmap that groups
|
||||
|
@ -74,7 +65,7 @@ CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList,
|
|||
List *shardGroupSplitIntervalListList,
|
||||
List *destinationWorkerNodesList)
|
||||
{
|
||||
ShardInfoHashMapForPublications = SetupHashMapForShardInfo();
|
||||
ShardInfoHashMapForPublications = CreateSimpleHash(NodeAndOwner, PublicationInfo);
|
||||
ShardInterval *sourceShardIntervalToCopy = NULL;
|
||||
List *splitChildShardIntervalList = NULL;
|
||||
forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
|
||||
|
@ -129,28 +120,29 @@ static void
|
|||
AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, bool
|
||||
isChildShardInterval)
|
||||
{
|
||||
NodeShardMappingKey key;
|
||||
NodeAndOwner key;
|
||||
key.nodeId = targetNodeId;
|
||||
key.tableOwnerId = TableOwnerOid(shardInterval->relationId);
|
||||
|
||||
bool found = false;
|
||||
NodeShardMappingEntry *nodeMappingEntry =
|
||||
(NodeShardMappingEntry *) hash_search(ShardInfoHashMapForPublications, &key,
|
||||
HASH_ENTER,
|
||||
&found);
|
||||
PublicationInfo *publicationInfo =
|
||||
(PublicationInfo *) hash_search(ShardInfoHashMapForPublications, &key,
|
||||
HASH_ENTER,
|
||||
&found);
|
||||
|
||||
/* Create a new list for <nodeId, owner> pair */
|
||||
if (!found)
|
||||
{
|
||||
nodeMappingEntry->shardSplitInfoList = NIL;
|
||||
publicationInfo->shardIntervals = NIL;
|
||||
publicationInfo->name = PublicationName(SHARD_SPLIT, key.nodeId,
|
||||
key.tableOwnerId);
|
||||
}
|
||||
|
||||
/* Add child shard interval */
|
||||
if (isChildShardInterval)
|
||||
{
|
||||
nodeMappingEntry->shardSplitInfoList =
|
||||
lappend(nodeMappingEntry->shardSplitInfoList,
|
||||
(ShardInterval *) shardInterval);
|
||||
publicationInfo->shardIntervals =
|
||||
lappend(publicationInfo->shardIntervals, shardInterval);
|
||||
|
||||
/* We return from here as the child interval is only added once in the list */
|
||||
return;
|
||||
|
@ -158,7 +150,7 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
|
|||
|
||||
/* Check if parent is already added */
|
||||
ShardInterval *existingShardInterval = NULL;
|
||||
foreach_ptr(existingShardInterval, nodeMappingEntry->shardSplitInfoList)
|
||||
foreach_ptr(existingShardInterval, publicationInfo->shardIntervals)
|
||||
{
|
||||
if (existingShardInterval->shardId == shardInterval->shardId)
|
||||
{
|
||||
|
@ -168,127 +160,13 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
|
|||
}
|
||||
|
||||
/* Add parent shard Interval */
|
||||
nodeMappingEntry->shardSplitInfoList =
|
||||
lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
|
||||
publicationInfo->shardIntervals =
|
||||
lappend(publicationInfo->shardIntervals, shardInterval);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateShardSplitPublications creates publications on the source node.
|
||||
*
|
||||
* sourceConnection - Connection of source node.
|
||||
*
|
||||
* shardInfoHashMapForPublication - ShardIntervals are grouped by <owner, nodeId> key.
|
||||
* A publication is created for list of
|
||||
* ShardIntervals mapped by key.
|
||||
*/
|
||||
void
|
||||
CreateShardSplitPublications(MultiConnection *sourceConnection,
|
||||
HTAB *shardInfoHashMapForPublication)
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
hash_seq_init(&status, shardInfoHashMapForPublication);
|
||||
|
||||
NodeShardMappingEntry *entry = NULL;
|
||||
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
uint32 nodeId = entry->key.nodeId;
|
||||
uint32 tableOwnerId = entry->key.tableOwnerId;
|
||||
List *shardListForPublication = entry->shardSplitInfoList;
|
||||
|
||||
/* Create publication on shard list */
|
||||
CreateShardSplitPublicationForNode(sourceConnection,
|
||||
shardListForPublication,
|
||||
nodeId,
|
||||
tableOwnerId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateShardSplitPublicationForNode creates a publication on source node
|
||||
* for given shard list.
|
||||
* We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications
|
||||
* related to split operations.
|
||||
*/
|
||||
static void
|
||||
CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
|
||||
uint32_t publicationForTargetNodeId, Oid ownerId)
|
||||
{
|
||||
StringInfo createPublicationCommand = makeStringInfo();
|
||||
bool prefixWithComma = false;
|
||||
|
||||
appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ",
|
||||
ShardSplitPublicationName(publicationForTargetNodeId, ownerId));
|
||||
|
||||
ShardInterval *shard = NULL;
|
||||
foreach_ptr(shard, shardList)
|
||||
{
|
||||
char *shardName = ConstructQualifiedShardName(shard);
|
||||
|
||||
if (prefixWithComma)
|
||||
{
|
||||
appendStringInfoString(createPublicationCommand, ",");
|
||||
}
|
||||
|
||||
appendStringInfoString(createPublicationCommand, shardName);
|
||||
prefixWithComma = true;
|
||||
}
|
||||
|
||||
ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data);
|
||||
pfree(createPublicationCommand->data);
|
||||
pfree(createPublicationCommand);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShardSplitPublicationName returns publication name for Shard Split operations.
|
||||
*/
|
||||
static char *
|
||||
ShardSplitPublicationName(uint32_t nodeId, Oid ownerId)
|
||||
{
|
||||
return psprintf("%s%u_%u", SHARD_SPLIT_PUBLICATION_PREFIX, nodeId, ownerId);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateTargetNodeConnectionsForShardSplit creates connections on target nodes.
|
||||
* These connections are used for subscription management. They are closed
|
||||
* at the end of non-blocking split workflow.
|
||||
*/
|
||||
List *
|
||||
CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList, int
|
||||
connectionFlags, char *user, char *databaseName)
|
||||
{
|
||||
List *targetNodeConnectionList = NIL;
|
||||
ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = NULL;
|
||||
foreach_ptr(shardSplitSubscriberMetadata, shardSplitSubscribersMetadataList)
|
||||
{
|
||||
/* slotinfo is expected to be already populated */
|
||||
Assert(shardSplitSubscriberMetadata->slotInfo != NULL);
|
||||
|
||||
uint32 targetWorkerNodeId = shardSplitSubscriberMetadata->slotInfo->targetNodeId;
|
||||
WorkerNode *targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false);
|
||||
|
||||
MultiConnection *targetConnection =
|
||||
GetNodeUserDatabaseConnection(connectionFlags, targetWorkerNode->workerName,
|
||||
targetWorkerNode->workerPort,
|
||||
user,
|
||||
databaseName);
|
||||
ClaimConnectionExclusively(targetConnection);
|
||||
|
||||
targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection);
|
||||
|
||||
/* Cache the connections for each subscription */
|
||||
shardSplitSubscriberMetadata->targetNodeConnection = targetConnection;
|
||||
}
|
||||
|
||||
return targetNodeConnectionList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PopulateShardSplitSubscriptionsMetadataList returns a list of 'ShardSplitSubscriberMetadata'
|
||||
* PopulateShardSplitSubscriptionsMetadataList returns a list of 'LogicalRepTarget'
|
||||
* structure.
|
||||
*
|
||||
* shardSplitInfoHashMap - Shards are grouped by <owner, node id> key.
|
||||
|
@ -299,483 +177,98 @@ CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList
|
|||
*/
|
||||
List *
|
||||
PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
|
||||
List *replicationSlotInfoList)
|
||||
List *replicationSlotInfoList,
|
||||
List *shardGroupSplitIntervalListList,
|
||||
List *workersForPlacementList)
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
hash_seq_init(&status, shardSplitInfoHashMap);
|
||||
|
||||
NodeShardMappingEntry *entry = NULL;
|
||||
List *shardSplitSubscriptionMetadataList = NIL;
|
||||
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
|
||||
PublicationInfo *publication = NULL;
|
||||
List *logicalRepTargetList = NIL;
|
||||
while ((publication = (PublicationInfo *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
uint32 nodeId = entry->key.nodeId;
|
||||
uint32 tableOwnerId = entry->key.tableOwnerId;
|
||||
ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata =
|
||||
CreateShardSplitSubscriberMetadata(tableOwnerId, nodeId,
|
||||
replicationSlotInfoList);
|
||||
uint32 nodeId = publication->key.nodeId;
|
||||
uint32 tableOwnerId = publication->key.tableOwnerId;
|
||||
LogicalRepTarget *target =
|
||||
CreateLogicalRepTarget(tableOwnerId, nodeId,
|
||||
replicationSlotInfoList);
|
||||
target->publication = publication;
|
||||
publication->target = target;
|
||||
|
||||
shardSplitSubscriptionMetadataList = lappend(shardSplitSubscriptionMetadataList,
|
||||
shardSplitSubscriberMetadata);
|
||||
logicalRepTargetList = lappend(logicalRepTargetList, target);
|
||||
}
|
||||
|
||||
return shardSplitSubscriptionMetadataList;
|
||||
List *shardIntervalList = NIL;
|
||||
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
|
||||
{
|
||||
ShardInterval *shardInterval = NULL;
|
||||
WorkerNode *workerPlacementNode = NULL;
|
||||
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
|
||||
workersForPlacementList)
|
||||
{
|
||||
NodeAndOwner key;
|
||||
key.nodeId = workerPlacementNode->nodeId;
|
||||
key.tableOwnerId = TableOwnerOid(shardInterval->relationId);
|
||||
|
||||
bool found = false;
|
||||
publication = (PublicationInfo *) hash_search(
|
||||
ShardInfoHashMapForPublications,
|
||||
&key,
|
||||
HASH_FIND,
|
||||
&found);
|
||||
if (!found)
|
||||
{
|
||||
ereport(ERROR, errmsg("Could not find publication matching a split"));
|
||||
}
|
||||
publication->target->newShards = lappend(
|
||||
publication->target->newShards, shardInterval);
|
||||
}
|
||||
}
|
||||
|
||||
return logicalRepTargetList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Creates a 'ShardSplitSubscriberMetadata' structure for given table owner, node id.
|
||||
* Creates a 'LogicalRepTarget' structure for given table owner, node id.
|
||||
* It scans the list of 'ReplicationSlotInfo' to identify the corresponding slot
|
||||
* to be used for given tableOwnerId and nodeId.
|
||||
*/
|
||||
ShardSplitSubscriberMetadata *
|
||||
CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
|
||||
List *replicationSlotInfoList)
|
||||
static LogicalRepTarget *
|
||||
CreateLogicalRepTarget(Oid tableOwnerId, uint32 nodeId,
|
||||
List *replicationSlotInfoList)
|
||||
{
|
||||
ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = palloc0(
|
||||
sizeof(ShardSplitSubscriberMetadata));
|
||||
shardSplitSubscriberMetadata->tableOwnerId = tableOwnerId;
|
||||
LogicalRepTarget *target = palloc0(sizeof(LogicalRepTarget));
|
||||
target->subscriptionName = SubscriptionName(SHARD_SPLIT, tableOwnerId);
|
||||
target->tableOwnerId = tableOwnerId;
|
||||
target->subscriptionOwnerName =
|
||||
SubscriptionRoleName(SHARD_SPLIT, tableOwnerId);
|
||||
target->superuserConnection = NULL;
|
||||
|
||||
/*
|
||||
* Each 'ReplicationSlotInfo' belongs to a unique combination of node id and owner.
|
||||
* Traverse the slot list to identify the corresponding slot for given
|
||||
* table owner and node.
|
||||
*/
|
||||
char *tableOwnerName = GetUserNameFromId(tableOwnerId, false);
|
||||
ReplicationSlotInfo *replicationSlotInfo = NULL;
|
||||
foreach_ptr(replicationSlotInfo, replicationSlotInfoList)
|
||||
ReplicationSlotInfo *replicationSlot = NULL;
|
||||
foreach_ptr(replicationSlot, replicationSlotInfoList)
|
||||
{
|
||||
if (nodeId == replicationSlotInfo->targetNodeId &&
|
||||
strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0)
|
||||
if (nodeId == replicationSlot->targetNodeId &&
|
||||
tableOwnerId == replicationSlot->tableOwnerId)
|
||||
{
|
||||
shardSplitSubscriberMetadata->slotInfo = replicationSlotInfo;
|
||||
target->replicationSlot = replicationSlot;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return shardSplitSubscriberMetadata;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateShardSplitSubscriptions creates subscriptions for Shard Split operation.
|
||||
* We follow Shard Split naming scheme for Publication-Subscription management.
|
||||
*
|
||||
* targetNodeConnectionList - List of connections to target nodes on which
|
||||
* subscriptions have to be created.
|
||||
*
|
||||
* shardSplitSubscriberMetadataList - List of subscriber metadata.
|
||||
*
|
||||
* sourceWorkerNode - Source node.
|
||||
*/
|
||||
void
|
||||
CreateShardSplitSubscriptions(List *targetNodeConnectionList,
|
||||
List *shardSplitSubscriberMetadataList,
|
||||
WorkerNode *sourceWorkerNode,
|
||||
char *superUser,
|
||||
char *databaseName)
|
||||
{
|
||||
MultiConnection *targetConnection = NULL;
|
||||
ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
|
||||
forboth_ptr(targetConnection, targetNodeConnectionList,
|
||||
shardSplitPubSubMetadata, shardSplitSubscriberMetadataList)
|
||||
if (!target->replicationSlot)
|
||||
{
|
||||
uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
|
||||
Oid ownerId = shardSplitPubSubMetadata->tableOwnerId;
|
||||
CreateShardSplitSubscription(targetConnection,
|
||||
sourceWorkerNode->workerName,
|
||||
sourceWorkerNode->workerPort,
|
||||
superUser,
|
||||
databaseName,
|
||||
ShardSplitPublicationName(publicationForNodeId,
|
||||
ownerId),
|
||||
shardSplitPubSubMetadata->slotInfo->slotName,
|
||||
ownerId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* WaitForShardSplitRelationSubscriptionsBecomeReady waits for a list of subscriptions
|
||||
* to become ready. This method invokes 'WaitForRelationSubscriptionsBecomeReady' for each
|
||||
* subscription.
|
||||
*/
|
||||
void
|
||||
WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadataList)
|
||||
{
|
||||
ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
|
||||
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
|
||||
{
|
||||
Bitmapset *tableOwnerIds = bms_make_singleton(
|
||||
shardSplitPubSubMetadata->tableOwnerId);
|
||||
WaitForRelationSubscriptionsBecomeReady(
|
||||
shardSplitPubSubMetadata->targetNodeConnection, tableOwnerIds,
|
||||
SHARD_SPLIT_SUBSCRIPTION_PREFIX);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* WaitForShardSplitRelationSubscriptionsToBeCaughtUp waits until subscriptions are caught up till
|
||||
* the source LSN. This method invokes 'WaitForShardSubscriptionToCatchUp' for each subscription.
|
||||
*/
|
||||
void
|
||||
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
|
||||
List *shardSplitPubSubMetadataList)
|
||||
{
|
||||
ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
|
||||
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
|
||||
{
|
||||
Bitmapset *tableOwnerIds = bms_make_singleton(
|
||||
shardSplitPubSubMetadata->tableOwnerId);
|
||||
WaitForShardSubscriptionToCatchUp(shardSplitPubSubMetadata->targetNodeConnection,
|
||||
sourcePosition,
|
||||
tableOwnerIds,
|
||||
SHARD_SPLIT_SUBSCRIPTION_PREFIX);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateTemplateReplicationSlot creates a replication slot that acts as a template
|
||||
* slot for logically replicating split children in the 'catchup' phase of non-blocking split.
|
||||
* It returns a snapshot name which is used to do snapshotted shard copy in the 'copy' phase
|
||||
* of nonblocking split workflow.
|
||||
*/
|
||||
char *
|
||||
CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
|
||||
MultiConnection *sourceConnection)
|
||||
{
|
||||
StringInfo createReplicationSlotCommand = makeStringInfo();
|
||||
appendStringInfo(createReplicationSlotCommand,
|
||||
"CREATE_REPLICATION_SLOT %s LOGICAL citus EXPORT_SNAPSHOT;",
|
||||
ShardSplitTemplateReplicationSlotName(
|
||||
shardIntervalToSplit->shardId));
|
||||
|
||||
PGresult *result = NULL;
|
||||
int response = ExecuteOptionalRemoteCommand(sourceConnection,
|
||||
createReplicationSlotCommand->data,
|
||||
&result);
|
||||
|
||||
if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
|
||||
{
|
||||
ReportResultError(sourceConnection, result, ERROR);
|
||||
ereport(ERROR, errmsg(
|
||||
"Could not find replication slot matching a subscription %s",
|
||||
target->subscriptionName));
|
||||
}
|
||||
|
||||
/* 'snapshot_name' is the third column, so its index is 2 (indexes start from zero).
|
||||
* We're using the pstrdup to copy the data into the current memory context */
|
||||
char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columnIndex */));
|
||||
return snapShotName;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShardSplitTemplateReplicationSlotName returns name of template replication slot
|
||||
* following the shard split naming scheme.
|
||||
*/
|
||||
char *
|
||||
ShardSplitTemplateReplicationSlotName(uint64 shardId)
|
||||
{
|
||||
return psprintf("%s%lu", SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX, shardId);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateReplicationSlots creates copies of template replication slot
|
||||
* on the source node.
|
||||
*
|
||||
* sourceNodeConnection - Source node connection.
|
||||
*
|
||||
* templateSlotName - Template replication slot name whose copies have to be created.
|
||||
* This slot holds a LSN from which the logical replication
|
||||
* begins.
|
||||
*
|
||||
* shardSplitSubscriberMetadataList - List of 'ShardSplitSubscriberMetadata. '
|
||||
*
|
||||
* 'ShardSplitSubscriberMetadata' contains replication slot name that is used
|
||||
* to create copies of template replication slot on source node. These slot names are returned by
|
||||
* 'worker_split_shard_replication_setup' UDF and each slot is responsible for a specific
|
||||
* split range. We make multiple attempts to clean up these replication slot copies in the
|
||||
* below order to be on safer side.
|
||||
* 1. Clean up before starting shard split workflow.
|
||||
* 2. Implicitly dropping slots while dropping subscriptions.
|
||||
* 3. Explicitly dropping slots which would have skipped over from 2.
|
||||
*/
|
||||
void
|
||||
CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlotName,
|
||||
List *shardSplitSubscriberMetadataList)
|
||||
{
|
||||
ShardSplitSubscriberMetadata *subscriberMetadata = NULL;
|
||||
foreach_ptr(subscriberMetadata, shardSplitSubscriberMetadataList)
|
||||
{
|
||||
char *slotName = subscriberMetadata->slotInfo->slotName;
|
||||
|
||||
StringInfo createReplicationSlotCommand = makeStringInfo();
|
||||
|
||||
appendStringInfo(createReplicationSlotCommand,
|
||||
"SELECT * FROM pg_catalog.pg_copy_logical_replication_slot (%s, %s)",
|
||||
quote_literal_cstr(templateSlotName), quote_literal_cstr(
|
||||
slotName));
|
||||
|
||||
ExecuteCriticalRemoteCommand(sourceNodeConnection,
|
||||
createReplicationSlotCommand->data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DropAllShardSplitLeftOvers drops shard split subscriptions, publications, roles
|
||||
* and replication slots. These might have been left there after
|
||||
* the coordinator crashed during a shard split. It's important to delete them
|
||||
* for two reasons:
|
||||
* 1. Starting new shard split might fail when they exist, because it cannot
|
||||
* create them.
|
||||
* 2. Leftover replication slots that are not consumed from anymore make it
|
||||
* impossible for WAL to be dropped. This can cause out-of-disk issues.
|
||||
*/
|
||||
void
|
||||
DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPubSub)
|
||||
{
|
||||
char *superUser = CitusExtensionOwnerName();
|
||||
char *databaseName = get_database_name(MyDatabaseId);
|
||||
|
||||
/*
|
||||
* We open new connections to all nodes. The reason for this is that
|
||||
* operations on subscriptions and publications cannot be run in a
|
||||
* transaction. By forcing a new connection we make sure no transaction is
|
||||
* active on the connection.
|
||||
*/
|
||||
int connectionFlags = FORCE_NEW_CONNECTION;
|
||||
|
||||
HASH_SEQ_STATUS statusForSubscription;
|
||||
hash_seq_init(&statusForSubscription, shardSplitHashMapForPubSub);
|
||||
|
||||
NodeShardMappingEntry *entry = NULL;
|
||||
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) !=
|
||||
NULL)
|
||||
{
|
||||
uint32_t nodeId = entry->key.nodeId;
|
||||
WorkerNode *workerNode = FindNodeWithNodeId(nodeId, false /*missingOk*/);
|
||||
|
||||
MultiConnection *cleanupConnection = GetNodeUserDatabaseConnection(
|
||||
connectionFlags, workerNode->workerName, workerNode->workerPort,
|
||||
superUser, databaseName);
|
||||
|
||||
/* We need to claim the connection exclusively while dropping the subscription */
|
||||
ClaimConnectionExclusively(cleanupConnection);
|
||||
|
||||
DropAllShardSplitSubscriptions(cleanupConnection);
|
||||
|
||||
DropAllShardSplitUsers(cleanupConnection);
|
||||
|
||||
/* Close connection after cleanup */
|
||||
CloseConnection(cleanupConnection);
|
||||
}
|
||||
|
||||
/*Drop all shard split publications at the source*/
|
||||
MultiConnection *sourceNodeConnection = GetNodeUserDatabaseConnection(
|
||||
connectionFlags, sourceNode->workerName, sourceNode->workerPort,
|
||||
superUser, databaseName);
|
||||
|
||||
ClaimConnectionExclusively(sourceNodeConnection);
|
||||
|
||||
/*
|
||||
* If replication slot could not be dropped while dropping the
|
||||
* subscriber, drop it here.
|
||||
*/
|
||||
DropAllShardSplitReplicationSlots(sourceNodeConnection);
|
||||
DropAllShardSplitPublications(sourceNodeConnection);
|
||||
|
||||
CloseConnection(sourceNodeConnection);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DropAllShardSplitSubscriptions drops all the existing subscriptions that
|
||||
* match our shard split naming scheme on the node that the connection points
|
||||
* to.
|
||||
*/
|
||||
void
|
||||
DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection)
|
||||
{
|
||||
char *query = psprintf(
|
||||
"SELECT subname FROM pg_catalog.pg_subscription "
|
||||
"WHERE subname LIKE %s || '%%'",
|
||||
quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_PREFIX));
|
||||
List *subscriptionNameList = GetQueryResultStringList(cleanupConnection, query);
|
||||
char *subscriptionName = NULL;
|
||||
foreach_ptr(subscriptionName, subscriptionNameList)
|
||||
{
|
||||
DisableAndDropShardSplitSubscription(cleanupConnection, subscriptionName);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DropAllShardSplitPublications drops all the existing publications that
|
||||
* match our shard split naming scheme on the node that the connection points
|
||||
* to.
|
||||
*/
|
||||
static void
|
||||
DropAllShardSplitPublications(MultiConnection *connection)
|
||||
{
|
||||
char *query = psprintf(
|
||||
"SELECT pubname FROM pg_catalog.pg_publication "
|
||||
"WHERE pubname LIKE %s || '%%'",
|
||||
quote_literal_cstr(SHARD_SPLIT_PUBLICATION_PREFIX));
|
||||
List *publicationNameList = GetQueryResultStringList(connection, query);
|
||||
char *publicationName;
|
||||
foreach_ptr(publicationName, publicationNameList)
|
||||
{
|
||||
DropShardPublication(connection, publicationName);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DropAllShardSplitUsers drops all the users that match our shard split naming
|
||||
* scheme. The users are temporary created for shard splits.
|
||||
*/
|
||||
static void
|
||||
DropAllShardSplitUsers(MultiConnection *connection)
|
||||
{
|
||||
char *query = psprintf(
|
||||
"SELECT rolname FROM pg_catalog.pg_roles "
|
||||
"WHERE rolname LIKE %s || '%%'",
|
||||
quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX));
|
||||
List *usernameList = GetQueryResultStringList(connection, query);
|
||||
char *username;
|
||||
foreach_ptr(username, usernameList)
|
||||
{
|
||||
DropShardUser(connection, username);
|
||||
}
|
||||
}


/*
 * DropAllShardSplitReplicationSlots drops all the existing replication slots
 * that match the shard split naming scheme on the node that the connection
 * points to.
 */
static void
DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection)
{
	char *query = psprintf(
		"SELECT slot_name FROM pg_catalog.pg_replication_slots "
		"WHERE slot_name LIKE %s || '%%'",
		quote_literal_cstr(SHARD_SPLIT_REPLICATION_SLOT_PREFIX));
	List *slotNameList = GetQueryResultStringList(cleanupConnection, query);
	char *slotName;
	foreach_ptr(slotName, slotNameList)
	{
		DropShardReplicationSlot(cleanupConnection, slotName);
	}
}


/*
 * DropShardSplitPublications drops the publications used for shard splits over
 * the given connection, if they exist.
 */
void
DropShardSplitPublications(MultiConnection *sourceConnection,
						   HTAB *shardInfoHashMapForPublication)
{
	HASH_SEQ_STATUS status;
	hash_seq_init(&status, shardInfoHashMapForPublication);

	NodeShardMappingEntry *entry = NULL;
	while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
	{
		uint32 nodeId = entry->key.nodeId;
		uint32 tableOwnerId = entry->key.tableOwnerId;
		DropShardPublication(sourceConnection,
							 ShardSplitPublicationName(nodeId, tableOwnerId));
	}
}


/*
 * DropShardSplitSubsriptions disables and drops the subscriptions on the subscriber node
 * that are used to split shards. Note that it does not drop the replication slots on the
 * publisher node. Replication slots are dropped separately by calling
 * DropShardSplitReplicationSlots.
 */
void
DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList)
{
	ShardSplitSubscriberMetadata *subscriberMetadata = NULL;
	foreach_ptr(subscriberMetadata, shardSplitSubscribersMetadataList)
	{
		uint32 tableOwnerId = subscriberMetadata->tableOwnerId;
		MultiConnection *targetNodeConnection = subscriberMetadata->targetNodeConnection;

		DisableAndDropShardSplitSubscription(targetNodeConnection,
											 ShardSubscriptionName(tableOwnerId,
																   SHARD_SPLIT_SUBSCRIPTION_PREFIX));

		DropShardUser(targetNodeConnection,
					  ShardSubscriptionRole(tableOwnerId,
											SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX));
	}
}


/*
 * DisableAndDropShardSplitSubscription disables the subscription, resets the slot name to
 * 'none' and then drops the subscription on the given connection. It does not drop the
 * replication slot. The caller of this method should ensure that the replication slot is
 * cleaned up.
 *
 * Directly executing 'DROP SUBSCRIPTION' attempts to drop the replication slot at the
 * source node. When the subscription is local, directly dropping the subscription can
 * lead to a self deadlock. To avoid this, we first disable the subscription, reset the
 * slot name and then drop the subscription.
 */
void
DisableAndDropShardSplitSubscription(MultiConnection *connection, char *subscriptionName)
{
	StringInfo alterSubscriptionSlotCommand = makeStringInfo();
	StringInfo alterSubscriptionDisableCommand = makeStringInfo();

	appendStringInfo(alterSubscriptionDisableCommand,
					 "ALTER SUBSCRIPTION %s DISABLE",
					 quote_identifier(subscriptionName));
	ExecuteCriticalRemoteCommand(connection,
								 alterSubscriptionDisableCommand->data);

	appendStringInfo(alterSubscriptionSlotCommand,
					 "ALTER SUBSCRIPTION %s SET (slot_name = NONE)",
					 quote_identifier(subscriptionName));
	ExecuteCriticalRemoteCommand(connection, alterSubscriptionSlotCommand->data);

	ExecuteCriticalRemoteCommand(connection, psprintf(
									 "DROP SUBSCRIPTION %s",
									 quote_identifier(subscriptionName)));
}
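
For illustration only, a minimal caller-side sketch of the pattern described in the comment above: the subscription is disabled, detached from its slot, and dropped on the target node, and the leftover slot is then dropped explicitly on the source. The connection variables and the names below are hypothetical placeholders, not part of this patch.

/* Sketch only: connections and names are made up for illustration. */
static void
CleanupOneSplitSubscriptionSketch(MultiConnection *targetConnection,
								  MultiConnection *sourceConnection)
{
	char subscriptionName[] = "citus_shard_split_subscription_10";
	char slotName[] = "citus_shard_split_18_10";

	/* on the target: DISABLE, SET (slot_name = NONE), then DROP SUBSCRIPTION */
	DisableAndDropShardSplitSubscription(targetConnection, subscriptionName);

	/* on the source: the slot was intentionally left behind, so drop it explicitly */
	DropShardReplicationSlot(sourceConnection, slotName);
}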


/*
 * DropShardSplitReplicationSlots drops replication slots on the source node.
 */
void
DropShardSplitReplicationSlots(MultiConnection *sourceConnection,
							   List *replicationSlotInfoList)
{
	ReplicationSlotInfo *replicationSlotInfo = NULL;
	foreach_ptr(replicationSlotInfo, replicationSlotInfoList)
	{
		DropShardReplicationSlot(sourceConnection, replicationSlotInfo->slotName);
	}
}


/*
 * CloseShardSplitSubscriberConnections closes the connections of subscriber nodes.
 * 'ShardSplitSubscriberMetadata' holds the connection for a subscriber node. The method
 * traverses the list and closes each connection.
 */
void
CloseShardSplitSubscriberConnections(List *shardSplitSubscriberMetadataList)
{
	ShardSplitSubscriberMetadata *subscriberMetadata = NULL;
	foreach_ptr(subscriberMetadata, shardSplitSubscriberMetadataList)
	{
		CloseConnection(subscriberMetadata->targetNodeConnection);
	}
}

@ -187,32 +187,6 @@ ReleaseSharedMemoryOfShardSplitInfo()
}


/*
 * EncodeReplicationSlot returns an encoded replication slot name
 * in the following format.
 * Slot Name = citus_split_nodeId_tableOwnerOid
 * Max supported length of a replication slot name is 64 bytes.
 */
char *
EncodeReplicationSlot(uint32_t nodeId,
					  uint32_t tableOwnerId)
{
	StringInfo slotName = makeStringInfo();
	appendStringInfo(slotName, "%s%u_%u", SHARD_SPLIT_REPLICATION_SLOT_PREFIX, nodeId,
					 tableOwnerId);

	if (slotName->len > NAMEDATALEN)
	{
		ereport(ERROR,
				(errmsg(
					 "Replication Slot name:%s having length:%d is greater than maximum allowed length:%d",
					 slotName->data, slotName->len, NAMEDATALEN)));
	}

	return slotName->data;
}
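
As an illustration of the encoding above (using the SHARD_SPLIT_REPLICATION_SLOT_PREFIX value defined later in this diff and made-up ids; this sketch is not part of the patch):

/* Sketch: nodeId 18 and tableOwnerId 10 are made up for illustration. */
char *slotName = EncodeReplicationSlot(18, 10);

/* slotName is now "citus_shard_split_18_10", comfortably below the NAMEDATALEN limit */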


/*
 * InitializeShardSplitSMHandleManagement requests the necessary shared memory
 * from Postgres and sets up the shared memory startup hook.

@ -10,6 +10,8 @@

#include "postgres.h"

#include "common/hashfn.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/hash_helpers.h"
#include "utils/hsearch.h"

@ -34,6 +36,50 @@ hash_delete_all(HTAB *htab)
}


/*
 * CreateSimpleHashWithName creates a hashmap that hashes its key using the
 * tag_hash function and stores the entries in the current memory context.
 */
HTAB *
CreateSimpleHashWithName(Size keySize, Size entrySize, char *name)
{
	HASHCTL info;
	memset_struct_0(info);
	info.keysize = keySize;
	info.entrysize = entrySize;
	info.hcxt = CurrentMemoryContext;

	/*
	 * uint32_hash does the same as tag_hash for keys of 4 bytes, but it's
	 * faster.
	 */
	if (keySize == sizeof(uint32))
	{
		info.hash = uint32_hash;
	}
	else
	{
		info.hash = tag_hash;
	}

	int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

	/*
	 * We use 32 as the initial number of elements that fit into this hash
	 * table. This value seems a reasonable tradeoff between two issues:
	 * 1. An empty hashmap shouldn't take up a lot of space
	 * 2. Doing a few inserts shouldn't require growing the hashmap
	 *
	 * NOTE: No performance testing has been performed when choosing this
	 * value. If this ever turns out to be a problem, feel free to do some
	 * performance tests.
	 */
	HTAB *publicationInfoHash = hash_create(name, 32, &info, hashFlags);
	return publicationInfoHash;
}


/*
 * foreach_htab_cleanup cleans up the hash iteration state after the iteration
 * is done. This is only needed when break statements are present in the

@ -30,4 +30,8 @@ extern void hash_delete_all(HTAB *htab);

extern void foreach_htab_cleanup(void *var, HASH_SEQ_STATUS *status);

extern HTAB * CreateSimpleHashWithName(Size keysize, Size entrysize, char *name);

#define CreateSimpleHash(keyType, entryType) \
	CreateSimpleHashWithName(sizeof(keyType), sizeof(entryType), # entryType "Hash")
#endif
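
As a usage sketch of the new helpers (the entry type, ids, and publication name below are illustrative, not part of the patch), a hash keyed by NodeAndOwner can be created and filled like this:

#include "postgres.h"

#include "distributed/hash_helpers.h"
#include "distributed/multi_logical_replication.h"
#include "utils/hsearch.h"

/* Illustrative entry type; the hash key must be the first field. */
typedef struct PublicationNameEntry
{
	NodeAndOwner key;
	char *publicationName;
} PublicationNameEntry;

static void
SimpleHashUsageSketch(void)
{
	/*
	 * Expands to CreateSimpleHashWithName(sizeof(NodeAndOwner),
	 * sizeof(PublicationNameEntry), "PublicationNameEntryHash").
	 */
	HTAB *publicationNameHash = CreateSimpleHash(NodeAndOwner, PublicationNameEntry);

	NodeAndOwner key = { .nodeId = 18, .tableOwnerId = 10 };
	bool found = false;
	PublicationNameEntry *entry =
		(PublicationNameEntry *) hash_search(publicationNameHash, &key, HASH_ENTER, &found);
	entry->publicationName = "citus_shard_split_publication_18_10";
}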

@ -12,8 +12,10 @@
#ifndef MULTI_LOGICAL_REPLICATION_H_
#define MULTI_LOGICAL_REPLICATION_H_

#include "c.h"

#include "nodes/pg_list.h"
#include "distributed/connection_management.h"


/* Config variables managed via guc.c */

@ -21,49 +23,157 @@ extern int LogicalReplicationTimeout;

extern bool PlacementMovedUsingLogicalReplicationInTX;

/*
 * NodeAndOwner should be used as a key for structs that should be hashed by a
 * combination of node and owner.
 */
typedef struct NodeAndOwner
{
	uint32_t nodeId;
	Oid tableOwnerId;
} NodeAndOwner;


/*
 * ReplicationSlotInfo stores the info that defines a replication slot. For
 * shard splits this information is built by parsing the result of the
 * 'worker_split_shard_replication_setup' UDF.
 */
typedef struct ReplicationSlotInfo
{
	uint32 targetNodeId;
	Oid tableOwnerId;
	char *name;
} ReplicationSlotInfo;

/*
 * PublicationInfo stores the information that defines a publication.
 */
typedef struct PublicationInfo
{
	NodeAndOwner key;
	char *name;
	List *shardIntervals;
	struct LogicalRepTarget *target;
} PublicationInfo;

/*
 * Stores information necessary to create all the th
 */
typedef struct LogicalRepTarget
{
	/*
	 * The Oid of the user that owns the shards in newShards. This Oid is the
	 * Oid of the user on the coordinator; it is likely different from the Oid
	 * of the user on the logical replication source or target.
	 */
	Oid tableOwnerId;
	char *subscriptionName;

	/*
	 * The name of the user that's used as the owner of the subscription. This
	 * is not the same as the name of the user that matches tableOwnerId.
	 * Instead we create a temporary user with the same permissions as that
	 * user, with its only purpose being owning the subscription.
	 */
	char *subscriptionOwnerName;
	ReplicationSlotInfo *replicationSlot;
	PublicationInfo *publication;

	/*
	 * The shardIntervals that we want to create on this logical replication
	 * target. This can be different from the shard intervals that are part of
	 * the publication for two reasons:
	 * 1. The publication does not contain partitioned tables, only their
	 *    children. The partition parent tables ARE part of newShards.
	 * 2. For shard splits the publication also contains dummy shards, these
	 *    ARE NOT part of newShards.
	 */
	List *newShards;

	/*
	 * The superuserConnection is shared between all LogicalRepTargets that are
	 * on the same node. This can be initialized easily by using
	 * CreateGroupedLogicalRepTargetsConnections.
	 */
	MultiConnection *superuserConnection;
} LogicalRepTarget;

/*
 * GroupedLogicalRepTargets groups LogicalRepTargets by node. This allows
 * creating a hashmap in which we can search by nodeId, which is useful
 * because these targets can all use the same superuserConnection for
 * management. That allows us to batch certain operations, such as getting
 * the state of the subscriptions.
 */
typedef struct GroupedLogicalRepTargets
{
	uint32 nodeId;
	List *logicalRepTargetList;
	MultiConnection *superuserConnection;
} GroupedLogicalRepTargets;
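
To illustrate why the grouping is useful, a hedged sketch (not part of the patch) of issuing a single remote command per node over the shared superuser connection, instead of once per LogicalRepTarget:

/*
 * Sketch only: walk a hash of GroupedLogicalRepTargets and run one command
 * per node on its shared superuser connection.
 */
static void
PerNodeCommandSketch(HTAB *groupedLogicalRepTargetsHash, char *command)
{
	HASH_SEQ_STATUS status;
	hash_seq_init(&status, groupedLogicalRepTargetsHash);

	GroupedLogicalRepTargets *group = NULL;
	while ((group = (GroupedLogicalRepTargets *) hash_seq_search(&status)) != NULL)
	{
		/* every target in group->logicalRepTargetList lives on this node */
		ExecuteCriticalRemoteCommand(group->superuserConnection, command);
	}
}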


/*
 * LogicalRepType is used for various functions to do something different for
 * shard moves than for shard splits, such as using a different prefix for a
 * subscription name.
 */
typedef enum LogicalRepType
{
	SHARD_MOVE,
	SHARD_SPLIT,
} LogicalRepType;

extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
									 int sourceNodePort, char *targetNodeName,
									 int targetNodePort);

extern void ConflictOnlyWithIsolationTesting(void);
extern void CreateReplicaIdentity(List *shardList, char *nodeName, int32 nodePort);
extern void CreateReplicaIdentities(List *subscriptionInfoList);
extern void CreateReplicaIdentitiesOnNode(List *shardList,
										  char *nodeName,
										  int32 nodePort);
extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection);
extern List * GetQueryResultStringList(MultiConnection *connection, char *query);

extern void DropShardSubscription(MultiConnection *connection,
								  char *subscriptionName);
extern void DropShardPublication(MultiConnection *connection, char *publicationName);
extern MultiConnection * GetReplicationConnection(char *nodeName, int nodePort);
extern void CreatePublications(MultiConnection *sourceConnection,
							   HTAB *publicationInfoHash);
extern void CreateSubscriptions(MultiConnection *sourceConnection,
								char *databaseName, List *subscriptionInfoList);
extern char * CreateReplicationSlots(MultiConnection *sourceConnection,
									 MultiConnection *sourceReplicationConnection,
									 List *subscriptionInfoList,
									 char *outputPlugin);
extern void EnableSubscriptions(List *subscriptionInfoList);
extern void DropSubscriptions(List *subscriptionInfoList);
extern void DropReplicationSlots(MultiConnection *sourceConnection,
								 List *subscriptionInfoList);
extern void DropPublications(MultiConnection *sourceConnection,
							 HTAB *publicationInfoHash);
extern void DropAllLogicalReplicationLeftovers(LogicalRepType type);

extern void DropShardUser(MultiConnection *connection, char *username);
extern void DropShardReplicationSlot(MultiConnection *connection,
									 char *publicationName);
extern char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId);
extern char * ReplicationSlotName(LogicalRepType type, uint32_t nodeId, Oid ownerId);
extern char * SubscriptionName(LogicalRepType type, Oid ownerId);
extern char * SubscriptionRoleName(LogicalRepType type, Oid ownerId);


extern char * ShardSubscriptionRole(Oid ownerId, char *operationPrefix);
extern char * ShardSubscriptionName(Oid ownerId, char *operationPrefix);
extern void CreateShardSplitSubscription(MultiConnection *connection,
										 char *sourceNodeName,
										 int sourceNodePort, char *userName,
										 char *databaseName,
										 char *publicationName, char *slotName,
										 Oid ownerId);

extern void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
													Bitmapset *tableOwnerIds,
													char *operationPrefix);
extern void WaitForAllSubscriptionsToBecomeReady(HTAB *groupedLogicalRepTargetsHash);
extern void WaitForAllSubscriptionsToCatchUp(MultiConnection *sourceConnection,
											 HTAB *groupedLogicalRepTargetsHash);
extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection,
											  XLogRecPtr sourcePosition,
											  Bitmapset *tableOwnerIds,
											  char *operationPrefix);
extern HTAB * CreateGroupedLogicalRepTargetsHash(List *subscriptionInfoList);
extern void CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
													  char *user,
													  char *databaseName);
extern void RecreateGroupedLogicalRepTargetsConnections(
	HTAB *groupedLogicalRepTargetsHash,
	char *user,
	char *databaseName);
extern void CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash);

#define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_"
#define SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX "citus_shard_move_subscription_role_"
#define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_"
#define SHARD_SPLIT_PUBLICATION_PREFIX "citus_shard_split_publication_"
#define SHARD_SPLIT_SUBSCRIPTION_PREFIX "citus_shard_split_subscription_"
#define SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX "citus_shard_split_subscription_role_"
#define SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX "citus_shard_split_template_slot_"
#define SHARD_SPLIT_REPLICATION_SLOT_PREFIX "citus_shard_split_"
#endif /* MULTI_LOGICAL_REPLICATION_H_ */
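
For reference, a sketch of how these prefixes turn into concrete object names, mirroring the naming helpers declared above (the ids are made up and the exact formatting in the real helpers may differ):

/* Sketch only: the real helpers are declared above; the ids are made up. */
static void
NamingSketch(void)
{
	uint32_t nodeId = 18;
	Oid ownerId = 10;

	/* e.g. "citus_shard_split_publication_18_10": publications are keyed by node and owner */
	char *publicationName = psprintf("%s%u_%u",
									 SHARD_SPLIT_PUBLICATION_PREFIX, nodeId, ownerId);

	/* e.g. "citus_shard_move_subscription_10": subscriptions are keyed by owner only */
	char *subscriptionName = psprintf("%s%u",
									  SHARD_MOVE_SUBSCRIPTION_PREFIX, ownerId);

	(void) publicationName;
	(void) subscriptionName;
}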

@ -12,86 +12,34 @@
#ifndef SHARDSPLIT_LOGICAL_REPLICATION_H
#define SHARDSPLIT_LOGICAL_REPLICATION_H

#include "distributed/metadata_utility.h"
#include "distributed/multi_logical_replication.h"
#include "distributed/worker_manager.h"

/*
 * Invocation of the 'worker_split_shard_replication_setup' UDF returns a set of records
 * of the custom datatype 'replication_slot_info'. This information is parsed and stored
 * in the below data structure. The information is used to create a subscriber on the
 * target node with the corresponding slot name.
 * GroupedShardSplitInfos groups all ShardSplitInfos belonging to the same node
 * and table owner together. Its only purpose is creating a hashmap that allows
 * us to search ShardSplitInfos by node and owner.
 */
typedef struct ReplicationSlotInfo
typedef struct GroupedShardSplitInfos
{
	uint32 targetNodeId;
	char *tableOwnerName;
	char *slotName;
} ReplicationSlotInfo;

/*
 * Stores information necessary for creating a subscriber on the target node.
 * Based on how a shard is split and mapped to target nodes, for each unique combination of
 * <tableOwner, targetNodeId> there is a 'ShardSplitSubscriberMetadata'.
 */
typedef struct ShardSplitSubscriberMetadata
{
	Oid tableOwnerId;
	ReplicationSlotInfo *slotInfo;

	/*
	 * Exclusively claimed connection for a subscription. The target node of the
	 * subscription is pointed to by ReplicationSlotInfo.
	 */
	MultiConnection *targetNodeConnection;
} ShardSplitSubscriberMetadata;

/* key for NodeShardMappingEntry */
typedef struct NodeShardMappingKey
{
	uint32_t nodeId;
	Oid tableOwnerId;
} NodeShardMappingKey;

/* Entry for hash map */
typedef struct NodeShardMappingEntry
{
	NodeShardMappingKey key;
	NodeAndOwner key;
	List *shardSplitInfoList;
} NodeShardMappingEntry;
} GroupedShardSplitInfos;

extern uint32 NodeShardMappingHash(const void *key, Size keysize);
extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize);
extern HTAB * SetupHashMapForShardInfo(void);

/* Functions for subscriber metadata management */
extern List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
														  List *replicationSlotInfoList);
														  List *replicationSlotInfoList,
														  List *shardGroupSplitIntervalListList,
														  List *workersForPlacementList);
extern HTAB * CreateShardSplitInfoMapForPublication(
	List *sourceColocatedShardIntervalList,
	List *shardGroupSplitIntervalListList,
	List *destinationWorkerNodesList);

/* Functions for creating publications and subscriptions */
extern void AlterShardSplitPublications(MultiConnection *sourceConnection,
										HTAB *shardInfoHashMapForPublication);
extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList,
										  List *shardSplitSubscriberMetadataList,
										  WorkerNode *sourceWorkerNode, char *superUser,
										  char *databaseName);
extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection,
								   char *templateSlotName,
								   List *shardSplitSubscriberMetadataList);
extern List * CreateTargetNodeConnectionsForShardSplit(
	List *shardSplitSubscribersMetadataList,
	int connectionFlags, char *user,
	char *databaseName);

/* Functions to drop publisher-subscriber resources */
extern void CreateShardSplitPublications(MultiConnection *sourceConnection,
										 HTAB *shardInfoHashMapForPublication);
extern char * CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
											MultiConnection *sourceConnection);
extern void DropAllShardSplitLeftOvers(WorkerNode *sourceNode,
									   HTAB *shardSplitMapOfPublications);
extern void DropShardSplitPublications(MultiConnection *sourceConnection,

@ -99,16 +47,4 @@ extern void DropShardSplitPublications(MultiConnection *sourceConnection,
extern void DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList);
extern void DropShardSplitReplicationSlots(MultiConnection *sourceConnection,
										   List *replicationSlotInfoList);
extern void DisableAndDropShardSplitSubscription(MultiConnection *connection,
												 char *subscriptionName);

/* Wrapper functions which wait for a subscriber to become ready and catch up */
extern void WaitForShardSplitRelationSubscriptionsBecomeReady(
	List *shardSplitPubSubMetadataList);
extern void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
															   List *shardSplitPubSubMetadataList);

extern char * ShardSplitTemplateReplicationSlotName(uint64 shardId);
extern void CloseShardSplitSubscriberConnections(List *shardSplitSubscriberMetadataList);
#endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */

@ -14,6 +14,8 @@
#ifndef SHARDSPLIT_SHARED_MEMORY_H
#define SHARDSPLIT_SHARED_MEMORY_H

#include "postgres.h"

/*
 * In-memory mapping of a split child shard.
 */

@ -79,6 +81,4 @@ extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(void);

extern HTAB * PopulateSourceToDestinationShardMapForSlot(char *slotName, MemoryContext cxt);

extern char * EncodeReplicationSlot(uint32_t nodeId, uint32_t tableOwnerId);
#endif /* SHARDSPLIT_SHARED_MEMORY_H */
@ -154,6 +154,25 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").cancel

(1 row)

SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
-- failure on disabling subscription (right before dropping it)
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cancellation on disabling subscription (right before dropping it)
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").cancel(' || :pid || ')');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
-- failure on dropping subscription

@ -164,15 +183,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()');
(1 row)

SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection not open
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
 mitmproxy

@ -209,11 +221,8 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
(1 row)

SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection not open
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

@ -241,11 +250,8 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
(1 row)

SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection not open
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

@ -267,11 +273,8 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
(1 row)

SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection not open
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Verify that the shard is not moved and the number of rows are still 100k
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
@ -27,8 +27,11 @@ CREATE PUBLICATION citus_shard_move_publication_:postgres_oid FOR TABLE dist_683
\c - - - :master_port
SET search_path TO logical_replication;
\set connection_string '\'user=postgres host=localhost port=' :worker_1_port '\''
CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid CONNECTION :connection_string PUBLICATION citus_shard_move_publication_:postgres_oid;
NOTICE: created replication slot "citus_shard_move_subscription_10" on publisher
CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid
    CONNECTION :connection_string
    PUBLICATION citus_shard_move_publication_:postgres_oid
    WITH (slot_name=citus_shard_move_slot_:postgres_oid);
NOTICE: created replication slot "citus_shard_move_slot_10" on publisher
SELECT count(*) from pg_subscription;
 count
---------------------------------------------------------------------

@ -78,8 +78,8 @@ WARNING: Previous split shard worflow was not successfully and could not comple

SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset
-- Create subscription at worker2 with copy_data to 'false'
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

@ -71,7 +71,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
 1
(1 row)

SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

@ -19,7 +19,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
 1
(1 row)

SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset
-- Create subscription at worker1 with copy_data to 'false' a
BEGIN;
CREATE SUBSCRIPTION local_subscription

@ -18,8 +18,8 @@ WARNING: Previous split shard worflow was not successfully and could not comple
 2
(1 row)

SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset
-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1
    CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression'
@ -79,6 +79,14 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").cancel(' || :pid || ')');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);

-- failure on disabling subscription (right before dropping it)
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);

-- cancellation on disabling subscription (right before dropping it)
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").cancel(' || :pid || ')');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);

-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);

@ -24,7 +24,10 @@ CREATE PUBLICATION citus_shard_move_publication_:postgres_oid FOR TABLE dist_683
\c - - - :master_port
SET search_path TO logical_replication;
\set connection_string '\'user=postgres host=localhost port=' :worker_1_port '\''
CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid CONNECTION :connection_string PUBLICATION citus_shard_move_publication_:postgres_oid;
CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid
    CONNECTION :connection_string
    PUBLICATION citus_shard_move_publication_:postgres_oid
    WITH (slot_name=citus_shard_move_slot_:postgres_oid);

SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;

@ -76,9 +76,9 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset

SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset

SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset

-- Create subscription at worker2 with copy_data to 'false'
\c - postgres - :worker_2_port

@ -71,7 +71,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
	ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
]);

SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset

-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port

@ -18,7 +18,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
	ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]);

SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset

-- Create subscription at worker1 with copy_data to 'false' a
BEGIN;

@ -16,8 +16,8 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
	ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
]);

SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset

-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1