Refactor subscriber code

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-17 15:32:34 +05:30
parent 1c617e7d1d
commit b26bab32de
7 changed files with 441 additions and 237 deletions

View File

@ -106,6 +106,11 @@ void TryDropShard(MultiConnection *connection, ShardInterval *shardInterval);
char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
WorkerNode *sourceWorkerNode); WorkerNode *sourceWorkerNode);
static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList);
/* Customize error message strings based on operation type */ /* Customize error message strings based on operation type */
static const char *const SplitOperationName[] = static const char *const SplitOperationName[] =
{ {
@ -1147,8 +1152,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( */ /* char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( */
/* shardIntervalToSplit, sourceShardToCopyNode); */ /* shardIntervalToSplit, sourceShardToCopyNode); */
// DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, /* DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, */
// shardGroupSplitIntervalListList, workersForPlacementList, NULL); /* shardGroupSplitIntervalListList, workersForPlacementList, NULL); */
CreateDummyShardsForShardGroup( CreateDummyShardsForShardGroup(
sourceColocatedShardIntervalList, sourceColocatedShardIntervalList,
@ -1162,27 +1167,26 @@ NonBlockingShardSplit(SplitOperation splitOperation,
shardGroupSplitIntervalListList, shardGroupSplitIntervalListList,
sourceShardToCopyNode, sourceShardToCopyNode,
workersForPlacementList); workersForPlacementList);
/*
* Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks
* preventing inconsistent metadata (like overlapping shards).
*/
DropShardList(sourceColocatedShardIntervalList);
// /* /* Insert new shard and placement metdata */
// * Drop old shards and delete related metadata. Have to do that before InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
// * creating the new shard metadata, because there's cross-checks workersForPlacementList);
// * preventing inconsistent metadata (like overlapping shards).
// */
// DropShardList(sourceColocatedShardIntervalList);
// /* Insert new shard and placement metdata */ /*
// InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, * Create foreign keys if exists after the metadata changes happening in
// workersForPlacementList); * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata.
*/
CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
workersForPlacementList);
// /* DropDummyShards();
// * Create foreign keys if exists after the metadata changes happening in
// * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
// * key creation depends on the new metadata.
// */
// CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
// workersForPlacementList);
// DropDummyShards();
} }
PG_CATCH(); PG_CATCH();
{ {
@ -1351,69 +1355,90 @@ SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit,
WorkerNode *sourceWorkerNode, WorkerNode *sourceWorkerNode,
List *destinationWorkerNodesList) List *destinationWorkerNodesList)
{ {
StringInfo splitShardReplicationUDF = CreateSplitShardReplicationSetupUDF( /* / * // / *Create Template replication slot * / * / */
sourceColocatedShardIntervalList, /* / * char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( * / */
shardGroupSplitIntervalListList, /* / * shardIntervalToSplit, sourceWorkerNode); * / */
destinationWorkerNodesList); /* List *shardSplitPubSubMetadata = CreateShardSplitInfoMapForPublication( */
/* sourceColocatedShardIntervalList, */
/* shardGroupSplitIntervalListList, */
/* destinationWorkerNodesList, */
/* replicationSlotInfoList); */
/* earlier the above method used to take replication slot info as information */
/* LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, */
/* sourceColocatedShardIntervalList, */
/* shardGroupSplitIntervalListList, */
/* destinationWorkerNodesList); */
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlags = FORCE_NEW_CONNECTION; int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
sourceWorkerNode->
workerName,
sourceWorkerNode->
workerPort,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
ClaimConnectionExclusively(sourceConnection);
PGresult *result = NULL; HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication(
int queryResult = ExecuteOptionalRemoteCommand(sourceConnection,
splitShardReplicationUDF->data,
&result);
if (queryResult != RESPONSE_OKAY || !IsResponseOK(result))
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("Failed to run worker_split_shard_replication_setup")));
PQclear(result);
ForgetResults(sourceConnection);
}
/* Get replication slot information */
List *replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result);
PQclear(result);
ForgetResults(sourceConnection);
// /* // / *Create Template replication slot * / */
// /* char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( */
// /* shardIntervalToSplit, sourceWorkerNode); */
// List *shardSplitPubSubMetadata = CreateShardSplitInfoMapForPublication(
// sourceColocatedShardIntervalList,
// shardGroupSplitIntervalListList,
// destinationWorkerNodesList,
// replicationSlotInfoList);
// earlier the above method used to take replication slot info as information
// LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata,
// sourceColocatedShardIntervalList,
// shardGroupSplitIntervalListList,
// destinationWorkerNodesList);
HTAB * shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication(
sourceColocatedShardIntervalList, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, shardGroupSplitIntervalListList,
destinationWorkerNodesList); destinationWorkerNodesList);
DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication);
CreateShardSplitPublicationsTwo(sourceConnection, shardSplitHashMapForPublication); MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
sourceWorkerNode->
workerName,
sourceWorkerNode->
workerPort,
superUser,
databaseName);
ClaimConnectionExclusively(sourceConnection);
//DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/*Create Template Replication Slot */
/* DoSplitCopy */
/*worker_split_replication_setup_udf*/
List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
sourceWorkerNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
destinationWorkerNodesList);
/* Subscriber flow starts from here */
List *shardSplitSubscribersMetadataList = PopulateShardSplitSubscriptionsMetadataList(
shardSplitHashMapForPublication, replicationSlotInfoList);
List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit(
shardSplitSubscribersMetadataList,
connectionFlags,
superUser, databaseName);
CreateShardSplitSubscriptions(targetNodeConnectionList,
shardSplitSubscribersMetadataList,
sourceWorkerNode,
superUser,
databaseName);
WaitForShardSplitRelationSubscriptionsBecomeReady(shardSplitSubscribersMetadataList);
XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
destinationWorkerNodesList);
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
BlockWritesToShardList(sourceColocatedShardIntervalList);
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
/*DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); */
} }
@ -1497,6 +1522,85 @@ TryDropShard(MultiConnection *connection, ShardInterval *shardInterval)
} }
char *
CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
WorkerNode *sourceWorkerNode)
{
/*Create Template replication slot */
int connectionFlags = FORCE_NEW_CONNECTION;
connectionFlags |= EXCLUSIVE_AND_REPLICATION;
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
sourceWorkerNode->
workerName,
sourceWorkerNode->
workerPort,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
char *snapShotName = DropExistingIfAnyAndCreateTemplateReplicationSlot(shardInterval,
sourceConnection);
return snapShotName;
}
static List *
ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList)
{
StringInfo splitShardReplicationUDF = CreateSplitShardReplicationSetupUDF(
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
destinationWorkerNodesList);
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
sourceWorkerNode->
workerName,
sourceWorkerNode->
workerPort,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
ClaimConnectionExclusively(sourceConnection);
PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(sourceConnection,
splitShardReplicationUDF->data,
&result);
/*
* Result should contain atleast one tuple. The information returned is
* set of tuples where each tuple is formatted as:
* <targetNodeId, tableOwnerName, replication_slot_name>.
*/
if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) < 1 ||
PQnfields(result) != 3)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg(
"Failed to run worker_split_shard_replication_setup UDF. It should successfully execute "
" for splitting a shard in a non-blocking way. Please retry.")));
PQclear(result);
ForgetResults(sourceConnection);
}
/* Get replication slot information */
List *replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result);
PQclear(result);
ForgetResults(sourceConnection);
UnclaimConnection(sourceConnection);
return replicationSlotInfoList;
}
StringInfo StringInfo
CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList, List *shardGroupSplitIntervalListList,
@ -1549,27 +1653,3 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
return splitShardReplicationUDF; return splitShardReplicationUDF;
} }
char *
CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
WorkerNode *sourceWorkerNode)
{
/*Create Template replication slot */
int connectionFlags = FORCE_NEW_CONNECTION;
connectionFlags |= EXCLUSIVE_AND_REPLICATION;
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
sourceWorkerNode->
workerName,
sourceWorkerNode->
workerPort,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
char *snapShotName = DropExistingIfAnyAndCreateTemplateReplicationSlot(shardInterval,
sourceConnection);
return snapShotName;
}

