1) Drop colocated shards
2) Swap Metadata
3) Drop dummy shards
users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-14 16:23:00 +05:30
parent 662a4aaec1
commit 2fa1dac051
10 changed files with 750 additions and 428 deletions
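The three steps above correspond to the cutover sequence this commit adds to NonBlockingShardSplit(); a minimal sketch of that ordering, using the Citus-internal functions that appear later in this diff (a sketch only, not compilable on its own):

/* Sketch only: the ordering added to NonBlockingShardSplit() in this commit. */
static void
NonBlockingSplitCutoverSketch(List *sourceColocatedShardIntervalList,
                              List *shardGroupSplitIntervalListList,
                              List *workersForPlacementList)
{
    /* 1) Drop colocated source shards and their metadata first; cross-checks
     *    reject overlapping shard metadata, so this must precede the insert. */
    DropShardList(sourceColocatedShardIntervalList);

    /* 2) Swap metadata: register the split children, then their foreign keys,
     *    which depend on the new metadata. */
    InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
                                     workersForPlacementList);
    CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
                                workersForPlacementList);

    /* 3) Drop the dummy shards tracked in DummyShardInfoHashMap. */
    DropDummyShards();
}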


@@ -81,6 +81,7 @@ LookupSplitMode(Oid shardTransferModeOid)
     {
         shardSplitMode = BLOCKING_SPLIT;
     }
+
     /* Extend with other modes as we support them */
     else if (strncmp(enumLabel, "auto", NAMEDATALEN) == 0 ||
              strncmp(enumLabel, "force_logical", NAMEDATALEN) == 0)


@@ -50,9 +50,9 @@ static void CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode,
 static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
                                            List *workersForPlacementList);
 static void CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
                                               List *sourceColocatedShardIntervalList,
                                               List *shardGroupSplitIntervalListList,
                                               List *workersForPlacementList);
 static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
                                            List *shardGroupSplitIntervalListList,
                                            WorkerNode *sourceWorkerNode,
@@ -74,9 +74,9 @@ static void BlockingShardSplit(SplitOperation splitOperation,
                                List *shardSplitPointsList,
                                List *workersForPlacementList);
 static void NonBlockingShardSplit(SplitOperation splitOperation,
                                   ShardInterval *shardIntervalToSplit,
                                   List *shardSplitPointsList,
                                   List *workersForPlacementList);
 static void DoSplitCopy(WorkerNode *sourceShardNode,
                         List *sourceColocatedShardIntervalList,
@@ -94,6 +94,10 @@ static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
 static Task * CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList,
                                           WorkerNode *workerNode);
+static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval);
+static void DropDummyShards(void);
+void TryDropShard(MultiConnection *connection, ShardInterval *shardInterval);
+
 
 /* Customize error message strings based on operation type */
 static const char *const SplitOperationName[] =
 {
@@ -106,6 +110,9 @@ static const char *const SplitTargetName[] =
     [ISOLATE_TENANT_TO_NEW_SHARD] = "tenant",
 };
 
+/* Map containing list of dummy shards created on target nodes */
+static HTAB *DummyShardInfoHashMap = NULL;
+
 /* Function definitions */
 
 /*
@@ -1039,7 +1046,7 @@ TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
             int connectionFlags = FOR_DDL;
             connectionFlags |= OUTSIDE_TRANSACTION;
-            MultiConnection *connnection = GetNodeUserDatabaseConnection(
+            MultiConnection *connection = GetNodeUserDatabaseConnection(
                 connectionFlags,
                 workerPlacementNode->workerName,
                 workerPlacementNode->workerPort,
@@ -1051,13 +1058,14 @@ TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
              * The shard may or may not exist and the connection could have died.
              */
             ExecuteOptionalRemoteCommand(
-                connnection,
+                connection,
                 dropShardQuery->data,
                 NULL /* pgResult */);
         }
     }
 }
 
+
 /*
  * SplitShard API to split a given shard (or shard group) in blocking fashion
  * based on specified split points to a set of destination nodes.
@@ -1068,9 +1076,9 @@ TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
  */
 static void
 NonBlockingShardSplit(SplitOperation splitOperation,
                       ShardInterval *shardIntervalToSplit,
                       List *shardSplitPointsList,
                       List *workersForPlacementList)
 {
     List *sourceColocatedShardIntervalList = ColocatedShardIntervalList(
         shardIntervalToSplit);
@@ -1107,12 +1115,33 @@ NonBlockingShardSplit(SplitOperation splitOperation,
                                       sourceShardToCopyNode,
                                       workersForPlacementList);
 
+        /* TODO: Refactor this method. BlockWrites is within this as of now, take it out */
         SplitShardReplicationSetup(
             sourceColocatedShardIntervalList,
             shardGroupSplitIntervalListList,
             sourceShardToCopyNode,
             workersForPlacementList);
+
+        /*
+         * Drop old shards and delete related metadata. Have to do that before
+         * creating the new shard metadata, because there are cross-checks
+         * preventing inconsistent metadata (like overlapping shards).
+         */
+        DropShardList(sourceColocatedShardIntervalList);
+
+        /* Insert new shard and placement metadata */
+        InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
+                                         workersForPlacementList);
+
+        /*
+         * Create foreign keys, if they exist, after the metadata changes in
+         * DropShardList() and InsertSplitChildrenShardMetadata(), because
+         * foreign key creation depends on the new metadata.
+         */
+        CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
+                                    workersForPlacementList);
+
+        DropDummyShards();
     }
     PG_CATCH();
     {
@@ -1120,17 +1149,20 @@ NonBlockingShardSplit(SplitOperation splitOperation,
         TryDropSplitShardsOnFailure(shardGroupSplitIntervalListList,
                                     workersForPlacementList);
 
+        DropDummyShards();
+
         PG_RE_THROW();
     }
     PG_END_TRY();
 }
 
 
 /* Create ShardGroup split children on a list of corresponding workers. */
 static void
 CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
                                   List *sourceColocatedShardIntervalList,
                                   List *shardGroupSplitIntervalListList,
                                   List *workersForPlacementList)
 {
     /* Iterate on shard interval list for shard group */
     List *shardIntervalList = NULL;
@@ -1151,18 +1183,25 @@ CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
                 splitShardCreationCommandList,
                 shardInterval->shardId);
 
-            /* Create new split child shard on the specified placement list */
+            /* Create new split child shard on the specified worker node */
             CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
         }
     }
 }
 
 
 static void
 CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
                                List *shardGroupSplitIntervalListList,
                                WorkerNode *sourceWorkerNode,
                                List *workersForPlacementList)
 {
+    /*
+     * Setup a hash map to store the list of dummy shards created on nodes.
+     * This will facilitate easy cleanup.
+     */
+    DummyShardInfoHashMap = SetupHashMapForShardInfo();
+
     /*
      * Satisfy Constraint 1: Create dummy source shard(s) on all destination nodes.
      * If source node is also in destination, skip dummy shard creation (see Note 1 from function description).
@@ -1185,7 +1224,7 @@ CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
         ShardInterval *shardInterval = NULL;
         foreach_ptr(shardInterval, sourceColocatedShardIntervalList)
         {
             /* Populate list of commands necessary to create shard interval on destination */
             List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
                 shardInterval->relationId,
                 false, /* includeSequenceDefaults */
@@ -1194,8 +1233,11 @@ CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
                 splitShardCreationCommandList,
                 shardInterval->shardId);
 
-            /* Create new split child shard on the specified placement list */
+            /* Create dummy source shard on the specified placement list */
             CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
+
+            /* Add dummy source shard entry created for placement node in map */
+            AddDummyShardEntryInMap(workerPlacementNode->nodeId, shardInterval);
         }
     }
 
@@ -1224,12 +1266,16 @@ CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
                 splitShardCreationCommandList,
                 shardInterval->shardId);
 
-            /* Create new split child shard on the specified placement list */
+            /* Create dummy split child shard on source worker node */
             CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
+
+            /* Add dummy split child shard entry created on source node */
+            AddDummyShardEntryInMap(sourceWorkerNode->nodeId, shardInterval);
         }
     }
 }
 
 
 static HTAB *
 CreateWorkerForPlacementSet(List *workersForPlacementList)
 {
@@ -1257,15 +1303,16 @@ CreateWorkerForPlacementSet(List *workersForPlacementList)
 }
 
 
-static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList,
-                                       List *shardGroupSplitIntervalListList,
-                                       WorkerNode *sourceWorkerNode,
-                                       List *destinationWorkerNodesList)
+static void
+SplitShardReplicationSetup(List *sourceColocatedShardIntervalList,
+                           List *shardGroupSplitIntervalListList,
+                           WorkerNode *sourceWorkerNode,
+                           List *destinationWorkerNodesList)
 {
-    StringInfo splitShardReplicationUDF = CreateSplitShardReplicationSetupUDF(sourceColocatedShardIntervalList,
-                                                                              shardGroupSplitIntervalListList,
-                                                                              destinationWorkerNodesList);
+    StringInfo splitShardReplicationUDF = CreateSplitShardReplicationSetupUDF(
+        sourceColocatedShardIntervalList,
+        shardGroupSplitIntervalListList,
+        destinationWorkerNodesList);
 
     int connectionFlags = FORCE_NEW_CONNECTION;
     MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
@@ -1279,25 +1326,104 @@ SplitShardReplicationSetup(List *sourceColocatedShardIntervalList,
     ClaimConnectionExclusively(sourceConnection);
 
     PGresult *result = NULL;
-    int queryResult = ExecuteOptionalRemoteCommand(sourceConnection, splitShardReplicationUDF->data, &result);
+    int queryResult = ExecuteOptionalRemoteCommand(sourceConnection,
+                                                   splitShardReplicationUDF->data,
+                                                   &result);
     if (queryResult != RESPONSE_OKAY || !IsResponseOK(result))
     {
         ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
                         errmsg("Failed to run worker_split_shard_replication_setup")));
 
         PQclear(result);
         ForgetResults(sourceConnection);
     }
 
     /* Get replication slot information */
-    List * replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result);
+    List *replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result);
 
-    List * shardSplitPubSubMetadata = CreateShardSplitPubSubMetadataList(sourceColocatedShardIntervalList,
-                                                                         shardGroupSplitIntervalListList,
-                                                                         destinationWorkerNodesList,
-                                                                         replicationSlotInfoList);
+    List *shardSplitPubSubMetadata = CreateShardSplitPubSubMetadataList(
+        sourceColocatedShardIntervalList,
+        shardGroupSplitIntervalListList,
+        destinationWorkerNodesList,
+        replicationSlotInfoList);
 
-    LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, destinationWorkerNodesList);
+    LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata,
+                                  sourceColocatedShardIntervalList,
+                                  shardGroupSplitIntervalListList,
+                                  destinationWorkerNodesList);
 }
+
+
+static void
+AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval)
+{
+    NodeShardMappingKey key;
+    key.nodeId = targetNodeId;
+    key.tableOwnerId = TableOwnerOid(shardInterval->relationId);
+
+    bool found = false;
+    NodeShardMappingEntry *nodeMappingEntry =
+        (NodeShardMappingEntry *) hash_search(DummyShardInfoHashMap, &key, HASH_ENTER,
+                                              &found);
+    if (!found)
+    {
+        nodeMappingEntry->shardSplitInfoList = NIL;
+    }
+
+    nodeMappingEntry->shardSplitInfoList =
+        lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
+}
+
+
+static void
+DropDummyShards()
+{
+    HASH_SEQ_STATUS status;
+    hash_seq_init(&status, DummyShardInfoHashMap);
+
+    NodeShardMappingEntry *entry = NULL;
+    while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
+    {
+        uint32 nodeId = entry->key.nodeId;
+        WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
+                                                              false /* missingOk */);
+
+        int connectionFlags = FOR_DDL;
+        connectionFlags |= OUTSIDE_TRANSACTION;
+        MultiConnection *connection = GetNodeUserDatabaseConnection(
+            connectionFlags,
+            shardToBeDroppedNode->workerName,
+            shardToBeDroppedNode->workerPort,
+            CurrentUserName(),
+            NULL /* databaseName */);
+
+        List *dummyShardIntervalList = entry->shardSplitInfoList;
+        ShardInterval *shardInterval = NULL;
+        foreach_ptr(shardInterval, dummyShardIntervalList)
+        {
+            TryDropShard(connection, shardInterval);
+        }
+    }
+}
+
+
+void
+TryDropShard(MultiConnection *connection, ShardInterval *shardInterval)
+{
+    char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
+    StringInfo dropShardQuery = makeStringInfo();
+
+    /* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
+    appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
+                     qualifiedShardName);
+
+    /*
+     * Perform a drop in best effort manner.
+     * The shard may or may not exist and the connection could have died.
+     */
+    ExecuteOptionalRemoteCommand(
+        connection,
+        dropShardQuery->data,
+        NULL /* pgResult */);
+}


@@ -48,11 +48,12 @@ static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
 static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
                                        HTAB *shardInfoHashMap);
 
-static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor);
+static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore,
+                                      TupleDesc tupleDescriptor);
 static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap);
-StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList);
-char * ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo* shardSplitInfo);
+StringInfo GetSoureAndDestinationShardNames(List *shardSplitInfoList);
+char * ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo);
 
 /*
  * worker_split_shard_replication_setup UDF creates in-memory data structures
@@ -170,7 +171,7 @@ SetupHashMapForShardInfo()
     int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION | HASH_COMPARE);
-    HTAB * shardInfoMap = hash_create("ShardInfoMap", 128, &info, hashFlags);
+    HTAB *shardInfoMap = hash_create("ShardInfoMap", 128, &info, hashFlags);
 
     return shardInfoMap;
 }
@@ -420,9 +421,10 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
     *nodeId = DatumGetInt32(nodeIdDatum);
 }
 
-static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap)
-{
+
+static void
+CreatePublishersForSplitChildren(HTAB *shardInfoHashMap)
+{
     HASH_SEQ_STATUS status;
     hash_seq_init(&status, shardInfoHashMap);
@@ -436,21 +438,25 @@ CreatePublishersForSplitChildren(HTAB *shardInfoHashMap)
         int connectionFlags = FORCE_NEW_CONNECTION;
         printf("Sameer getting new connection \n");
         MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
                                                                           "localhost",
                                                                           PostPortNumber,
                                                                           CitusExtensionOwnerName(),
                                                                           get_database_name(
                                                                               MyDatabaseId));
-        StringInfo shardNamesForPublication = GetSoureAndDestinationShardNames(entry->shardSplitInfoList);
+        StringInfo shardNamesForPublication = GetSoureAndDestinationShardNames(
+            entry->shardSplitInfoList);
 
         StringInfo command = makeStringInfo();
-        appendStringInfo(command, "CREATE PUBLICATION sameerpub_%u_%u FOR TABLE %s", nodeId, tableOwnerId,shardNamesForPublication->data);
+        appendStringInfo(command, "CREATE PUBLICATION sameerpub_%u_%u FOR TABLE %s",
+                         nodeId, tableOwnerId, shardNamesForPublication->data);
 
         ExecuteCriticalRemoteCommand(sourceConnection, command->data);
         printf("Sameer UserName: %s \n", GetUserNameFromId(tableOwnerId, false));
     }
 }
 
-StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList)
+
+StringInfo
+GetSoureAndDestinationShardNames(List *shardSplitInfoList)
 {
     HASHCTL info;
     int flags = HASH_ELEM | HASH_CONTEXT;
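CreatePublishersForSplitChildren() builds one publication per (target node, table owner) pair, covering both the parent shard and its split children so initial data and subsequent changes flow through a single publication. A small self-contained illustration of the command formatting used above (node id, owner id, and shard names are made-up values):

#include <stdio.h>
#include <stdint.h>

int
main(void)
{
    uint32_t nodeId = 2;
    uint32_t tableOwnerId = 16385;
    const char *shardNames = "public.sensors_102008,public.sensors_102009";

    char command[512];

    /* Same format string as the diff: one publication per (node, owner) pair */
    snprintf(command, sizeof(command),
             "CREATE PUBLICATION sameerpub_%u_%u FOR TABLE %s",
             nodeId, tableOwnerId, shardNames);

    puts(command);   /* CREATE PUBLICATION sameerpub_2_16385 FOR TABLE ... */
    return 0;
}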
@@ -475,7 +481,7 @@ GetSoureAndDestinationShardNames(List *shardSplitInfoList)
         uint64 sourceShardId = shardSplitInfo->sourceShardId;
         hash_search(sourceShardIdSet, &sourceShardId, HASH_ENTER, &found);
 
-        if(addComma)
+        if (addComma)
         {
             appendStringInfo(allShardNames, ",");
         }
@@ -493,9 +499,9 @@ GetSoureAndDestinationShardNames(List *shardSplitInfoList)
     while ((sourceShardIdEntry = hash_seq_search(&status)) != NULL)
     {
         ShardInterval *sourceShardInterval = LoadShardInterval(*sourceShardIdEntry);
-        char* sourceShardName = ConstructQualifiedShardName(sourceShardInterval);
+        char *sourceShardName = ConstructQualifiedShardName(sourceShardInterval);
 
-        if(addComma)
+        if (addComma)
         {
             appendStringInfo(allShardNames, ",");
         }
@@ -507,8 +513,9 @@ GetSoureAndDestinationShardNames(List *shardSplitInfoList)
     return allShardNames;
 }
 
+
 char *
-ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo* shardSplitInfo)
+ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo)
 {
     Oid schemaId = get_rel_namespace(shardSplitInfo->distributedTableOid);
     char *schemaName = get_namespace_name(schemaId);
@@ -521,7 +528,10 @@ ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo)
     return shardName;
 }
 
-static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
+
+static void
+ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc
+                          tupleDescriptor)
 {
     HASH_SEQ_STATUS status;
     hash_seq_init(&status, shardInfoHashMap);
@@ -537,10 +547,11 @@ ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, T
         values[0] = Int32GetDatum(entry->key.nodeId);
 
-        char * tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false);
+        char *tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false);
         values[1] = CStringGetTextDatum(tableOwnerName);
 
-        char * slotName = encode_replication_slot(entry->key.nodeId, entry->key.tableOwnerId);
+        char *slotName = encode_replication_slot(entry->key.nodeId,
+                                                 entry->key.tableOwnerId);
         values[2] = CStringGetTextDatum(slotName);
 
         tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls);
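ReturnReplicationSlotInfo() streams one (nodeId, tableOwnerName, slotName) row per map entry back to the coordinator, which reads the columns positionally in ParseReplicationSlotInfoFromResult(). A minimal standalone reader for that three-column result shape, assuming a libpq PGresult with the same column order (ReplicationSlotInfo here is a local stand-in for the struct used in the diff):

#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <libpq-fe.h>

/* Mirrors the three columns produced by worker_split_shard_replication_setup */
typedef struct ReplicationSlotInfo
{
    uint32_t targetNodeId;
    char *tableOwnerName;
    char *slotName;
} ReplicationSlotInfo;

/* Caller owns the returned array of *rowCountOut entries; strings are
 * duplicated so the PGresult can be cleared afterwards (the diff uses
 * pstrdup for the same reason). */
static ReplicationSlotInfo *
ReadReplicationSlotInfo(PGresult *result, int *rowCountOut)
{
    int rowCount = PQntuples(result);
    ReplicationSlotInfo *slots = calloc(rowCount, sizeof(ReplicationSlotInfo));

    for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
    {
        slots[rowIndex].targetNodeId =
            strtoul(PQgetvalue(result, rowIndex, 0), NULL, 10);
        slots[rowIndex].tableOwnerName = strdup(PQgetvalue(result, rowIndex, 1));
        slots[rowIndex].slotName = strdup(PQgetvalue(result, rowIndex, 2));
    }

    *rowCountOut = rowCount;
    return slots;
}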


@@ -136,19 +136,22 @@ static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
 static uint64 TotalRelationSizeForSubscription(MultiConnection *connection,
                                                char *command);
 static bool RelationSubscriptionsAreReady(MultiConnection *targetConnection,
-                                          Bitmapset *tableOwnerIds, char * operationPrefix);
+                                          Bitmapset *tableOwnerIds,
+                                          char *operationPrefix);
 static void WaitForMiliseconds(long timeout);
 static XLogRecPtr GetSubscriptionPosition(MultiConnection *connection,
-                                          Bitmapset *tableOwnerIds, char * operationPrefix);
+                                          Bitmapset *tableOwnerIds,
+                                          char *operationPrefix);
 static char * ShardMovePublicationName(Oid ownerId);
-static char * ShardSubscriptionName(Oid ownerId, char * operationPrefix);
+static char * ShardSubscriptionName(Oid ownerId, char *operationPrefix);
 static void AcquireLogicalReplicationLock(void);
 static void DropAllShardMoveLeftovers(void);
 static void DropAllShardMoveSubscriptions(MultiConnection *connection);
 static void DropAllShardMoveReplicationSlots(MultiConnection *connection);
 static void DropAllShardMovePublications(MultiConnection *connection);
 static void DropAllShardMoveUsers(MultiConnection *connection);
-static char * ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char * operationPrefix);
+static char * ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds,
+                                              char *operationPrefix);
 static void DropShardMoveSubscription(MultiConnection *connection,
                                       char *subscriptionName);
 static void DropShardMoveReplicationSlot(MultiConnection *connection,
@@ -227,14 +230,16 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
      * subscription is not ready. There is no point of locking the shards before the
      * subscriptions for each relation becomes ready, so wait for it.
      */
-    WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX);
+    WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds,
+                                            SHARD_MOVE_SUBSCRIPTION_PREFIX);
 
     /*
      * Wait until the subscription is caught up to changes that have happened
      * after the initial COPY on the shards.
      */
     XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
-    WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX);
+    WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds,
+                                      SHARD_MOVE_SUBSCRIPTION_PREFIX);
 
     /*
      * Now let's create the post-load objects, such as the indexes, constraints
@@ -244,7 +249,8 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
     CreatePostLogicalReplicationDataLoadObjects(shardList, targetNodeName,
                                                 targetNodePort);
     sourcePosition = GetRemoteLogPosition(sourceConnection);
-    WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX);
+    WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds,
+                                      SHARD_MOVE_SUBSCRIPTION_PREFIX);
 
     /*
      * We're almost done, we'll block the writes to the shards that we're
@@ -257,7 +263,8 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
     BlockWritesToShardList(shardList);
 
     sourcePosition = GetRemoteLogPosition(sourceConnection);
-    WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX);
+    WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds,
+                                      SHARD_MOVE_SUBSCRIPTION_PREFIX);
 
     /*
      * We're creating the foreign constraints to reference tables after the
@@ -1085,7 +1092,8 @@ DropShardMovePublications(MultiConnection *connection, Bitmapset *tableOwnerIds)
          * If replication slot can not be dropped while dropping the subscriber, drop
          * it here.
          */
-        DropShardMoveReplicationSlot(connection, ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX));
+        DropShardMoveReplicationSlot(connection, ShardSubscriptionName(ownerId,
+                                                                       SHARD_MOVE_SUBSCRIPTION_PREFIX));
         DropShardMovePublication(connection, ShardMovePublicationName(ownerId));
     }
 }
@@ -1144,7 +1152,7 @@ ShardMovePublicationName(Oid ownerId)
  * coordinator is blocked by the blocked replication process.
  */
 static char *
-ShardSubscriptionName(Oid ownerId, char * operationPrefix)
+ShardSubscriptionName(Oid ownerId, char *operationPrefix)
 {
     if (RunningUnderIsolationTest)
     {
@@ -1317,7 +1325,8 @@ DropShardMoveSubscriptions(MultiConnection *connection, Bitmapset *tableOwnerIds
     int ownerId = -1;
     while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0)
     {
-        DropShardMoveSubscription(connection, ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX));
+        DropShardMoveSubscription(connection, ShardSubscriptionName(ownerId,
+                                                                    SHARD_MOVE_SUBSCRIPTION_PREFIX));
         DropShardMoveUser(connection, ShardMoveSubscriptionRole(ownerId));
     }
 }
@@ -1491,7 +1500,8 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
         appendStringInfo(createSubscriptionCommand,
                          "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
                          "WITH (citus_use_authinfo=true, enabled=false)",
-                         quote_identifier(ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)),
+                         quote_identifier(ShardSubscriptionName(ownerId,
+                                                                SHARD_MOVE_SUBSCRIPTION_PREFIX)),
                          quote_literal_cstr(conninfo->data),
                          quote_identifier(ShardMovePublicationName(ownerId)));
@@ -1500,7 +1510,8 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
         pfree(createSubscriptionCommand);
         ExecuteCriticalRemoteCommand(connection, psprintf(
                                          "ALTER SUBSCRIPTION %s OWNER TO %s",
-                                         ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX),
+                                         ShardSubscriptionName(ownerId,
+                                                               SHARD_MOVE_SUBSCRIPTION_PREFIX),
                                          ShardMoveSubscriptionRole(ownerId)
                                          ));
@@ -1519,7 +1530,8 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
         ExecuteCriticalRemoteCommand(connection, psprintf(
                                          "ALTER SUBSCRIPTION %s ENABLE",
-                                         ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)
+                                         ShardSubscriptionName(ownerId,
+                                                               SHARD_MOVE_SUBSCRIPTION_PREFIX)
                                          ));
     }
 }
@@ -1627,7 +1639,7 @@ GetRemoteLSN(MultiConnection *connection, char *command)
  */
 void
 WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
-                                        Bitmapset *tableOwnerIds, char * operationPrefix)
+                                        Bitmapset *tableOwnerIds, char *operationPrefix)
 {
     uint64 previousTotalRelationSizeForSubscription = 0;
     TimestampTz previousSizeChangeTime = GetCurrentTimestamp();
@@ -1655,7 +1667,8 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
     while (true)
     {
         /* we're done, all relations are ready */
-        if (RelationSubscriptionsAreReady(targetConnection, tableOwnerIds, operationPrefix))
+        if (RelationSubscriptionsAreReady(targetConnection, tableOwnerIds,
+                                          operationPrefix))
         {
             ereport(LOG, (errmsg("The states of the relations belonging to the "
                                  "subscriptions became READY on the "
@@ -1665,7 +1678,8 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
             break;
         }
 
-        char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, operationPrefix);
+        char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds,
+                                                                      operationPrefix);
 
         /* Get the current total size of tables belonging to the subscriber */
         uint64 currentTotalRelationSize =
@@ -1824,7 +1838,7 @@ TotalRelationSizeForSubscription(MultiConnection *connection, char *command)
  * be used in a query by using the IN operator.
  */
 static char *
-ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char * operationPrefix)
+ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char *operationPrefix)
 {
     StringInfo subscriptionValueList = makeStringInfo();
     appendStringInfoString(subscriptionValueList, "(");
@@ -1842,7 +1856,8 @@ ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char *operationPrefix
             first = false;
         }
         appendStringInfoString(subscriptionValueList,
-                               quote_literal_cstr(ShardSubscriptionName(ownerId, operationPrefix)));
+                               quote_literal_cstr(ShardSubscriptionName(ownerId,
+                                                                        operationPrefix)));
     }
     appendStringInfoString(subscriptionValueList, ")");
     return subscriptionValueList->data;
@@ -1855,11 +1870,12 @@ ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char *operationPrefix
  */
 static bool
 RelationSubscriptionsAreReady(MultiConnection *targetConnection,
-                              Bitmapset *tableOwnerIds, char * operationPrefix)
+                              Bitmapset *tableOwnerIds, char *operationPrefix)
 {
     bool raiseInterrupts = false;
 
-    char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, operationPrefix);
+    char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds,
+                                                                  operationPrefix);
     char *query = psprintf(
         "SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription "
         "WHERE srsubid = subid AND srsubstate != 'r' AND subname IN %s",
@@ -1910,8 +1926,9 @@ RelationSubscriptionsAreReady(MultiConnection *targetConnection,
  * The function also reports its progress in every logicalReplicationProgressReportTimeout.
  */
 void
-WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr sourcePosition,
-                                  Bitmapset *tableOwnerIds, char * operationPrefix)
+WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr
+                                  sourcePosition,
+                                  Bitmapset *tableOwnerIds, char *operationPrefix)
 {
     XLogRecPtr previousTargetPosition = 0;
     TimestampTz previousLSNIncrementTime = GetCurrentTimestamp();
@@ -2050,9 +2067,11 @@ WaitForMiliseconds(long timeout)
  * replication.
  */
 static XLogRecPtr
-GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds, char * operationPrefix)
+GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds,
+                        char *operationPrefix)
 {
-    char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, operationPrefix);
+    char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds,
+                                                                  operationPrefix);
     return GetRemoteLSN(connection, psprintf(
                             "SELECT min(latest_end_lsn) FROM pg_stat_subscription "
                             "WHERE subname IN %s", subscriptionValueList));
@@ -2062,64 +2081,66 @@ GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds, c
 
 /*Refactor this for ShardMove too.*/
 void
 CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
                         int sourceNodePort, char *userName, char *databaseName,
-                        char * publicationName,
+                        char *publicationName,
                         Oid ownerId)
 {
     StringInfo createSubscriptionCommand = makeStringInfo();
     StringInfo conninfo = makeStringInfo();
 
     /*
      * The CREATE USER command should not propagate, so we temporarily
      * disable DDL propagation.
      */
     SendCommandListToWorkerOutsideTransaction(
         connection->hostname, connection->port, connection->user,
         list_make2(
             "SET LOCAL citus.enable_ddl_propagation TO OFF;",
             psprintf(
                 "CREATE USER %s SUPERUSER IN ROLE %s",
                 ShardMoveSubscriptionRole(ownerId),
                 GetUserNameFromId(ownerId, false)
             )));
 
     appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' "
                      "connect_timeout=20",
                      escape_param_str(sourceNodeName), sourceNodePort,
                      escape_param_str(userName), escape_param_str(databaseName));
 
     appendStringInfo(createSubscriptionCommand,
                      "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
                      "WITH (citus_use_authinfo=true, enabled=false)",
-                     quote_identifier(ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX)),
+                     quote_identifier(ShardSubscriptionName(ownerId,
+                                                            SHARD_SPLIT_SUBSCRIPTION_PREFIX)),
                      quote_literal_cstr(conninfo->data),
                      quote_identifier(publicationName));
 
     ExecuteCriticalRemoteCommand(connection, createSubscriptionCommand->data);
     pfree(createSubscriptionCommand->data);
     pfree(createSubscriptionCommand);
 
     ExecuteCriticalRemoteCommand(connection, psprintf(
                                      "ALTER SUBSCRIPTION %s OWNER TO %s",
-                                     ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX),
+                                     ShardSubscriptionName(ownerId,
+                                                           SHARD_SPLIT_SUBSCRIPTION_PREFIX),
                                      ShardMoveSubscriptionRole(ownerId)
                                      ));
 
     /*
      * The ALTER ROLE command should not propagate, so we temporarily
      * disable DDL propagation.
      */
     SendCommandListToWorkerOutsideTransaction(
         connection->hostname, connection->port, connection->user,
         list_make2(
             "SET LOCAL citus.enable_ddl_propagation TO OFF;",
             psprintf(
                 "ALTER ROLE %s NOSUPERUSER",
                 ShardMoveSubscriptionRole(ownerId)
             )));
 
     ExecuteCriticalRemoteCommand(connection, psprintf(
                                      "ALTER SUBSCRIPTION %s ENABLE",
-                                     ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX)
+                                     ShardSubscriptionName(ownerId,
+                                                           SHARD_SPLIT_SUBSCRIPTION_PREFIX)
                                      ));
 }
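CreateShardSubscription() above runs a fixed remote sequence: create a throwaway superuser role, create the subscription disabled, transfer ownership while the role is still a superuser, demote the role, and only then enable the subscription. The command skeletons, in order (angle-bracket placeholders stand for the values the real code interpolates via psprintf and quote_identifier):

/* The remote command sequence from CreateShardSubscription(), in order.
 * Placeholders mark values interpolated by the real code. */
static const char *const shardSubscriptionSetupSequence[] = {
    /* 1. throwaway role, created without DDL propagation */
    "SET LOCAL citus.enable_ddl_propagation TO OFF;",
    "CREATE USER <subscription_role> SUPERUSER IN ROLE <table_owner>",

    /* 2. subscription starts out disabled */
    "CREATE SUBSCRIPTION <name> CONNECTION <conninfo> PUBLICATION <publication> "
    "WITH (citus_use_authinfo=true, enabled=false)",

    /* 3. hand the subscription to the throwaway role */
    "ALTER SUBSCRIPTION <name> OWNER TO <subscription_role>",

    /* 4. demote the role again (also without DDL propagation) */
    "ALTER ROLE <subscription_role> NOSUPERUSER",

    /* 5. start replicating */
    "ALTER SUBSCRIPTION <name> ENABLE",
};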


@@ -26,34 +26,43 @@
 static HTAB *ShardInfoHashMapForPublications = NULL;
 
 /* function declarations */
-static void AddShardEntryInMap(Oid tableOwner, uint32 nodeId, ShardInterval * shardInterval, bool isChildShardInterval);
-ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List * shardIdList, List * replicationSlotInfoList);
+static void AddPublishableShardEntryInMap(uint32 targetNodeId,
+                                          ShardInterval *shardInterval, bool
+                                          isChildShardInterval);
+ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId,
+                                                          List *shardIdList,
+                                                          List *replicationSlotInfoList);
 
-static void
-CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
-                                   uint32_t publicationForTargetNodeId, Oid tableOwner);
-static void
-CreateShardSplitPublications(MultiConnection *sourceConnection, List * shardSplitPubSubMetadataList);
-static void
-CreateShardSplitSubscriptions(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList, WorkerNode * sourceWorkerNode, char * superUser, char * databaseName);
-static void
-WaitForShardSplitRelationSubscriptionsBecomeReady(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList);
-static void
-WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, List * targetNodeConnectionList, List * shardSplitPubSubMetadataList);
+static void CreateShardSplitPublicationForNode(MultiConnection *connection,
+                                               List *shardList,
+                                               uint32_t publicationForTargetNodeId, Oid
+                                               tableOwner);
+static void CreateShardSplitPublications(MultiConnection *sourceConnection,
+                                         List *shardSplitPubSubMetadataList);
+static void CreateShardSplitSubscriptions(List *targetNodeConnectionList,
+                                          List *shardSplitPubSubMetadataList,
+                                          WorkerNode *sourceWorkerNode, char *superUser,
+                                          char *databaseName);
+static void WaitForShardSplitRelationSubscriptionsBecomeReady(
+    List *targetNodeConnectionList, List *shardSplitPubSubMetadataList);
+static void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
+                                                               List *
+                                                               targetNodeConnectionList,
+                                                               List *
+                                                               shardSplitPubSubMetadataList);
 
-static char *
-ShardSplitPublicationName(uint32_t nodeId, Oid ownerId);
+static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId);
 
 /* used for debugging. Remove later */
-void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata * shardSplitMetadata);
+void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata *shardSplitMetadata);
 
-StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
-                                               List *shardGroupSplitIntervalListList,
-                                               List *destinationWorkerNodesList)
+StringInfo
+CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
+                                    List *shardGroupSplitIntervalListList,
+                                    List *destinationWorkerNodesList)
 {
     StringInfo splitChildrenRows = makeStringInfo();
 
     ShardInterval *sourceShardIntervalToCopy = NULL;
     List *splitChildShardIntervalList = NULL;
@@ -66,7 +75,7 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
         ShardInterval *splitChildShardInterval = NULL;
         WorkerNode *destinationWorkerNode = NULL;
         forboth_ptr(splitChildShardInterval, splitChildShardIntervalList,
                     destinationWorkerNode, destinationWorkerNodesList)
         {
             if (addComma)
             {
@@ -74,62 +83,69 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
             }
 
             StringInfo minValueString = makeStringInfo();
-            appendStringInfo(minValueString, "%d", DatumGetInt32(splitChildShardInterval->minValue));
+            appendStringInfo(minValueString, "%d", DatumGetInt32(
+                                 splitChildShardInterval->minValue));
 
             StringInfo maxValueString = makeStringInfo();
-            appendStringInfo(maxValueString, "%d", DatumGetInt32(splitChildShardInterval->maxValue));
+            appendStringInfo(maxValueString, "%d", DatumGetInt32(
+                                 splitChildShardInterval->maxValue));
 
             appendStringInfo(splitChildrenRows,
                              "ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info",
                              sourceShardId,
                              splitChildShardInterval->shardId,
                              quote_literal_cstr(minValueString->data),
                              quote_literal_cstr(maxValueString->data),
                              destinationWorkerNode->nodeId);
 
             addComma = true;
         }
     }
 
     StringInfo splitShardReplicationUDF = makeStringInfo();
     appendStringInfo(splitShardReplicationUDF,
-                     "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])", splitChildrenRows->data);
+                     "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])",
+                     splitChildrenRows->data);
 
     return splitShardReplicationUDF;
 }
 
 
-List * ParseReplicationSlotInfoFromResult(PGresult * result)
+List *
+ParseReplicationSlotInfoFromResult(PGresult *result)
 {
     int64 rowCount = PQntuples(result);
     int64 colCount = PQnfields(result);
 
     List *replicationSlotInfoList = NIL;
     for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
     {
-        ReplicationSlotInfo * replicationSlotInfo = (ReplicationSlotInfo *)palloc0(sizeof(ReplicationSlotInfo));
+        ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0(
+            sizeof(ReplicationSlotInfo));
 
-        char * targeNodeIdString = PQgetvalue(result, rowIndex, 0);
+        char *targeNodeIdString = PQgetvalue(result, rowIndex, 0);
 
         replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10);
 
         /* we're using the pstrdup to copy the data into the current memory context */
         replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex, 1));
 
         replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex, 2));
 
         replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo);
     }
 
     /* TODO(saawasek): size of this should not be NULL */
     return replicationSlotInfoList;
 }
 
 
-List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList,
-                                          List *shardGroupSplitIntervalListList,
-                                          List *destinationWorkerNodesList,
-                                          List *replicationSlotInfoList)
+List *
+CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList,
+                                   List *shardGroupSplitIntervalListList,
+                                   List *destinationWorkerNodesList,
+                                   List *replicationSlotInfoList)
 {
     ShardInfoHashMapForPublications = SetupHashMapForShardInfo();
 
     ShardInterval *sourceShardIntervalToCopy = NULL;
     List *splitChildShardIntervalList = NULL;
     forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
@ -138,247 +154,276 @@ List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList
ShardInterval *splitChildShardInterval = NULL; ShardInterval *splitChildShardInterval = NULL;
WorkerNode *destinationWorkerNode = NULL; WorkerNode *destinationWorkerNode = NULL;
forboth_ptr(splitChildShardInterval, splitChildShardIntervalList, forboth_ptr(splitChildShardInterval, splitChildShardIntervalList,
destinationWorkerNode, destinationWorkerNodesList) destinationWorkerNode, destinationWorkerNodesList)
{ {
/* Table owner is same for both parent and child shard */ uint32 destinationWorkerNodeId = destinationWorkerNode->nodeId;
Oid tableOwnerId = TableOwnerOid(sourceShardIntervalToCopy->relationId);
uint32 destinationWorkerNodeId = destinationWorkerNode->nodeId;
/* Add split child shard interval */ /* Add split child shard interval */
AddShardEntryInMap(tableOwnerId, destinationWorkerNodeId, splitChildShardInterval, true /*isChildShardInterval*/); AddPublishableShardEntryInMap(destinationWorkerNodeId,
splitChildShardInterval,
true /*isChildShardInterval*/);
/* Add parent shard interval if not already added */ /* Add parent shard interval if not already added */
AddShardEntryInMap(tableOwnerId, destinationWorkerNodeId, sourceShardIntervalToCopy, false /*isChildShardInterval*/); AddPublishableShardEntryInMap(destinationWorkerNodeId,
sourceShardIntervalToCopy,
false /*isChildShardInterval*/);
} }
} }
/* Populate pubsub meta data*/ /* Populate pubsub meta data*/
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
hash_seq_init(&status, ShardInfoHashMapForPublications); hash_seq_init(&status, ShardInfoHashMapForPublications);
List * shardSplitPubSubMetadataList = NIL; List *shardSplitPubSubMetadataList = NIL;
NodeShardMappingEntry *entry = NULL; NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{ {
uint32_t nodeId = entry->key.nodeId; uint32_t nodeId = entry->key.nodeId;
uint32_t tableOwnerId = entry->key.tableOwnerId; uint32_t tableOwnerId = entry->key.tableOwnerId;
ShardSplitPubSubMetadata * shardSplitPubSubMetadata = CreateShardSplitPubSubMetadata(tableOwnerId, nodeId, entry->shardSplitInfoList, replicationSlotInfoList); ShardSplitPubSubMetadata *shardSplitPubSubMetadata =
CreateShardSplitPubSubMetadata(tableOwnerId, nodeId,
entry->shardSplitInfoList,
replicationSlotInfoList);
shardSplitPubSubMetadataList = lappend(shardSplitPubSubMetadataList, shardSplitPubSubMetadata); shardSplitPubSubMetadataList = lappend(shardSplitPubSubMetadataList,
} shardSplitPubSubMetadata);
}
return shardSplitPubSubMetadataList; return shardSplitPubSubMetadataList;
} }
static void AddShardEntryInMap(Oid tableOwnerId, uint32 nodeId, ShardInterval * shardInterval, bool isChildShardInterval)
static void
AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, bool
isChildShardInterval)
{ {
NodeShardMappingKey key; NodeShardMappingKey key;
key.nodeId = nodeId; key.nodeId = targetNodeId;
key.tableOwnerId = tableOwnerId; key.tableOwnerId = TableOwnerOid(shardInterval->relationId);
bool found = false; bool found = false;
NodeShardMappingEntry *nodeMappingEntry = NodeShardMappingEntry *nodeMappingEntry =
(NodeShardMappingEntry *) hash_search(ShardInfoHashMapForPublications, &key, HASH_ENTER, (NodeShardMappingEntry *) hash_search(ShardInfoHashMapForPublications, &key,
HASH_ENTER,
&found); &found);
if (!found) if (!found)
{ {
nodeMappingEntry->shardSplitInfoList = NIL; nodeMappingEntry->shardSplitInfoList = NIL;
} }
if(isChildShardInterval) if (isChildShardInterval)
{ {
nodeMappingEntry->shardSplitInfoList = nodeMappingEntry->shardSplitInfoList =
lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval); lappend(nodeMappingEntry->shardSplitInfoList,
return; (ShardInterval *) shardInterval);
} return;
}
ShardInterval * existingShardInterval = NULL; ShardInterval *existingShardInterval = NULL;
foreach_ptr(existingShardInterval, nodeMappingEntry->shardSplitInfoList) foreach_ptr(existingShardInterval, nodeMappingEntry->shardSplitInfoList)
{ {
if(existingShardInterval->shardId == shardInterval->shardId) if (existingShardInterval->shardId == shardInterval->shardId)
{ {
/* parent shard interval is already added hence return */ /* parent shard interval is already added hence return */
return; return;
} }
} }
/* Add parent shard Interval */
nodeMappingEntry->shardSplitInfoList =
lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
/* Add parent shard Interval */
nodeMappingEntry->shardSplitInfoList =
lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
} }
ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List * shardIntervalList, List * replicationSlotInfoList) ShardSplitPubSubMetadata *
CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List *shardIntervalList,
List *replicationSlotInfoList)
{ {
ShardSplitPubSubMetadata *shardSplitPubSubMetadata = palloc0(
sizeof(ShardSplitPubSubMetadata));
shardSplitPubSubMetadata->shardIntervalListForSubscription = shardIntervalList;
shardSplitPubSubMetadata->tableOwnerId = tableOwnerId;
ShardSplitPubSubMetadata * shardSplitPubSubMetadata = palloc0(sizeof(ShardSplitPubSubMetadata)); char *tableOwnerName = GetUserNameFromId(tableOwnerId, false);
shardSplitPubSubMetadata->shardIntervalListForSubscription = shardIntervalList; ReplicationSlotInfo *replicationSlotInfo = NULL;
shardSplitPubSubMetadata->tableOwnerId = tableOwnerId; foreach_ptr(replicationSlotInfo, replicationSlotInfoList)
{
if (nodeId == replicationSlotInfo->targetNodeId &&
strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0)
{
shardSplitPubSubMetadata->slotInfo = replicationSlotInfo;
break;
}
}
char * tableOwnerName = GetUserNameFromId(tableOwnerId, false); return shardSplitPubSubMetadata;
ReplicationSlotInfo * replicationSlotInfo = NULL;
foreach_ptr(replicationSlotInfo, replicationSlotInfoList)
{
if(nodeId == replicationSlotInfo->targetNodeId &&
strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0)
{
shardSplitPubSubMetadata->slotInfo = replicationSlotInfo;
break;
}
}
return shardSplitPubSubMetadata;
} }
void
LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode,
							  List *shardSplitPubSubMetadataList,
							  List *sourceColocatedShardIntervalList,
							  List *shardGroupSplitIntervalListList,
							  List *destinationWorkerNodesList)
{
	char *superUser = CitusExtensionOwnerName();
	char *databaseName = get_database_name(MyDatabaseId);
	int connectionFlags = FORCE_NEW_CONNECTION;

	/* Get source node connection */
	MultiConnection *sourceConnection =
		GetNodeUserDatabaseConnection(connectionFlags, sourceWorkerNode->workerName,
									  sourceWorkerNode->workerPort,
									  superUser, databaseName);

	ClaimConnectionExclusively(sourceConnection);

	List *targetNodeConnectionList = NIL;
	ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL;
	foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
	{
		uint32 targetWorkerNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
		WorkerNode *targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false);

		MultiConnection *targetConnection =
			GetNodeUserDatabaseConnection(connectionFlags, targetWorkerNode->workerName,
										  targetWorkerNode->workerPort,
										  superUser, databaseName);
		ClaimConnectionExclusively(targetConnection);

		targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection);
	}

	/* create publications */
	CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList);

	CreateShardSplitSubscriptions(targetNodeConnectionList,
								  shardSplitPubSubMetadataList,
								  sourceWorkerNode,
								  superUser,
								  databaseName);

	WaitForShardSplitRelationSubscriptionsBecomeReady(targetNodeConnectionList,
													  shardSplitPubSubMetadataList);

	XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
	WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
													   targetNodeConnectionList,
													   shardSplitPubSubMetadataList);

	CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
										   destinationWorkerNodesList);

	sourcePosition = GetRemoteLogPosition(sourceConnection);
	WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
													   targetNodeConnectionList,
													   shardSplitPubSubMetadataList);

	BlockWritesToShardList(sourceColocatedShardIntervalList);

	sourcePosition = GetRemoteLogPosition(sourceConnection);
	WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
													   targetNodeConnectionList,
													   shardSplitPubSubMetadataList);

	/* TODO: Create foreign key constraints and handle partitioned tables */
}
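
The final catch-up round above runs only after BlockWritesToShardList has fenced off the source shards, so writes are blocked only for the tail of the replication stream. In the regression output further down, that fencing shows up as shard metadata locks taken for the shards being split:

SELECT lock_shard_metadata(7, ARRAY[1, 2]);

followed by one last pg_current_wal_lsn() / latest_end_lsn drain before the shard metadata is swapped.
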
void
PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata *shardSplitMetadata)
{
	printf("sameer: ShardSplitPubSubMetadata\n");
	ReplicationSlotInfo *replicationInfo = shardSplitMetadata->slotInfo;

	List *shardIntervalList = shardSplitMetadata->shardIntervalListForSubscription;
	printf("shardIds: ");
	ShardInterval *shardInterval = NULL;
	foreach_ptr(shardInterval, shardIntervalList)
	{
		printf(UINT64_FORMAT " ", shardInterval->shardId);
	}

	printf("\nManual Username from OID at source: %s \n",
		   GetUserNameFromId(shardSplitMetadata->tableOwnerId, false));
	printf("slotname:%s targetNode:%u tableOwner:%s \n", replicationInfo->slotName,
		   replicationInfo->targetNodeId, replicationInfo->tableOwnerName);
}
static void
CreateShardSplitPublications(MultiConnection *sourceConnection,
							 List *shardSplitPubSubMetadataList)
{
	ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL;
	foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
	{
		uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
		Oid tableOwnerId = shardSplitPubSubMetadata->tableOwnerId;

		CreateShardSplitPublicationForNode(sourceConnection,
										   shardSplitPubSubMetadata->shardIntervalListForSubscription,
										   publicationForNodeId,
										   tableOwnerId);
	}
}
static void
CreateShardSplitSubscriptions(List *targetNodeConnectionList,
							  List *shardSplitPubSubMetadataList,
							  WorkerNode *sourceWorkerNode,
							  char *superUser,
							  char *databaseName)
{
	MultiConnection *targetConnection = NULL;
	ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL;
	forboth_ptr(targetConnection, targetNodeConnectionList,
				shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
	{
		uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
		Oid ownerId = shardSplitPubSubMetadata->tableOwnerId;
		CreateShardSubscription(targetConnection,
								sourceWorkerNode->workerName,
								sourceWorkerNode->workerPort,
								superUser,
								databaseName,
								ShardSplitPublicationName(publicationForNodeId, ownerId),
								ownerId);
	}
}
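
The net effect on the target, as the regression output below shows, is a subscription created disabled, reassigned to a dedicated owner role, and only then enabled:

CREATE SUBSCRIPTION citus_shard_split_subscription_10
    CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20'
    PUBLICATION citus_shard_split_publication_18_10
    WITH (citus_use_authinfo=true, enabled=false);
ALTER SUBSCRIPTION citus_shard_split_subscription_10
    OWNER TO citus_shard_move_subscription_role_10;
ALTER SUBSCRIPTION citus_shard_split_subscription_10 ENABLE;
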
static void
CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
								   uint32_t publicationForTargetNodeId, Oid ownerId)
{
	StringInfo createPublicationCommand = makeStringInfo();
	bool prefixWithComma = false;

	appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ",
					 ShardSplitPublicationName(publicationForTargetNodeId, ownerId));

	ShardInterval *shard = NULL;
	foreach_ptr(shard, shardList)
	{
		char *shardName = ConstructQualifiedShardName(shard);

		if (prefixWithComma)
		{
			appendStringInfoString(createPublicationCommand, ",");
		}

		appendStringInfoString(createPublicationCommand, shardName);
		prefixWithComma = true;
	}

	ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data);
	pfree(createPublicationCommand->data);
	pfree(createPublicationCommand);
}
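
For a colocated group of two tables, the generated command enumerates every shard relation placed on that node, child shards and dummy shards alike, as in the regression output below:

CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE
    citus_split_shard_by_split_points_negative.table_to_split_100,
    citus_split_shard_by_split_points_negative.table_to_split_1,
    citus_split_shard_by_split_points_negative.table_to_split_101,
    citus_split_shard_by_split_points_negative.table_second_102,
    citus_split_shard_by_split_points_negative.table_second_2,
    citus_split_shard_by_split_points_negative.table_second_103;
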
@ -390,29 +435,37 @@ ShardSplitPublicationName(uint32_t nodeId, Oid ownerId)
static void
WaitForShardSplitRelationSubscriptionsBecomeReady(List *targetNodeConnectionList,
												  List *shardSplitPubSubMetadataList)
{
	MultiConnection *targetConnection = NULL;
	ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL;
	forboth_ptr(targetConnection, targetNodeConnectionList,
				shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
	{
		Bitmapset *tableOwnerIds = NULL;
		tableOwnerIds = bms_add_member(tableOwnerIds,
									   shardSplitPubSubMetadata->tableOwnerId);
		WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds,
												SHARD_SPLIT_SUBSCRIPTION_PREFIX);
	}
}
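
The readiness probe issued on each target counts subscribed relations that have not yet reached the 'r' (ready) sync state; from the regression output below:

SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription
    WHERE srsubid = subid AND srsubstate != 'r'
    AND subname IN ('citus_shard_split_subscription_10');
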
static void
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
												   List *targetNodeConnectionList,
												   List *shardSplitPubSubMetadataList)
{
	MultiConnection *targetConnection = NULL;
	ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL;
	forboth_ptr(targetConnection, targetNodeConnectionList,
				shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
	{
		Bitmapset *tableOwnerIds = NULL;
		tableOwnerIds = bms_add_member(tableOwnerIds,
									   shardSplitPubSubMetadata->tableOwnerId);
		WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds,
										  SHARD_SPLIT_SUBSCRIPTION_PREFIX);
	}
}
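
Catch-up is measured by comparing the WAL insert position on the source against the apply position each split subscription reports on the target; the polling pair, again from the regression output below:

-- on the source connection
SELECT pg_current_wal_lsn();
-- on each target connection, repeated until it reaches the source position
SELECT min(latest_end_lsn) FROM pg_stat_subscription
    WHERE subname IN ('citus_shard_split_subscription_10');
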

View File

@ -28,18 +28,19 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection);
extern void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
									int sourceNodePort, char *userName,
									char *databaseName,
									char *publicationName,
									Oid ownerId);
extern void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
													Bitmapset *tableOwnerIds,
													char *operationPrefix);
extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection,
											  XLogRecPtr sourcePosition,
											  Bitmapset *tableOwnerIds,
											  char *operationPrefix);

#define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_"
#define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_"

View File

@ -41,8 +41,8 @@ typedef struct ShardSplitInfo
	int32 shardMinValue;        /* min hash value */
	int32 shardMaxValue;        /* max hash value */
	uint32_t nodeId;            /* node where child shard is to be placed */
	uint64 sourceShardId;       /* parent shardId */
	uint64 splitChildShardId;   /* child shardId */
	char slotName[NAMEDATALEN]; /* replication slot name belonging to this node */
} ShardSplitInfo;
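
The regression output below shows how these fields line up in practice: splitting shard 1, which covers the full hash range, at -1073741826 produces two entries whose ranges abut exactly, and which read as (sourceShardId, splitChildShardId, shardMinValue, shardMaxValue, nodeId):

ROW(1, 100, '-2147483648', '-1073741826', 18)::citus.split_shard_info
ROW(1, 101, '-1073741825', '2147483647', 18)::citus.split_shard_info
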

View File

@ -12,16 +12,16 @@
typedef struct ReplicationSlotInfo
{
	uint32 targetNodeId;
	char *tableOwnerName;
	char *slotName;
} ReplicationSlotInfo;

typedef struct ShardSplitPubSubMetadata
{
	List *shardIntervalListForSubscription;
	Oid tableOwnerId;
	ReplicationSlotInfo *slotInfo;
} ShardSplitPubSubMetadata;

/* key for NodeShardMappingEntry */
@ -42,19 +42,21 @@ extern uint32 NodeShardMappingHash(const void *key, Size keysize);
extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize);
HTAB * SetupHashMapForShardInfo(void);
List * ParseReplicationSlotInfoFromResult(PGresult *result);

extern StringInfo CreateSplitShardReplicationSetupUDF(
	List *sourceColocatedShardIntervalList,
	List *shardGroupSplitIntervalListList,
	List *destinationWorkerNodesList);

extern List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList,
												 List *shardGroupSplitIntervalListList,
												 List *destinationWorkerNodesList,
												 List *replicationSlotInfoList);

extern void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode,
										  List *shardSplitPubSubMetadataList,
										  List *sourceColocatedShardIntervalList,
										  List *shardGroupSplitIntervalListList,
										  List *destinationWorkerNodesList);
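
CreateSplitShardReplicationSetupUDF assembles the worker_split_shard_replication_setup(...) call that primes split replication on the source node; in the regression output below it takes this shape, one split_shard_info row per child shard:

SELECT * FROM worker_split_shard_replication_setup(ARRAY[
    ROW(1, 100, '-2147483648', '-1073741826', 18)::citus.split_shard_info,
    ROW(1, 101, '-1073741825', '2147483647', 18)::citus.split_shard_info,
    ROW(2, 102, '-2147483648', '-1073741826', 18)::citus.split_shard_info,
    ROW(2, 103, '-1073741825', '2147483647', 18)::citus.split_shard_info]);
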

View File

@ -113,8 +113,8 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SELECT citus_split_shard_by_split_points(
    1,
    ARRAY['-1073741826'],
    ARRAY[:worker_2_node, :worker_2_node],
    'force_logical');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
@ -165,6 +165,14 @@ NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
@ -173,14 +181,121 @@ NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '-1073741826', 18)::citus.split_shard_info,ROW(1, 101, '-1073741825', '2147483647', 18)::citus.split_shard_info,ROW(2, 102, '-2147483648', '-1073741826', 18)::citus.split_shard_info,ROW(2, 103, '-1073741825', '2147483647', 18)::citus.split_shard_info])
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_100,citus_split_shard_by_split_points_negative.table_to_split_1,citus_split_shard_by_split_points_negative.table_to_split_101,citus_split_shard_by_split_points_negative.table_second_102,citus_split_shard_by_split_points_negative.table_second_2,citus_split_shard_by_split_points_negative.table_second_103
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE USER citus_shard_move_subscription_role_10 SUPERUSER IN ROLE postgres
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_10 CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_18_10 WITH (citus_use_authinfo=true, enabled=false)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 OWNER TO citus_shard_move_subscription_role_10
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER ROLE citus_shard_move_subscription_role_10 NOSUPERUSER
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 ENABLE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT sum(pg_total_relation_size(srrelid)) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pg_current_wal_lsn()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second ADD CONSTRAINT table_second_pkey PRIMARY KEY (id)')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second ADD CONSTRAINT table_second_pkey PRIMARY KEY (id)')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pg_current_wal_lsn()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1, 2])
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1, 2])
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pg_current_wal_lsn()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_internal_delete_shard_metadata(2);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_internal_delete_shard_metadata(2);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_2 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '-1073741825', '2147483647'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 102, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 103, 't'::"char", '-1073741825', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '-1073741825', '2147483647'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 102, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 103, 't'::"char", '-1073741825', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 85), (101, 1, 0, 16, 86), (102, 1, 0, 16, 87), (103, 1, 0, 16, 88)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 85), (101, 1, 0, 16, 86), (102, 1, 0, 16, 87), (103, 1, 0, 16, 88)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_100 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_101 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_102 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_103 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_2 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit
 citus_split_shard_by_split_points
@ -196,27 +311,21 @@ SELECT * FROM show_catalog;
 Schema | Name | Owner
---------------------------------------------------------------------
 citus_split_shard_by_split_points_negative | table_second | postgres
 citus_split_shard_by_split_points_negative | table_second_102 | postgres
 citus_split_shard_by_split_points_negative | table_second_103 | postgres
 citus_split_shard_by_split_points_negative | table_to_split | postgres
 citus_split_shard_by_split_points_negative | table_to_split_100 | postgres
 citus_split_shard_by_split_points_negative | table_to_split_101 | postgres
(6 rows)
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
 Schema | Name | Owner
---------------------------------------------------------------------
 citus_split_shard_by_split_points_negative | table_second | postgres
 citus_split_shard_by_split_points_negative | table_to_split | postgres
(2 rows)
SELECT * FROM pg_publication_tables;
 pubname | schemaname | tablename

View File

@ -49,14 +49,12 @@ SELECT citus_split_shard_by_split_points(
    1,
    ARRAY['-1073741826'],
    ARRAY[:worker_2_node, :worker_2_node],
    'force_logical');
-- On worker2, we want child shard 2 and dummy shard 1 --
-- on worker1, we want child shard 3 and 1 and dummy shard 2 --
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM pg_stat_subscription;
SELECT * FROM pg_subscription_rel;
SELECT * FROM show_catalog;
\c - - - :worker_1_port