View File

@ -1166,7 +1166,7 @@ ShardSubscriptionName(Oid ownerId, char *operationPrefix)
* subscription that subscribes to the tables of the given owner. * subscription that subscribes to the tables of the given owner.
*/ */
static char * static char *
ShardSubscriptionRole(Oid ownerId, char * operationPrefix) ShardSubscriptionRole(Oid ownerId, char *operationPrefix)
{ {
return psprintf("%s%i", operationPrefix, ownerId); return psprintf("%s%i", operationPrefix, ownerId);
} }
@ -1322,8 +1322,9 @@ DropShardMoveSubscriptions(MultiConnection *connection, Bitmapset *tableOwnerIds
while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0)
{ {
DropShardSubscription(connection, ShardSubscriptionName(ownerId, DropShardSubscription(connection, ShardSubscriptionName(ownerId,
SHARD_MOVE_SUBSCRIPTION_PREFIX)); SHARD_MOVE_SUBSCRIPTION_PREFIX));
DropShardUser(connection, ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX)); DropShardUser(connection, ShardSubscriptionRole(ownerId,
SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX));
} }
} }
@ -1508,7 +1509,8 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
"ALTER SUBSCRIPTION %s OWNER TO %s", "ALTER SUBSCRIPTION %s OWNER TO %s",
ShardSubscriptionName(ownerId, ShardSubscriptionName(ownerId,
SHARD_MOVE_SUBSCRIPTION_PREFIX), SHARD_MOVE_SUBSCRIPTION_PREFIX),
ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX) ShardSubscriptionRole(ownerId,
SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX)
)); ));
/* /*
@ -2118,7 +2120,8 @@ CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
"ALTER SUBSCRIPTION %s OWNER TO %s", "ALTER SUBSCRIPTION %s OWNER TO %s",
ShardSubscriptionName(ownerId, ShardSubscriptionName(ownerId,
SHARD_SPLIT_SUBSCRIPTION_PREFIX), SHARD_SPLIT_SUBSCRIPTION_PREFIX),
ShardSubscriptionRole(ownerId, SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX) ShardSubscriptionRole(ownerId,
SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX)
)); ));
/* /*

View File

@ -18,7 +18,6 @@
#include "distributed/shard_split.h" #include "distributed/shard_split.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/shardsplit_logical_replication.h" #include "distributed/shardsplit_logical_replication.h"
#include "distributed/multi_logical_replication.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
@ -29,41 +28,22 @@ static HTAB *ShardInfoHashMapForPublications = NULL;
static void AddPublishableShardEntryInMap(uint32 targetNodeId, static void AddPublishableShardEntryInMap(uint32 targetNodeId,
ShardInterval *shardInterval, bool ShardInterval *shardInterval, bool
isChildShardInterval); isChildShardInterval);
ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32
List *shardIdList, nodeId,
List *replicationSlotInfoList); List *
replicationSlotInfoList);
static void CreateShardSplitPublicationForNode(MultiConnection *connection, static void CreateShardSplitPublicationForNode(MultiConnection *connection,
List *shardList, List *shardList,
uint32_t publicationForTargetNodeId, Oid uint32_t publicationForTargetNodeId, Oid
tableOwner); tableOwner);
static void CreateShardSplitPublications(MultiConnection *sourceConnection,
List *shardSplitPubSubMetadataList);
static void CreateShardSplitSubscriptions(List *targetNodeConnectionList,
List *shardSplitPubSubMetadataList,
WorkerNode *sourceWorkerNode, char *superUser,
char *databaseName);
static void WaitForShardSplitRelationSubscriptionsBecomeReady(
List *shardSplitPubSubMetadataList);
static void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
List *
shardSplitPubSubMetadataList);
static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId);
List * CreateTargetNodeConnectionsForShardSplit(List *shardSplitPubSubMetadataList, int
connectionFlags, char *user,
char *databaseName);
static void DropAllShardSplitSubscriptions(MultiConnection * cleanupConnection); static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection);
static void DropAllShardSplitPublications(MultiConnection * cleanupConnection); static void DropAllShardSplitPublications(MultiConnection *cleanupConnection);
static void DropAllShardSplitUsers(MultiConnection * cleanupConnection); static void DropAllShardSplitUsers(MultiConnection *cleanupConnection);
static void DropAllReplicationSlots(List * replicationSlotInfo); static void DropAllReplicationSlots(List *replicationSlotInfo);
/*used for debuggin. Remove later*/
void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata *shardSplitMetadata);
List * List *
@ -72,8 +52,6 @@ ParseReplicationSlotInfoFromResult(PGresult *result)
int64 rowCount = PQntuples(result); int64 rowCount = PQntuples(result);
int64 colCount = PQnfields(result); int64 colCount = PQnfields(result);
printf("sameer row count %d col count: %d\n ", rowCount, colCount);
List *replicationSlotInfoList = NIL; List *replicationSlotInfoList = NIL;
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
{ {
@ -91,17 +69,14 @@ ParseReplicationSlotInfoFromResult(PGresult *result)
replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo);
} }
/*TODO(saawasek): size of this should not be NULL
* Also check for warning
*/
return replicationSlotInfoList; return replicationSlotInfoList;
} }
HTAB * HTAB *
CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList, CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList, List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList) List *destinationWorkerNodesList)
{ {
ShardInfoHashMapForPublications = SetupHashMapForShardInfo(); ShardInfoHashMapForPublications = SetupHashMapForShardInfo();
ShardInterval *sourceShardIntervalToCopy = NULL; ShardInterval *sourceShardIntervalToCopy = NULL;
@ -174,14 +149,13 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
} }
ShardSplitPubSubMetadata * ShardSplitSubscriberMetadata *
CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List *shardIntervalList, CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
List *replicationSlotInfoList) List *replicationSlotInfoList)
{ {
ShardSplitPubSubMetadata *shardSplitPubSubMetadata = palloc0( ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = palloc0(
sizeof(ShardSplitPubSubMetadata)); sizeof(ShardSplitSubscriberMetadata));
shardSplitPubSubMetadata->shardIntervalListForSubscription = shardIntervalList; shardSplitSubscriberMetadata->tableOwnerId = tableOwnerId;
shardSplitPubSubMetadata->tableOwnerId = tableOwnerId;
char *tableOwnerName = GetUserNameFromId(tableOwnerId, false); char *tableOwnerName = GetUserNameFromId(tableOwnerId, false);
ReplicationSlotInfo *replicationSlotInfo = NULL; ReplicationSlotInfo *replicationSlotInfo = NULL;
@ -190,13 +164,12 @@ CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List *shardInter
if (nodeId == replicationSlotInfo->targetNodeId && if (nodeId == replicationSlotInfo->targetNodeId &&
strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0) strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0)
{ {
shardSplitPubSubMetadata->slotInfo = replicationSlotInfo; shardSplitSubscriberMetadata->slotInfo = replicationSlotInfo;
break; break;
} }
} }
PrintShardSplitPubSubMetadata(shardSplitPubSubMetadata); return shardSplitSubscriberMetadata;
return shardSplitPubSubMetadata;
} }
@ -225,7 +198,7 @@ LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode,
superUser, databaseName); superUser, databaseName);
/* create publications */ /* create publications */
CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList); /*CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList); */
CreateShardSplitSubscriptions(targetNodeConnectionList, CreateShardSplitSubscriptions(targetNodeConnectionList,
shardSplitPubSubMetadataList, shardSplitPubSubMetadataList,
@ -257,46 +230,38 @@ LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode,
void void
PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata *shardSplitMetadata) PrintShardSplitPubSubMetadata(ShardSplitSubscriberMetadata *shardSplitMetadata)
{ {
printf("sameer: ShardSplitPubSbuMetadata\n"); printf("\nsameer: ShardSplitPubSbuMetadata");
ReplicationSlotInfo *replicationInfo = shardSplitMetadata->slotInfo; ReplicationSlotInfo *replicationInfo = shardSplitMetadata->slotInfo;
printf("Manual Username from OID at source: %s \n", GetUserNameFromId(
List *shardIntervalList = shardSplitMetadata->shardIntervalListForSubscription;
printf("shardIds: ");
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
printf("%ld ", shardInterval->shardId);
}
printf("\nManual Username from OID at source: %s \n", GetUserNameFromId(
shardSplitMetadata->tableOwnerId, false)); shardSplitMetadata->tableOwnerId, false));
printf("slotname:%s targetNode:%u tableOwner:%s \n", replicationInfo->slotName, printf("slotname:%s targetNode:%u tableOwner:%s \n", replicationInfo->slotName,
replicationInfo->targetNodeId, replicationInfo->tableOwnerName); replicationInfo->targetNodeId, replicationInfo->tableOwnerName);
printf("\n");
} }
static void /* static void */
CreateShardSplitPublications(MultiConnection *sourceConnection, /* CreateShardSplitPublications(MultiConnection *sourceConnection, */
List *shardSplitPubSubMetadataList) /* List *shardSplitPubSubMetadataList) */
{ /* { */
ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; /* ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL; */
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) /* foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) */
{ /* { */
uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; /* uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; */
Oid tableOwnerId = shardSplitPubSubMetadata->tableOwnerId; /* Oid tableOwnerId = shardSplitPubSubMetadata->tableOwnerId; */
CreateShardSplitPublicationForNode(sourceConnection, /* CreateShardSplitPublicationForNode(sourceConnection, */
shardSplitPubSubMetadata-> /* shardSplitPubSubMetadata-> */
shardIntervalListForSubscription, /* shardIntervalListForSubscription, */
publicationForNodeId, /* publicationForNodeId, */
tableOwnerId); /* tableOwnerId); */
} /* } */
} /* } */
static void void
CreateShardSplitSubscriptions(List *targetNodeConnectionList, CreateShardSplitSubscriptions(List *targetNodeConnectionList,
List *shardSplitPubSubMetadataList, List *shardSplitPubSubMetadataList,
WorkerNode *sourceWorkerNode, WorkerNode *sourceWorkerNode,
@ -304,7 +269,7 @@ CreateShardSplitSubscriptions(List *targetNodeConnectionList,
char *databaseName) char *databaseName)
{ {
MultiConnection *targetConnection = NULL; MultiConnection *targetConnection = NULL;
ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
forboth_ptr(targetConnection, targetNodeConnectionList, forboth_ptr(targetConnection, targetNodeConnectionList,
shardSplitPubSubMetadata, shardSplitPubSubMetadataList) shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{ {
@ -358,10 +323,10 @@ ShardSplitPublicationName(uint32_t nodeId, Oid ownerId)
} }
static void void
WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadataList) WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadataList)
{ {
ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{ {
Bitmapset *tableOwnerIds = NULL; Bitmapset *tableOwnerIds = NULL;
@ -374,11 +339,11 @@ WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadata
} }
static void void
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
List *shardSplitPubSubMetadataList) List *shardSplitPubSubMetadataList)
{ {
ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{ {
Bitmapset *tableOwnerIds = NULL; Bitmapset *tableOwnerIds = NULL;
@ -394,14 +359,15 @@ WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
List * List *
CreateTargetNodeConnectionsForShardSplit(List *shardSplitPubSubMetadataList, int CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList, int
connectionFlags, char *user, char *databaseName) connectionFlags, char *user, char *databaseName)
{ {
List *targetNodeConnectionList = NIL; List *targetNodeConnectionList = NIL;
ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = NULL;
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) foreach_ptr(shardSplitSubscriberMetadata, shardSplitSubscribersMetadataList)
{ {
uint32 targetWorkerNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; /*TODO(saawasek):For slot equals not null */
uint32 targetWorkerNodeId = shardSplitSubscriberMetadata->slotInfo->targetNodeId;
WorkerNode *targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false); WorkerNode *targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false);
MultiConnection *targetConnection = MultiConnection *targetConnection =
@ -412,7 +378,8 @@ CreateTargetNodeConnectionsForShardSplit(List *shardSplitPubSubMetadataList, int
ClaimConnectionExclusively(targetConnection); ClaimConnectionExclusively(targetConnection);
targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection); targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection);
shardSplitPubSubMetadata->targetNodeConnection = targetConnection;
shardSplitSubscriberMetadata->targetNodeConnection = targetConnection;
} }
return targetNodeConnectionList; return targetNodeConnectionList;
@ -476,7 +443,8 @@ DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalTo
} }
void DropAllShardSplitLeftOvers(WorkerNode* sourceNode, HTAB * shardSplitHashMapForPubSub) void
DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPubSub)
{ {
char *superUser = CitusExtensionOwnerName(); char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId); char *databaseName = get_database_name(MyDatabaseId);
@ -493,7 +461,8 @@ void DropAllShardSplitLeftOvers(WorkerNode* sourceNode, HTAB * shardSplitHashMap
hash_seq_init(&statusForSubscription, shardSplitHashMapForPubSub); hash_seq_init(&statusForSubscription, shardSplitHashMapForPubSub);
NodeShardMappingEntry *entry = NULL; NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) != NULL) while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) !=
NULL)
{ {
uint32_t nodeId = entry->key.nodeId; uint32_t nodeId = entry->key.nodeId;
WorkerNode *workerNode = FindNodeWithNodeId(nodeId, false /*missingOk*/); WorkerNode *workerNode = FindNodeWithNodeId(nodeId, false /*missingOk*/);
@ -509,15 +478,17 @@ void DropAllShardSplitLeftOvers(WorkerNode* sourceNode, HTAB * shardSplitHashMap
/*Drop all shard split publications at the source*/ /*Drop all shard split publications at the source*/
MultiConnection *sourceNodeConnection = GetNodeUserDatabaseConnection( MultiConnection *sourceNodeConnection = GetNodeUserDatabaseConnection(
connectionFlags, sourceNode->workerName, sourceNode->workerPort, connectionFlags, sourceNode->workerName, sourceNode->workerPort,
superUser, databaseName); superUser, databaseName);
DropAllShardSplitPublications(sourceNodeConnection); DropAllShardSplitPublications(sourceNodeConnection);
CloseConnection(sourceNodeConnection); CloseConnection(sourceNodeConnection);
} }
void DropAllShardSplitSubscriptions(MultiConnection * cleanupConnection)
void
DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection)
{ {
char *query = psprintf( char *query = psprintf(
"SELECT subname FROM pg_subscription " "SELECT subname FROM pg_subscription "
@ -531,6 +502,7 @@ void DropAllShardSplitSubscriptions(MultiConnection * cleanupConnection)
} }
} }
static void static void
DropAllShardSplitPublications(MultiConnection *connection) DropAllShardSplitPublications(MultiConnection *connection)
{ {
@ -546,10 +518,11 @@ DropAllShardSplitPublications(MultiConnection *connection)
} }
} }
void void
DropAllShardSplitUsers(MultiConnection *connection) DropAllShardSplitUsers(MultiConnection *connection)
{ {
char *query = psprintf( char *query = psprintf(
"SELECT rolname FROM pg_roles " "SELECT rolname FROM pg_roles "
"WHERE rolname LIKE %s || '%%'", "WHERE rolname LIKE %s || '%%'",
quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX)); quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX));
@ -561,9 +534,10 @@ DropAllShardSplitUsers(MultiConnection *connection)
} }
} }
void CreateShardSplitPublicationsTwo(MultiConnection *sourceConnection,
HTAB * shardInfoHashMapForPublication)
void
CreateShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication)
{ {
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMapForPublication); hash_seq_init(&status, shardInfoHashMapForPublication);
@ -573,11 +547,38 @@ void CreateShardSplitPublicationsTwo(MultiConnection *sourceConnection,
{ {
uint32 nodeId = entry->key.nodeId; uint32 nodeId = entry->key.nodeId;
uint32 tableOwnerId = entry->key.tableOwnerId; uint32 tableOwnerId = entry->key.tableOwnerId;
List * shardListForPublication = entry->shardSplitInfoList; List *shardListForPublication = entry->shardSplitInfoList;
CreateShardSplitPublicationForNode(sourceConnection, CreateShardSplitPublicationForNode(sourceConnection,
shardListForPublication, shardListForPublication,
nodeId, nodeId,
tableOwnerId); tableOwnerId);
} }
} }
List *
PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
List *replicationSlotInfoList)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardSplitInfoHashMap);
NodeShardMappingEntry *entry = NULL;
List *shardSplitSubscriptionMetadataList = NIL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint32 nodeId = entry->key.nodeId;
uint32 tableOwnerId = entry->key.tableOwnerId;
ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata =
CreateShardSplitSubscriberMetadata(tableOwnerId, nodeId,
replicationSlotInfoList);
shardSplitSubscriptionMetadataList = lappend(shardSplitSubscriptionMetadataList,
shardSplitSubscriberMetadata);
/*replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); */
}
return shardSplitSubscriptionMetadataList;
}

View File

@ -27,11 +27,10 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
int targetNodePort); int targetNodePort);
extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection); extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection);
extern List * extern List * GetQueryResultStringList(MultiConnection *connection, char *query);
GetQueryResultStringList(MultiConnection *connection, char *query);
extern void DropShardSubscription(MultiConnection *connection, extern void DropShardSubscription(MultiConnection *connection,
char *subscriptionName); char *subscriptionName);
extern void DropShardPublication(MultiConnection *connection, char *publicationName); extern void DropShardPublication(MultiConnection *connection, char *publicationName);
extern void DropShardUser(MultiConnection *connection, char *username); extern void DropShardUser(MultiConnection *connection, char *username);

View File

@ -10,6 +10,8 @@
#ifndef SHARDSPLIT_LOGICAL_REPLICATION_H #ifndef SHARDSPLIT_LOGICAL_REPLICATION_H
#define SHARDSPLIT_LOGICAL_REPLICATION_H #define SHARDSPLIT_LOGICAL_REPLICATION_H
#include "distributed/multi_logical_replication.h"
typedef struct ReplicationSlotInfo typedef struct ReplicationSlotInfo
{ {
uint32 targetNodeId; uint32 targetNodeId;
@ -17,19 +19,17 @@ typedef struct ReplicationSlotInfo
char *slotName; char *slotName;
} ReplicationSlotInfo; } ReplicationSlotInfo;
typedef struct ShardSplitPubSubMetadata typedef struct ShardSplitSubscriberMetadata
{ {
List *shardIntervalListForSubscription;
Oid tableOwnerId; Oid tableOwnerId;
ReplicationSlotInfo *slotInfo; ReplicationSlotInfo *slotInfo;
/* /*
* Exclusively claimed connection for subscription. * Exclusively claimed connection for subscription.The target node of subscription
* The target node of subscription
* is pointed by ReplicationSlotInfo. * is pointed by ReplicationSlotInfo.
*/ */
MultiConnection *targetNodeConnection; MultiConnection *targetNodeConnection;
} ShardSplitPubSubMetadata; } ShardSplitSubscriberMetadata;
/* key for NodeShardMappingEntry */ /* key for NodeShardMappingEntry */
typedef struct NodeShardMappingKey typedef struct NodeShardMappingKey
@ -49,26 +49,48 @@ extern uint32 NodeShardMappingHash(const void *key, Size keysize);
extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize); extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize);
HTAB * SetupHashMapForShardInfo(void); HTAB * SetupHashMapForShardInfo(void);
List * ParseReplicationSlotInfoFromResult(PGresult *result); extern List * ParseReplicationSlotInfoFromResult(PGresult *result);
extern HTAB * CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList, extern HTAB * CreateShardSplitInfoMapForPublication(
List *shardGroupSplitIntervalListList, List *sourceColocatedShardIntervalList,
List *destinationWorkerNodesList); List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList);
extern void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, extern void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode,
List *shardSplitPubSubMetadataList, List *shardSplitPubSubMetadataList,
List *sourceColocatedShardIntervalList, List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList, List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList); List *destinationWorkerNodesList);
extern void CreateShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication);
extern void DropAllShardSplitLeftOvers(WorkerNode *sourceNode,
HTAB *shardSplitMapOfPublications);
extern void CreateShardSplitPublicationsTwo(MultiConnection *sourceConnection, extern List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
HTAB * shardInfoHashMapForPublication); List *replicationSlotInfoList);
extern void DropAllShardSplitLeftOvers(WorkerNode* sourceNode, HTAB * shardSplitMapOfPublications);
extern char * DropExistingIfAnyAndCreateTemplateReplicationSlot( extern char * DropExistingIfAnyAndCreateTemplateReplicationSlot(
ShardInterval *shardIntervalToSplit, ShardInterval *shardIntervalToSplit,
MultiConnection * MultiConnection *
sourceConnection); sourceConnection);
extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList,
List *shardSplitPubSubMetadataList,
WorkerNode *sourceWorkerNode, char *superUser,
char *databaseName);
extern void WaitForShardSplitRelationSubscriptionsBecomeReady(
List *shardSplitPubSubMetadataList);
extern void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
List *
shardSplitPubSubMetadataList);
List * CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList,
int
connectionFlags, char *user,
char *databaseName);
/*used for debuggin. Remove later*/
extern void PrintShardSplitPubSubMetadata(
ShardSplitSubscriberMetadata *shardSplitMetadata);
#endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */ #endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */

View File

@ -100,7 +100,7 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SELECT citus_split_shard_by_split_points( SELECT citus_split_shard_by_split_points(
1, 1,
ARRAY['0'], ARRAY['0'],
ARRAY[:worker_1_node, :worker_2_node], ARRAY[:worker_2_node, :worker_2_node],
'force_logical'); 'force_logical');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -128,28 +128,111 @@ NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '0', 16)::citus.split_shard_info,ROW(1, 101, '1', '2147483647', 18)::citus.split_shard_info]);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT subname FROM pg_subscription WHERE subname LIKE 'citus_shard_split_subscription_' || '%'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT rolname FROM pg_roles WHERE rolname LIKE 'citus_shard_split_subscription_role_' || '%'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT subname FROM pg_subscription WHERE subname LIKE 'citus_shard_split_subscription_' || '%' NOTICE: issuing SELECT subname FROM pg_subscription WHERE subname LIKE 'citus_shard_split_subscription_' || '%'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT rolname FROM pg_roles WHERE rolname LIKE 'citus_shard_split_subscription_role_' || '%' NOTICE: issuing SELECT rolname FROM pg_roles WHERE rolname LIKE 'citus_shard_split_subscription_role_' || '%'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pubname FROM pg_publication WHERE pubname LIKE 'citus_shard_split_publication_' || '%' NOTICE: issuing SELECT pubname FROM pg_publication WHERE pubname LIKE 'citus_shard_split_publication_' || '%'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_16_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_100,citus_split_shard_by_split_points_negative.table_to_split_1 NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_100,citus_split_shard_by_split_points_negative.table_to_split_1,citus_split_shard_by_split_points_negative.table_to_split_101
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_101,citus_split_shard_by_split_points_negative.table_to_split_1 NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '0', 18)::citus.split_shard_info,ROW(1, 101, '1', '2147483647', 18)::citus.split_shard_info]);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE USER citus_shard_split_subscription_role_10 SUPERUSER IN ROLE postgres
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_10 CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_18_10 WITH (citus_use_authinfo=true, enabled=false)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 OWNER TO citus_shard_split_subscription_role_10
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER ROLE citus_shard_split_subscription_role_10 NOSUPERUSER
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 ENABLE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT sum(pg_total_relation_size(srrelid)) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pg_current_wal_lsn()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pg_current_wal_lsn()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1])
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1])
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pg_current_wal_lsn()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '0'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '1', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '0'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '1', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 84), (101, 1, 0, 16, 85)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 84), (101, 1, 0, 16, 85)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_100 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_101 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit WARNING: connection claimed exclusively at transaction commit
citus_split_shard_by_split_points citus_split_shard_by_split_points
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -164,25 +247,38 @@ SELECT * FROM show_catalog;
Schema | Name | Owner Schema | Name | Owner
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_split_shard_by_split_points_negative | table_to_split | postgres citus_split_shard_by_split_points_negative | table_to_split | postgres
citus_split_shard_by_split_points_negative | table_to_split_1 | postgres citus_split_shard_by_split_points_negative | table_to_split_100 | postgres
citus_split_shard_by_split_points_negative | table_to_split_101 | postgres citus_split_shard_by_split_points_negative | table_to_split_101 | postgres
(3 rows) (3 rows)
SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
---------------------------------------------------------------------
17324 | 16384 | citus_shard_split_subscription_10 | 17323 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_shard_split_subscription_10 | off | {citus_shard_split_publication_18_10}
(1 row)
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points_negative; SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog; SELECT * FROM show_catalog;
Schema | Name | Owner Schema | Name | Owner
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_split_shard_by_split_points_negative | table_to_split | postgres citus_split_shard_by_split_points_negative | table_to_split | postgres
citus_split_shard_by_split_points_negative | table_to_split_1 | postgres (1 row)
citus_split_shard_by_split_points_negative | table_to_split_100 | postgres
citus_split_shard_by_split_points_negative | table_to_split_101 | postgres
(4 rows)
SELECT * FROM pg_publication; SELECT * FROM pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
--------------------------------------------------------------------- ---------------------------------------------------------------------
17381 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f 17381 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f
17384 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f (1 row)
(2 rows)
SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
---------------------------------------------------------------------
(0 rows)
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_subscription_10
(1 row)

View File

@ -48,7 +48,7 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SELECT citus_split_shard_by_split_points( SELECT citus_split_shard_by_split_points(
1, 1,
ARRAY['0'], ARRAY['0'],
ARRAY[:worker_1_node, :worker_2_node], ARRAY[:worker_2_node, :worker_2_node],
'force_logical'); 'force_logical');
-- On worker2, we want child shard 2 and dummy shard 1 -- -- On worker2, we want child shard 2 and dummy shard 1 --
-- on worker1, we want child shard 3 and 1 and dummy shard 2 -- -- on worker1, we want child shard 3 and 1 and dummy shard 2 --
@ -56,8 +56,11 @@ SELECT citus_split_shard_by_split_points(
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points_negative; SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog; SELECT * FROM show_catalog;
SELECT * FROM pg_subscription;
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points_negative; SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog; SELECT * FROM show_catalog;
SELECT * FROM pg_publication; SELECT * FROM pg_publication;
SELECT * FROM pg_subscription;
SELECT slot_name FROM pg_replication_slots;