Adding comments and code for cleanup

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-22 20:10:18 +05:30
parent 9e88c14096
commit 8c871bcd10
21 changed files with 1122 additions and 444 deletions

View File

@ -379,8 +379,9 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
if (key->replication)
{
connKeywords[authParamsIdx] = "replication";
connValues[authParamsIdx] = "database";
connKeywords[authParamsIdx] = MemoryContextStrdup(context, "replication");
connValues[authParamsIdx] = MemoryContextStrdup(context, "database");
authParamsIdx++;
}

View File

@ -71,11 +71,6 @@ static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalLis
List *shardGroupSplitIntervalListList,
WorkerNode *sourceWorkerNode,
List *workersForPlacementList);
static void SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
WorkerNode *sourceWorkerNode,
List *workersForPlacementList);
static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList);
static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
@ -100,10 +95,6 @@ static void DoSplitCopy(WorkerNode *sourceShardNode,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList,
char *snapShotName);
static void DoSplitCopy(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
List *splitChildrenShardIntervalList,
List *workersForPlacementList);
@ -124,7 +115,9 @@ static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInt
static void DropDummyShards(void);
void TryDropShard(MultiConnection *connection, ShardInterval *shardInterval);
char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
WorkerNode *sourceWorkerNode);
WorkerNode *sourceWorkerNode,
MultiConnection **
templateSlotConnection);
static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
List *sourceColocatedShardIntervalList,
@ -181,28 +174,6 @@ ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShar
errdetail("Splitting shards backed by foreign tables "
"is not supported.")));
}
/*
* At the moment, we do not support copying a shard if that shard's
* relation is in a colocation group with a partitioned table or partition.
*/
if (PartitionedTable(colocatedTableId))
{
char *sourceRelationName = get_rel_name(relationId);
char *colocatedRelationName = get_rel_name(colocatedTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot %s of '%s', because it "
"is a partitioned table",
SplitOperationName[splitOperation],
colocatedRelationName),
errdetail("In colocation group of '%s', a partitioned "
"relation exists: '%s'. Citus does not support "
"%s of partitioned tables.",
sourceRelationName,
colocatedRelationName,
SplitOperationName[splitOperation])));
}
}
/* check shards with inactive placements */
@ -787,48 +758,48 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
*/
if (!PartitionedTable(sourceShardIntervalToCopy->relationId))
{
StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy,
splitShardIntervalList,
destinationWorkerNodesList);
StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(
sourceShardIntervalToCopy,
splitShardIntervalList,
destinationWorkerNodesList);
List *ddlCommandList = NIL;
StringInfo beginTransaction = makeStringInfo();
appendStringInfo(beginTransaction,
"BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;");
ddlCommandList = lappend(ddlCommandList, beginTransaction->data);
/* Set snapshot */
if (snapShotName != NULL)
{
StringInfo snapShotString = makeStringInfo();
appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;",
quote_literal_cstr(
snapShotName));
ddlCommandList = lappend(ddlCommandList, snapShotString->data);
printf("Sameer final string snapshotted:%s\n", snapShotString->data);
}
ddlCommandList = lappend(ddlCommandList, splitCopyUdfCommand->data);
StringInfo commitCommand = makeStringInfo();
appendStringInfo(commitCommand, "COMMIT;");
ddlCommandList = lappend(ddlCommandList, commitCommand->data);
Task *splitCopyTask = CitusMakeNode(Task);
splitCopyTask->jobId = sourceShardIntervalToCopy->shardId;
splitCopyTask->taskId = taskId;
splitCopyTask->taskType = READ_TASK;
splitCopyTask->replicationModel = REPLICATION_MODEL_INVALID;
SetTaskQueryStringList(splitCopyTask, ddlCommandList);
ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(taskPlacement, sourceShardNode);
splitCopyTask->taskPlacementList = list_make1(taskPlacement);
splitCopyTaskList = lappend(splitCopyTaskList, splitCopyTask);
taskId++;
}
}
@ -1307,16 +1278,10 @@ NonBlockingShardSplit(SplitOperation splitOperation,
CreateEmptyMapForShardsCreatedByWorkflow();
PG_TRY();
{
/*
* Physically create split children, perform split copy and create auxillary structures.
* This includes: indexes, replicaIdentity. triggers and statistics.
* Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints).
*/
CreateSplitShardsForShardGroupTwo(
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList);
/* Physically create split children. */
CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
shardGroupSplitIntervalListList,
workersForPlacementList);
CreateDummyShardsForShardGroup(
@ -1327,9 +1292,10 @@ NonBlockingShardSplit(SplitOperation splitOperation,
CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/*Create Template Replication Slot */
char *snapShotName = NULL;
snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(shardIntervalToSplit, sourceShardToCopyNode);
/* Create Template Replication Slot */
MultiConnection *templateSlotConnection = NULL;
char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(
shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection);
/* DoSplitCopy */
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
@ -1354,7 +1320,10 @@ NonBlockingShardSplit(SplitOperation splitOperation,
superUser, databaseName);
/* Create copies of template replication slot */
CreateReplicationSlots(sourceConnection, shardSplitSubscribersMetadataList);
char *templateSlotName = ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId);
CreateReplicationSlots(sourceConnection, templateSlotName,
shardSplitSubscribersMetadataList);
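/* Create subscriptions on the target node connections */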
CreateShardSplitSubscriptions(targetNodeConnectionList,
shardSplitSubscribersMetadataList,
@ -1382,6 +1351,12 @@ NonBlockingShardSplit(SplitOperation splitOperation,
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
/* Drop Subscribers */
DropShardSplitSubsriptions(shardSplitSubscribersMetadataList);
/* Drop Publications */
DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/*
* Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks
@ -1400,7 +1375,17 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/
CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
workersForPlacementList);
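/* Drop the dummy shards created earlier for this split workflow */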
DropDummyShards();
/* Close source connection */
CloseConnection(sourceConnection);
/* Close all subscriber connections */
CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList);
/* Close connection of template replication slot */
CloseConnection(templateSlotConnection);
}
PG_CATCH();
{
@ -1561,84 +1546,6 @@ CreateWorkerForPlacementSet(List *workersForPlacementList)
}
static void
SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
WorkerNode *sourceWorkerNode,
List *destinationWorkerNodesList)
{
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlags = FORCE_NEW_CONNECTION;
HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication(
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
destinationWorkerNodesList);
DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication);
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
sourceWorkerNode->
workerName,
sourceWorkerNode->
workerPort,
superUser,
databaseName);
ClaimConnectionExclusively(sourceConnection);
CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/*Create Template Replication Slot */
/* DoSplitCopy */
/*worker_split_replication_setup_udf*/
List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
sourceWorkerNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
destinationWorkerNodesList);
/* Subscriber flow starts from here */
List *shardSplitSubscribersMetadataList = PopulateShardSplitSubscriptionsMetadataList(
shardSplitHashMapForPublication, replicationSlotInfoList);
List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit(
shardSplitSubscribersMetadataList,
connectionFlags,
superUser, databaseName);
CreateShardSplitSubscriptions(targetNodeConnectionList,
shardSplitSubscribersMetadataList,
sourceWorkerNode,
superUser,
databaseName);
WaitForShardSplitRelationSubscriptionsBecomeReady(shardSplitSubscribersMetadataList);
XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
destinationWorkerNodesList);
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
BlockWritesToShardList(sourceColocatedShardIntervalList);
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
/*DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); */
}
static void
AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval)
{
@ -1681,6 +1588,7 @@ DropDummyShards()
int connectionFlags = FOR_DDL;
connectionFlags |= OUTSIDE_TRANSACTION;
connectionFlags |= FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(
connectionFlags,
shardToBeDroppedNode->workerName,
@ -1694,6 +1602,8 @@ DropDummyShards()
{
TryDropShard(connection, shardInterval);
}
CloseConnection(connection);
}
}
@ -1721,7 +1631,8 @@ TryDropShard(MultiConnection *connection, ShardInterval *shardInterval)
char *
CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
WorkerNode *sourceWorkerNode)
WorkerNode *sourceWorkerNode,
MultiConnection **templateSlotConnection)
{
/*Create Template replication slot */
int connectionFlags = FORCE_NEW_CONNECTION;
@ -1736,9 +1647,12 @@ CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
get_database_name(
MyDatabaseId));
ClaimConnectionExclusively(sourceConnection);
char *snapShotName = DropExistingIfAnyAndCreateTemplateReplicationSlot(shardInterval,
sourceConnection);
*templateSlotConnection = sourceConnection;
return snapShotName;
}
@ -1785,6 +1699,7 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
PQclear(result);
ForgetResults(sourceConnection);
CloseConnection(sourceConnection);
}
/* Get replication slot information */
@ -1792,8 +1707,8 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
PQclear(result);
ForgetResults(sourceConnection);
UnclaimConnection(sourceConnection);
CloseConnection(sourceConnection);
return replicationSlotInfoList;
}

View File

@ -140,7 +140,6 @@ static XLogRecPtr GetSubscriptionPosition(MultiConnection *connection,
Bitmapset *tableOwnerIds,
char *operationPrefix);
static char * ShardMovePublicationName(Oid ownerId);
static char * ShardSubscriptionName(Oid ownerId, char *operationPrefix);
static void AcquireLogicalReplicationLock(void);
static void DropAllShardMoveLeftovers(void);
static void DropAllShardMoveSubscriptions(MultiConnection *connection);
@ -149,8 +148,6 @@ static void DropAllShardMovePublications(MultiConnection *connection);
static void DropAllShardMoveUsers(MultiConnection *connection);
static char * ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds,
char *operationPrefix);
static void DropShardMoveReplicationSlot(MultiConnection *connection,
char *publicationName);
/*
* LogicallyReplicateShards replicates a list of shards from one node to another
@ -1096,7 +1093,7 @@ DropShardMovePublications(MultiConnection *connection, Bitmapset *tableOwnerIds)
* DropShardMoveReplicationSlot drops the replication slot with the given name
* if it exists.
*/
static void
void
DropShardMoveReplicationSlot(MultiConnection *connection, char *replicationSlotName)
{
ExecuteCriticalRemoteCommand(
@ -1144,7 +1141,7 @@ ShardMovePublicationName(Oid ownerId)
* This PID is then extracted from the application_name to find out which PID on the
* coordinator is blocked by the blocked replication process.
*/
static char *
char *
ShardSubscriptionName(Oid ownerId, char *operationPrefix)
{
if (RunningUnderIsolationTest)
@ -1162,7 +1159,7 @@ ShardSubscriptionName(Oid ownerId, char *operationPrefix)
* ShardSubscriptionRole returns the name of the role used by the
* subscription that subscribes to the tables of the given owner.
*/
static char *
char *
ShardSubscriptionRole(Oid ownerId, char *operationPrefix)
{
return psprintf("%s%i", operationPrefix, ownerId);

View File

@ -22,6 +22,7 @@
#include "utils/builtins.h"
#include "commands/dbcommands.h"
static HTAB *ShardInfoHashMapForPublications = NULL;
/* function declarations */
@ -32,46 +33,15 @@ ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwner
nodeId,
List *
replicationSlotInfoList);
static void CreateShardSplitPublicationForNode(MultiConnection *connection,
List *shardList,
uint32_t publicationForTargetNodeId, Oid
tableOwner);
static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId);
static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection);
static void DropAllShardSplitPublications(MultiConnection *cleanupConnection);
static void DropAllShardSplitUsers(MultiConnection *cleanupConnection);
static void DropAllReplicationSlots(List *replicationSlotInfo);
List *
ParseReplicationSlotInfoFromResult(PGresult *result)
{
int64 rowCount = PQntuples(result);
int64 colCount = PQnfields(result);
List *replicationSlotInfoList = NIL;
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0(
sizeof(ReplicationSlotInfo));
char *targeNodeIdString = PQgetvalue(result, rowIndex, 0);
replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10);
/* we're using the pstrdup to copy the data into the current memory context */
replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex, 1));
replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex, 2));
replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo);
}
return replicationSlotInfoList;
}
static void DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection);
HTAB *
@ -150,74 +120,6 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
}
void
LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode,
List *shardSplitPubSubMetadataList,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList)
{
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlags = FORCE_NEW_CONNECTION;
/* Get source node connection */
MultiConnection *sourceConnection =
GetNodeUserDatabaseConnection(connectionFlags, sourceWorkerNode->workerName,
sourceWorkerNode->workerPort,
superUser, databaseName);
ClaimConnectionExclusively(sourceConnection);
List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit(
shardSplitPubSubMetadataList,
connectionFlags,
superUser, databaseName);
/* create publications */
/*CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList); */
CreateShardSplitSubscriptions(targetNodeConnectionList,
shardSplitPubSubMetadataList,
sourceWorkerNode,
superUser,
databaseName);
WaitForShardSplitRelationSubscriptionsBecomeReady(shardSplitPubSubMetadataList);
XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitPubSubMetadataList);
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
destinationWorkerNodesList);
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitPubSubMetadataList);
BlockWritesToShardList(sourceColocatedShardIntervalList);
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitPubSubMetadataList);
/*TOOD : Create foreign key constraints and handle partitioned tables*/
}
void
PrintShardSplitPubSubMetadata(ShardSplitSubscriberMetadata *shardSplitMetadata)
{
printf("\nsameer: ShardSplitPubSbuMetadata");
ReplicationSlotInfo *replicationInfo = shardSplitMetadata->slotInfo;
printf("Manual Username from OID at source: %s \n", GetUserNameFromId(
shardSplitMetadata->tableOwnerId, false));
printf("slotname:%s targetNode:%u tableOwner:%s \n", replicationInfo->slotName,
replicationInfo->targetNodeId, replicationInfo->tableOwnerName);
printf("\n");
}
void
CreateShardSplitSubscriptions(List *targetNodeConnectionList,
List *shardSplitPubSubMetadataList,
@ -348,18 +250,14 @@ char *
DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
MultiConnection *sourceConnection)
{
StringInfo splitTemplateReplicationSlotName = makeStringInfo();
appendStringInfo(splitTemplateReplicationSlotName,
"citus_split_replicationslot_for_shard_%lu",
shardIntervalToSplit->shardId);
/*
* To ensure SPLIT is idempotent, drop any existing slot from a
* previous failed operation.
*/
StringInfo dropReplicationSlotCommand = makeStringInfo();
appendStringInfo(dropReplicationSlotCommand, "SELECT pg_drop_replication_slot('%s')",
splitTemplateReplicationSlotName->data);
ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId));
/* The Drop command can fail, so ignore the response / result and proceed anyway */
PGresult *result = NULL;
@ -383,7 +281,8 @@ DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalTo
/* TODO(saawasek): Try creating a TEMPORARY slot once the basic flow is ready and we have a testcase */
appendStringInfo(createReplicationSlotCommand,
"CREATE_REPLICATION_SLOT %s LOGICAL citus EXPORT_SNAPSHOT;",
splitTemplateReplicationSlotName->data);
ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId));
response = ExecuteOptionalRemoteCommand(sourceConnection,
createReplicationSlotCommand->data, &result);
@ -396,103 +295,10 @@ DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalTo
/* 'snapshot_name' is the column at index 2 (zero-based) in the result.
* We're using pstrdup to copy the data into the current memory context */
char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columnIndex */));
return snapShotName;
}
void
DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPubSub)
{
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
/*
* We open new connections to all nodes. The reason for this is that
* operations on subscriptions and publications cannot be run in a
* transaction. By forcing a new connection we make sure no transaction is
* active on the connection.
*/
int connectionFlags = FORCE_NEW_CONNECTION;
HASH_SEQ_STATUS statusForSubscription;
hash_seq_init(&statusForSubscription, shardSplitHashMapForPubSub);
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) !=
NULL)
{
uint32_t nodeId = entry->key.nodeId;
WorkerNode *workerNode = FindNodeWithNodeId(nodeId, false /*missingOk*/);
MultiConnection *cleanupConnection = GetNodeUserDatabaseConnection(
connectionFlags, workerNode->workerName, workerNode->workerPort,
superUser, databaseName);
DropAllShardSplitSubscriptions(cleanupConnection);
DropAllShardSplitUsers(cleanupConnection);
CloseConnection(cleanupConnection);
}
/*Drop all shard split publications at the source*/
MultiConnection *sourceNodeConnection = GetNodeUserDatabaseConnection(
connectionFlags, sourceNode->workerName, sourceNode->workerPort,
superUser, databaseName);
DropAllShardSplitPublications(sourceNodeConnection);
CloseConnection(sourceNodeConnection);
}
void
DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection)
{
char *query = psprintf(
"SELECT subname FROM pg_subscription "
"WHERE subname LIKE %s || '%%'",
quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_PREFIX));
List *subscriptionNameList = GetQueryResultStringList(cleanupConnection, query);
char *subscriptionName = NULL;
foreach_ptr(subscriptionName, subscriptionNameList)
{
DropShardSubscription(cleanupConnection, subscriptionName);
}
}
static void
DropAllShardSplitPublications(MultiConnection *connection)
{
char *query = psprintf(
"SELECT pubname FROM pg_publication "
"WHERE pubname LIKE %s || '%%'",
quote_literal_cstr(SHARD_SPLIT_PUBLICATION_PREFIX));
List *publicationNameList = GetQueryResultStringList(connection, query);
char *publicationName;
foreach_ptr(publicationName, publicationNameList)
{
DropShardPublication(connection, publicationName);
}
}
void
DropAllShardSplitUsers(MultiConnection *connection)
{
char *query = psprintf(
"SELECT rolname FROM pg_roles "
"WHERE rolname LIKE %s || '%%'",
quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX));
List *usernameList = GetQueryResultStringList(connection, query);
char *username;
foreach_ptr(username, usernameList)
{
DropShardUser(connection, username);
}
}
void
CreateShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication)
@ -560,17 +366,13 @@ CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
}
}
PrintShardSplitPubSubMetadata(shardSplitSubscriberMetadata);
return shardSplitSubscriberMetadata;
}
/*TODO(saawasek): Remove existing slots before creating newer ones */
/* extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection, List * shardSplitSubscriberMetadataList); */
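/*
* CreateReplicationSlots creates a replication slot for each subscriber by
* copying the template replication slot on the source node.
*/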
void
CreateReplicationSlots(MultiConnection *sourceNodeConnection,
CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlotName,
List *shardSplitSubscriberMetadataList)
{
ShardSplitSubscriberMetadata *subscriberMetadata = NULL;
@ -580,10 +382,9 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection,
StringInfo createReplicationSlotCommand = makeStringInfo();
/* TODO(niupre): Replace pgoutput with an appropriate name (to be introduced in saawasek's PR) */
appendStringInfo(createReplicationSlotCommand,
"SELECT * FROM pg_create_logical_replication_slot('%s','citus', false)",
slotName);
"SELECT * FROM pg_copy_logical_replication_slot ('%s','%s')",
templateSlotName, slotName);
PGresult *result = NULL;
int response = ExecuteOptionalRemoteCommand(sourceNodeConnection,
@ -598,3 +399,243 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection,
ForgetResults(sourceNodeConnection);
}
}
/*
* ShardSplitTemplateReplicationSlotName returns the name of the template replication slot.
*/
char *
ShardSplitTemplateReplicationSlotName(uint64 shardId)
{
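/* e.g. shardId 8981000 yields 'citus_shard_split_template_slot_8981000' */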
return psprintf("%s%lu", SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX, shardId);
}
/*
* ParseReplicationSlotInfoFromResult parses the custom datatype 'replication_slot_info'.
* 'replication_slot_info' is a tuple with the below format:
* <targetNodeId, tableOwnerName, replicationSlotName>
*/
List *
ParseReplicationSlotInfoFromResult(PGresult *result)
{
int64 rowCount = PQntuples(result);
int64 colCount = PQnfields(result);
List *replicationSlotInfoList = NIL;
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0(
sizeof(ReplicationSlotInfo));
char *targeNodeIdString = PQgetvalue(result, rowIndex, 0 /* nodeId column*/);
replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10);
/* We're using the pstrdup to copy the data into the current memory context */
replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex,
1 /* table owner name column */));
/* Replication slot name */
replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex,
2 /* slot name column */));
replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo);
}
return replicationSlotInfoList;
}
/*
* DropAllShardSplitLeftOvers drops shard split subscriptions, publications, roles
* and replication slots on all nodes. These might have been left there after
* the coordinator crashed during a shard split. It's important to delete them
* for two reasons:
* 1. Starting a new shard split might fail when they exist, because it cannot
* create them.
* 2. Leftover replication slots that are no longer consumed from prevent WAL
* from being dropped, which can cause out-of-disk issues.
*/
void
DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPubSub)
{
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
/*
* We open new connections to all nodes. The reason for this is that
* operations on subscriptions and publications cannot be run in a
* transaction. By forcing a new connection we make sure no transaction is
* active on the connection.
*/
int connectionFlags = FORCE_NEW_CONNECTION;
HASH_SEQ_STATUS statusForSubscription;
hash_seq_init(&statusForSubscription, shardSplitHashMapForPubSub);
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) !=
NULL)
{
uint32_t nodeId = entry->key.nodeId;
WorkerNode *workerNode = FindNodeWithNodeId(nodeId, false /*missingOk*/);
MultiConnection *cleanupConnection = GetNodeUserDatabaseConnection(
connectionFlags, workerNode->workerName, workerNode->workerPort,
superUser, databaseName);
/* We need to claim the connection exclusively while dropping the subscription */
ClaimConnectionExclusively(cleanupConnection);
DropAllShardSplitSubscriptions(cleanupConnection);
DropAllShardSplitUsers(cleanupConnection);
/* Close connection after cleanup */
CloseConnection(cleanupConnection);
}
/*Drop all shard split publications at the source*/
MultiConnection *sourceNodeConnection = GetNodeUserDatabaseConnection(
connectionFlags, sourceNode->workerName, sourceNode->workerPort,
superUser, databaseName);
ClaimConnectionExclusively(sourceNodeConnection);
/*
* If replication slot could not be dropped while dropping the
* subscriber, drop it here.
*/
DropAllShardSplitReplicationSlots(sourceNodeConnection);
DropAllShardSplitPublications(sourceNodeConnection);
CloseConnection(sourceNodeConnection);
}
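/*
* DropAllShardSplitSubscriptions drops all subscriptions matching the shard
* split subscription prefix on the given cleanup connection.
*/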
void
DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection)
{
char *query = psprintf(
"SELECT subname FROM pg_subscription "
"WHERE subname LIKE %s || '%%'",
quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_PREFIX));
List *subscriptionNameList = GetQueryResultStringList(cleanupConnection, query);
char *subscriptionName = NULL;
foreach_ptr(subscriptionName, subscriptionNameList)
{
DropShardSubscription(cleanupConnection, subscriptionName);
}
}
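/*
* DropAllShardSplitPublications drops all publications matching the shard
* split publication prefix on the given connection.
*/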
static void
DropAllShardSplitPublications(MultiConnection *connection)
{
char *query = psprintf(
"SELECT pubname FROM pg_publication "
"WHERE pubname LIKE %s || '%%'",
quote_literal_cstr(SHARD_SPLIT_PUBLICATION_PREFIX));
List *publicationNameList = GetQueryResultStringList(connection, query);
char *publicationName;
foreach_ptr(publicationName, publicationNameList)
{
DropShardPublication(connection, publicationName);
}
}
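/*
* DropAllShardSplitUsers drops all roles matching the shard split
* subscription role prefix on the given connection.
*/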
static void
DropAllShardSplitUsers(MultiConnection *connection)
{
char *query = psprintf(
"SELECT rolname FROM pg_roles "
"WHERE rolname LIKE %s || '%%'",
quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX));
List *usernameList = GetQueryResultStringList(connection, query);
char *username;
foreach_ptr(username, usernameList)
{
DropShardUser(connection, username);
}
}
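/*
* DropAllShardSplitReplicationSlots drops all replication slots matching the
* shard split replication slot prefix on the given cleanup connection.
*/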
static void
DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection)
{
char *query = psprintf(
"SELECT slot_name FROM pg_replication_slots "
"WHERE slot_name LIKE %s || '%%'",
quote_literal_cstr(SHARD_SPLIT_REPLICATION_SLOT_PREFIX));
List *slotNameList = GetQueryResultStringList(cleanupConnection, query);
char *slotName;
foreach_ptr(slotName, slotNameList)
{
DropShardMoveReplicationSlot(cleanupConnection, slotName);
}
}
/*
* DropShardSplitPublications drops the publications used for shard splits over the given
* connection, if they exist.
*/
void
DropShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMapForPublication);
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint32 nodeId = entry->key.nodeId;
uint32 tableOwnerId = entry->key.tableOwnerId;
DropShardPublication(sourceConnection, ShardSplitPublicationName(nodeId,
tableOwnerId));
}
}
/*
* DropShardSplitSubsriptions drops subscriptions from the subscriber node that
* are used to split shards for the given table owners. Note that it drops the
* replication slots on the publisher node as well, if DROP SUBSCRIPTION can drop
* them. Otherwise, only the subscriptions are dropped via the connection and the
* replication slots are dropped separately by calling
* DropAllShardSplitReplicationSlots.
*/
void
DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList)
{
ShardSplitSubscriberMetadata *subscriberMetadata = NULL;
foreach_ptr(subscriberMetadata, shardSplitSubscribersMetadataList)
{
uint32 tableOwnerId = subscriberMetadata->tableOwnerId;
MultiConnection *targetNodeConnection = subscriberMetadata->targetNodeConnection;
DropShardSubscription(targetNodeConnection, ShardSubscriptionName(tableOwnerId,
SHARD_SPLIT_SUBSCRIPTION_PREFIX));
DropShardUser(targetNodeConnection, ShardSubscriptionRole(tableOwnerId,
SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX));
}
}
/*
* CloseShardSplitSubscriberConnections closes the connections to the subscriber nodes.
* Each 'ShardSplitSubscriberMetadata' holds the connection for one subscriber node; the
* function traverses the list and closes each connection.
*/
void
CloseShardSplitSubscriberConnections(List *shardSplitSubscriberMetadataList)
{
ShardSplitSubscriberMetadata *subscriberMetadata = NULL;
foreach_ptr(subscriberMetadata, shardSplitSubscriberMetadataList)
{
CloseConnection(subscriberMetadata->targetNodeConnection);
}
}

View File

@ -15,6 +15,7 @@
#include "distributed/shardinterval_utils.h"
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/multi_logical_replication.h"
#include "storage/ipc.h"
#include "utils/memutils.h"
#include "common/hashfn.h"
@ -199,7 +200,8 @@ encode_replication_slot(uint32_t nodeId,
uint32_t tableOwnerId)
{
StringInfo slotName = makeStringInfo();
appendStringInfo(slotName, "citus_split_%u_%u", nodeId, tableOwnerId);
appendStringInfo(slotName, "%s%u_%u", SHARD_SPLIT_REPLICATION_SLOT_PREFIX, nodeId,
tableOwnerId);
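/* e.g. nodeId 18 and tableOwnerId 10004 (illustrative values) yield 'citus_shard_split_18_10004' */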
if (slotName->len > NAMEDATALEN)
{

View File

@ -34,7 +34,12 @@ extern void DropShardSubscription(MultiConnection *connection,
extern void DropShardPublication(MultiConnection *connection, char *publicationName);
extern void DropShardUser(MultiConnection *connection, char *username);
extern void DropShardMoveReplicationSlot(MultiConnection *connection,
char *publicationName);
extern char * ShardSubscriptionRole(Oid ownerId, char *operationPrefix);
extern char * ShardSubscriptionName(Oid ownerId, char *operationPrefix);
extern void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
int sourceNodePort, char *userName,
char *databaseName,
@ -50,9 +55,11 @@ extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection,
char *operationPrefix);
#define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_"
#define SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX "citus_shard_move_subscription_role_"
#define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_"
#define SHARD_SPLIT_PUBLICATION_PREFIX "citus_shard_split_publication_"
#define SHARD_SPLIT_SUBSCRIPTION_PREFIX "citus_shard_split_subscription_"
#define SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX "citus_shard_split_subscription_role_"
#define SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX "citus_shard_split_template_slot_"
#define SHARD_SPLIT_REPLICATION_SLOT_PREFIX "citus_shard_split_"
#endif /* MULTI_LOGICAL_REPLICATION_H_ */

View File

@ -59,8 +59,6 @@ extern void SplitShard(SplitMode splitMode,
/* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. */
extern void ErrorIfCannotSplitShard(SplitOperation splitOperation,
ShardInterval *sourceShard);
extern void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
extern void DropShardList(List *shardIntervalList);
#endif /* SHARDSPLIT_H_ */

View File

@ -12,6 +12,12 @@
#include "distributed/multi_logical_replication.h"
/*
* Invoking the 'worker_split_shard_replication_setup' UDF returns a set of records
* of the custom datatype 'replication_slot_info'. This information is parsed and stored
* in the data structure below. It is used to create a subscriber on the target node
* with the corresponding slot name.
*/
typedef struct ReplicationSlotInfo
{
uint32 targetNodeId;
@ -19,13 +25,18 @@ typedef struct ReplicationSlotInfo
char *slotName;
} ReplicationSlotInfo;
/*
* Stores the information necessary for creating a subscriber on a target node.
* Based on how a shard is split and mapped to target nodes, there is one
* 'ShardSplitSubscriberMetadata' for each unique <tableOwner, targetNodeId> combination.
*/
typedef struct ShardSplitSubscriberMetadata
{
Oid tableOwnerId;
ReplicationSlotInfo *slotInfo;
/*
* Exclusively claimed connection for a subscription. The target node of the
* subscription is pointed to by ReplicationSlotInfo.
*/
MultiConnection *targetNodeConnection;
@ -47,52 +58,52 @@ typedef struct NodeShardMappingEntry
extern uint32 NodeShardMappingHash(const void *key, Size keysize);
extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize);
HTAB * SetupHashMapForShardInfo(void);
extern HTAB * SetupHashMapForShardInfo(void);
/* Functions for subscriber metadata management */
extern List * ParseReplicationSlotInfoFromResult(PGresult *result);
extern List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
List *replicationSlotInfoList);
extern HTAB * CreateShardSplitInfoMapForPublication(
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList);
extern void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode,
List *shardSplitPubSubMetadataList,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList);
/* Functions for creating publications and subscriptions */
extern void CreateShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication);
extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList,
List *shardSplitPubSubMetadataList,
WorkerNode *sourceWorkerNode, char *superUser,
char *databaseName);
extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection,
char *templateSlotName,
List *shardSplitSubscriberMetadataList);
extern List * CreateTargetNodeConnectionsForShardSplit(
List *shardSplitSubscribersMetadataList,
int
connectionFlags, char *user,
char *databaseName);
/* Functions to drop publisher-subscriber resources */
extern void DropAllShardSplitLeftOvers(WorkerNode *sourceNode,
HTAB *shardSplitMapOfPublications);
extern void DropShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication);
extern void DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList);
extern char * DropExistingIfAnyAndCreateTemplateReplicationSlot(
ShardInterval *shardIntervalToSplit,
MultiConnection *
sourceConnection);
/* Wrapper functions which wait for a subscriber to be ready and catchup */
extern void WaitForShardSplitRelationSubscriptionsBecomeReady(
List *shardSplitPubSubMetadataList);
extern void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
List *
shardSplitPubSubMetadataList);
List * CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList,
int
connectionFlags, char *user,
char *databaseName);
extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection,
List *shardSplitSubscriberMetadataList);
extern char * ShardSplitTemplateReplicationSlotName(uint64 shardId);
/* Used for debugging. Remove later. */
extern void PrintShardSplitPubSubMetadata(
ShardSplitSubscriberMetadata *shardSplitMetadata);
extern void CloseShardSplitSubscriberConnections(List *shardSplitSubscriberMetadataList);
#endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */

View File

@ -0,0 +1,465 @@
/*
Citus Shard Split Test. The test is modeled after 'shard_move_constraints'.
Here is a high-level overview of the test plan:
1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table.
2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors.
3. Create Foreign key constraints between the two co-located distributed tables.
4. Load data into the three tables.
5. Move one of the shards for 'sensors' to test ShardMove -> Split.
6. Trigger Split on both shards of 'sensors'. This will also split co-located tables.
7. Move one of the split shards to test Split -> ShardMove.
8. Split an already-split shard a second time on a different schema.
*/
CREATE SCHEMA "citus_split_test_schema";
CREATE ROLE test_split_role WITH LOGIN;
GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role;
SET ROLE test_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981000;
SET citus.next_placement_id TO 8610000;
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc.
CREATE TABLE sensors(
measureid integer,
eventdatetime date,
measure_data jsonb,
meaure_quantity decimal(15, 2),
measure_status char(1),
measure_comment varchar(44),
PRIMARY KEY (measureid, eventdatetime, measure_data));
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;
SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc.
-- BEGIN: Create co-located distributed and reference tables.
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table_with_index_rep_identity(key int NOT NULL);
CREATE UNIQUE INDEX uqx ON table_with_index_rep_identity(key);
ALTER TABLE table_with_index_rep_identity REPLICA IDENTITY USING INDEX uqx;
CLUSTER table_with_index_rep_identity USING uqx;
SELECT create_distributed_table('table_with_index_rep_identity', 'key', colocate_with:='sensors');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- END: Create co-located distributed and reference tables.
-- BEGIN : Create Foreign key constraints.
ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
-- END : Create Foreign key constraints.
-- BEGIN : Load data into tables.
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
SELECT COUNT(*) FROM sensors;
count
---------------------------------------------------------------------
1001
(1 row)
SELECT COUNT(*) FROM reference_table;
count
---------------------------------------------------------------------
1001
(1 row)
SELECT COUNT(*) FROM colocated_dist_table;
count
---------------------------------------------------------------------
1001
(1 row)
-- END: Load data into tables.
-- BEGIN : Display current state.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
ORDER BY logicalrelid, shardminvalue::BIGINT;
shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport
---------------------------------------------------------------------
8981000 | sensors | -2147483648 | -1 | localhost | 57637
8981001 | sensors | 0 | 2147483647 | localhost | 57638
8981003 | colocated_dist_table | -2147483648 | -1 | localhost | 57637
8981004 | colocated_dist_table | 0 | 2147483647 | localhost | 57638
8981005 | table_with_index_rep_identity | -2147483648 | -1 | localhost | 57637
8981006 | table_with_index_rep_identity | 0 | 2147483647 | localhost | 57638
(6 rows)
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like 'sensors_%'
ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
sensors_8981000 | fkey_table_to_dist_8981000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981003(measureid)
(1 row)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
sensors_8981000 | CREATE INDEX hash_index_on_sensors_8981000 ON citus_split_test_schema.sensors_8981000 USING hash (((measure_data -> 'IsFailed'::text)))
sensors_8981000 | CREATE INDEX index_on_sensors_8981000 ON citus_split_test_schema.sensors_8981000 USING btree (lower((measureid)::text))
sensors_8981000 | CREATE INDEX index_with_include_on_sensors_8981000 ON citus_split_test_schema.sensors_8981000 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
sensors_8981000 | CREATE UNIQUE INDEX sensors_pkey_8981000 ON citus_split_test_schema.sensors_8981000 USING btree (measureid, eventdatetime, measure_data)
(4 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
table_with_index_rep_identity_8981005 | CREATE UNIQUE INDEX uqx_8981005 ON citus_split_test_schema.table_with_index_rep_identity_8981005 USING btree (key)
(1 row)
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
stats_on_sensors
stats_on_sensors_8981000
(2 rows)
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like 'sensors_%'
ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
sensors_8981001 | fkey_table_to_dist_8981001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981004(measureid)
(1 row)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
sensors_8981001 | CREATE INDEX hash_index_on_sensors_8981001 ON citus_split_test_schema.sensors_8981001 USING hash (((measure_data -> 'IsFailed'::text)))
sensors_8981001 | CREATE INDEX index_on_sensors_8981001 ON citus_split_test_schema.sensors_8981001 USING btree (lower((measureid)::text))
sensors_8981001 | CREATE INDEX index_with_include_on_sensors_8981001 ON citus_split_test_schema.sensors_8981001 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
sensors_8981001 | CREATE UNIQUE INDEX sensors_pkey_8981001 ON citus_split_test_schema.sensors_8981001 USING btree (measureid, eventdatetime, measure_data)
(4 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
table_with_index_rep_identity_8981006 | CREATE UNIQUE INDEX uqx_8981006 ON citus_split_test_schema.table_with_index_rep_identity_8981006 USING btree (key)
(1 row)
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
stats_on_sensors
stats_on_sensors_8981001
(2 rows)
-- END : Display current state
-- BEGIN : Move one shard before we split it.
\c - postgres - :master_port
SET ROLE test_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981007;
SET citus.defer_drop_after_shard_move TO OFF;
SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- END : Move one shard before we split it.
-- BEGIN : Set node id variables
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- END : Set node id variables
-- BEGIN : Split two shards : One with move and One without move.
-- Perform 2 way split
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-1073741824'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: replication slot "citus_shard_split_template_slot_8981000" does not exist
CONTEXT: while executing command on localhost:xxxxx
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
-- Perform 3 way split
SELECT pg_catalog.citus_split_shard_by_split_points(
8981001,
ARRAY['536870911', '1610612735'],
ARRAY[:worker_1_node, :worker_1_node, :worker_2_node],
'force_logical');
WARNING: replication slot "citus_shard_split_template_slot_8981001" does not exist
CONTEXT: while executing command on localhost:xxxxx
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
-- END : Split two shards : One with move and One without move.
-- BEGIN : Move a shard post split.
SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- END : Move a shard post split.
-- BEGIN : Display current state.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
ORDER BY logicalrelid, shardminvalue::BIGINT;
shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport
---------------------------------------------------------------------
8981007 | sensors | -2147483648 | -1073741824 | localhost | 57638
8981008 | sensors | -1073741823 | -1 | localhost | 57638
8981013 | sensors | 0 | 536870911 | localhost | 57637
8981014 | sensors | 536870912 | 1610612735 | localhost | 57637
8981015 | sensors | 1610612736 | 2147483647 | localhost | 57638
8981009 | colocated_dist_table | -2147483648 | -1073741824 | localhost | 57638
8981010 | colocated_dist_table | -1073741823 | -1 | localhost | 57638
8981016 | colocated_dist_table | 0 | 536870911 | localhost | 57637
8981017 | colocated_dist_table | 536870912 | 1610612735 | localhost | 57637
8981018 | colocated_dist_table | 1610612736 | 2147483647 | localhost | 57638
8981011 | table_with_index_rep_identity | -2147483648 | -1073741824 | localhost | 57638
8981012 | table_with_index_rep_identity | -1073741823 | -1 | localhost | 57638
8981019 | table_with_index_rep_identity | 0 | 536870911 | localhost | 57637
8981020 | table_with_index_rep_identity | 536870912 | 1610612735 | localhost | 57637
8981021 | table_with_index_rep_identity | 1610612736 | 2147483647 | localhost | 57638
(15 rows)
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like 'sensors_%'
ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
sensors_8981013 | fkey_table_to_dist_8981013 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981016(measureid)
sensors_8981014 | fkey_table_to_dist_8981014 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981017(measureid)
(2 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
sensors_8981013 | CREATE INDEX hash_index_on_sensors_8981013 ON citus_split_test_schema.sensors_8981013 USING hash (((measure_data -> 'IsFailed'::text)))
sensors_8981013 | CREATE INDEX index_on_sensors_8981013 ON citus_split_test_schema.sensors_8981013 USING btree (lower((measureid)::text))
sensors_8981013 | CREATE INDEX index_with_include_on_sensors_8981013 ON citus_split_test_schema.sensors_8981013 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
sensors_8981013 | CREATE UNIQUE INDEX sensors_pkey_8981013 ON citus_split_test_schema.sensors_8981013 USING btree (measureid, eventdatetime, measure_data)
sensors_8981014 | CREATE INDEX hash_index_on_sensors_8981014 ON citus_split_test_schema.sensors_8981014 USING hash (((measure_data -> 'IsFailed'::text)))
sensors_8981014 | CREATE INDEX index_on_sensors_8981014 ON citus_split_test_schema.sensors_8981014 USING btree (lower((measureid)::text))
sensors_8981014 | CREATE INDEX index_with_include_on_sensors_8981014 ON citus_split_test_schema.sensors_8981014 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
sensors_8981014 | CREATE UNIQUE INDEX sensors_pkey_8981014 ON citus_split_test_schema.sensors_8981014 USING btree (measureid, eventdatetime, measure_data)
(8 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
table_with_index_rep_identity_8981019 | CREATE UNIQUE INDEX uqx_8981019 ON citus_split_test_schema.table_with_index_rep_identity_8981019 USING btree (key)
table_with_index_rep_identity_8981020 | CREATE UNIQUE INDEX uqx_8981020 ON citus_split_test_schema.table_with_index_rep_identity_8981020 USING btree (key)
(2 rows)
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
stats_on_sensors
stats_on_sensors_8981013
stats_on_sensors_8981014
(3 rows)
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like 'sensors_%'
ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
sensors_8981007 | fkey_table_to_dist_8981007 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981009(measureid)
sensors_8981008 | fkey_table_to_dist_8981008 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981010(measureid)
sensors_8981015 | fkey_table_to_dist_8981015 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981018(measureid)
(3 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
sensors_8981007 | CREATE INDEX hash_index_on_sensors_8981007 ON citus_split_test_schema.sensors_8981007 USING hash (((measure_data -> 'IsFailed'::text)))
sensors_8981007 | CREATE INDEX index_on_sensors_8981007 ON citus_split_test_schema.sensors_8981007 USING btree (lower((measureid)::text))
sensors_8981007 | CREATE INDEX index_with_include_on_sensors_8981007 ON citus_split_test_schema.sensors_8981007 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
sensors_8981007 | CREATE UNIQUE INDEX sensors_pkey_8981007 ON citus_split_test_schema.sensors_8981007 USING btree (measureid, eventdatetime, measure_data)
sensors_8981008 | CREATE INDEX hash_index_on_sensors_8981008 ON citus_split_test_schema.sensors_8981008 USING hash (((measure_data -> 'IsFailed'::text)))
sensors_8981008 | CREATE INDEX index_on_sensors_8981008 ON citus_split_test_schema.sensors_8981008 USING btree (lower((measureid)::text))
sensors_8981008 | CREATE INDEX index_with_include_on_sensors_8981008 ON citus_split_test_schema.sensors_8981008 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
sensors_8981008 | CREATE UNIQUE INDEX sensors_pkey_8981008 ON citus_split_test_schema.sensors_8981008 USING btree (measureid, eventdatetime, measure_data)
sensors_8981015 | CREATE INDEX hash_index_on_sensors_8981015 ON citus_split_test_schema.sensors_8981015 USING hash (((measure_data -> 'IsFailed'::text)))
sensors_8981015 | CREATE INDEX index_on_sensors_8981015 ON citus_split_test_schema.sensors_8981015 USING btree (lower((measureid)::text))
sensors_8981015 | CREATE INDEX index_with_include_on_sensors_8981015 ON citus_split_test_schema.sensors_8981015 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
sensors_8981015 | CREATE UNIQUE INDEX sensors_pkey_8981015 ON citus_split_test_schema.sensors_8981015 USING btree (measureid, eventdatetime, measure_data)
(12 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
table_with_index_rep_identity_8981011 | CREATE UNIQUE INDEX uqx_8981011 ON citus_split_test_schema.table_with_index_rep_identity_8981011 USING btree (key)
table_with_index_rep_identity_8981012 | CREATE UNIQUE INDEX uqx_8981012 ON citus_split_test_schema.table_with_index_rep_identity_8981012 USING btree (key)
table_with_index_rep_identity_8981021 | CREATE UNIQUE INDEX uqx_8981021 ON citus_split_test_schema.table_with_index_rep_identity_8981021 USING btree (key)
(3 rows)
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
stats_on_sensors
stats_on_sensors_8981007
stats_on_sensors_8981008
stats_on_sensors_8981015
(4 rows)
-- END : Display current state
-- BEGIN: Should be able to change/drop constraints
\c - postgres - :master_port
SET ROLE test_split_role;
SET search_path TO "citus_split_test_schema";
ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed;
ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200;
DROP STATISTICS stats_on_sensors;
DROP INDEX index_on_sensors_renamed;
ALTER TABLE sensors DROP CONSTRAINT fkey_table_to_dist;
-- END: Should be able to change/drop constraints
-- BEGIN: Split second time on another schema
SET search_path TO public;
SET citus.next_shard_id TO 8981031;
SELECT pg_catalog.citus_split_shard_by_split_points(
8981007,
ARRAY['-2100000000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: replication slot "citus_shard_split_template_slot_8981007" does not exist
CONTEXT: while executing command on localhost:xxxxx
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
SET search_path TO "citus_split_test_schema";
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
ORDER BY logicalrelid, shardminvalue::BIGINT;
shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport
---------------------------------------------------------------------
8981031 | sensors | -2147483648 | -2100000000 | localhost | 57637
8981032 | sensors | -2099999999 | -1073741824 | localhost | 57638
8981008 | sensors | -1073741823 | -1 | localhost | 57638
8981013 | sensors | 0 | 536870911 | localhost | 57637
8981014 | sensors | 536870912 | 1610612735 | localhost | 57637
8981015 | sensors | 1610612736 | 2147483647 | localhost | 57638
8981033 | colocated_dist_table | -2147483648 | -2100000000 | localhost | 57637
8981034 | colocated_dist_table | -2099999999 | -1073741824 | localhost | 57638
8981010 | colocated_dist_table | -1073741823 | -1 | localhost | 57638
8981016 | colocated_dist_table | 0 | 536870911 | localhost | 57637
8981017 | colocated_dist_table | 536870912 | 1610612735 | localhost | 57637
8981018 | colocated_dist_table | 1610612736 | 2147483647 | localhost | 57638
8981035 | table_with_index_rep_identity | -2147483648 | -2100000000 | localhost | 57637
8981036 | table_with_index_rep_identity | -2099999999 | -1073741824 | localhost | 57638
8981012 | table_with_index_rep_identity | -1073741823 | -1 | localhost | 57638
8981019 | table_with_index_rep_identity | 0 | 536870911 | localhost | 57637
8981020 | table_with_index_rep_identity | 536870912 | 1610612735 | localhost | 57637
8981021 | table_with_index_rep_identity | 1610612736 | 2147483647 | localhost | 57638
(18 rows)
-- END: Split second time on another schema
-- BEGIN: Validate Data Count
SELECT COUNT(*) FROM sensors;
count
---------------------------------------------------------------------
1001
(1 row)
SELECT COUNT(*) FROM reference_table;
count
---------------------------------------------------------------------
1001
(1 row)
SELECT COUNT(*) FROM colocated_dist_table;
count
---------------------------------------------------------------------
1001
(1 row)
-- END: Validate Data Count
--BEGIN : Cleanup
\c - postgres - :master_port
DROP SCHEMA "citus_split_test_schema" CASCADE;
NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table citus_split_test_schema.sensors
drop cascades to table citus_split_test_schema.reference_table
drop cascades to table citus_split_test_schema.colocated_dist_table
drop cascades to table citus_split_test_schema.table_with_index_rep_identity
--END : Cleanup

View File

@@ -22,13 +22,13 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SELECT * FROM citus_shards;
table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size
---------------------------------------------------------------------
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 8888 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 8887 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9995 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9992 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 57637 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9998 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9997 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 8888 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 8887 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9995 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9992 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 57637 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9998 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9997 | 0
(7 rows)
SELECT * FROM pg_dist_shard;
@@ -62,7 +62,7 @@ SELECT citus_split_shard_by_split_points(
ARRAY['0'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: replication slot "citus_split_replicationslot_for_shard_1" does not exist
WARNING: replication slot "citus_shard_split_template_slot_1" does not exist
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit
@@ -88,7 +88,7 @@ SELECT * FROM show_catalog;
SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
---------------------------------------------------------------------
17311 | 16384 | citus_shard_split_subscription_10 | 17310 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_18_10 | off | {citus_shard_split_publication_18_10}
20669 | 16384 | citus_shard_split_subscription_10 | 20668 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_18_10 | off | {citus_shard_split_publication_18_10}
(1 row)
SELECT slot_name FROM pg_replication_slots;
@@ -115,20 +115,20 @@ SELECT * FROM show_catalog;
SELECT * FROM pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
---------------------------------------------------------------------
17371 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f
17374 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f
20728 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f
20731 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f
(2 rows)
SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
---------------------------------------------------------------------
17378 | 16384 | citus_shard_split_subscription_10 | 17377 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_16_10 | off | {citus_shard_split_publication_16_10}
20735 | 16384 | citus_shard_split_subscription_10 | 20734 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_16_10 | off | {citus_shard_split_publication_16_10}
(1 row)
SELECT slot_name FROM pg_replication_slots;
slot_name
slot_name
---------------------------------------------------------------------
citus_split_replicationslot_for_shard_1
citus_shard_split_template_slot_1
citus_split_16_10
citus_split_18_10
(3 rows)

View File

@@ -78,8 +78,8 @@ WARNING: Previous split shard worflow was not successfully and could not comple
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset
-- Create subscription at worker2 with copy_data to 'false'
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

View File

@@ -71,7 +71,7 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
1
(1 row)
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'citus') \gset
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

View File

@@ -19,7 +19,7 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
1
(1 row)
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset
-- Create subscription at worker1 with copy_data to 'false' a
BEGIN;
CREATE SUBSCRIPTION local_subscription

View File

@@ -18,8 +18,8 @@ WARNING: Previous split shard worflow was not successfully and could not comple
2
(1 row)
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'citus') \gset
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset
-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1
CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression'

View File

@@ -8,16 +8,17 @@ test: tablespace
test: foreign_key_to_reference_table
# Split tests go here.
#test: citus_sameer
test: split_shard_replication_setup
test: split_shard_replication_setup_remote_local
test: split_shard_replication_setup_local
test: split_shard_replication_colocated_setup
test: worker_split_copy_test
test: worker_split_binary_copy_test
test: worker_split_text_copy_test
test: citus_split_shard_by_split_points_negative
test: citus_split_shard_by_split_points
test: citus_split_shard_by_split_points_failure
#test: split_shard_replication_setup
#test: split_shard_replication_setup_remote_local
#test: split_shard_replication_setup_local
#test: split_shard_replication_colocated_setup
#test: worker_split_copy_test
#test: worker_split_binary_copy_test
#test: worker_split_text_copy_test
#test: citus_split_shard_by_split_points_negative
#test: citus_split_shard_by_split_points
#test: citus_split_shard_by_split_points_failure
# Name citus_split_shard_by_split_points_columnar_partitioned was too long and being truncated.
# use citus_split_shard_columnar_partitioned instead.
test: citus_split_shard_columnar_partitioned
#test: citus_split_shard_columnar_partitioned
test: citus_non_blocking_split_shards

View File

@@ -0,0 +1,240 @@
/*
Citus Shard Split Test. The test is modeled on 'shard_move_constraints'.
Here is a high-level overview of the test plan:
1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table.
2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors.
3. Create Foreign key constraints between the two co-located distributed tables.
4. Load data into the three tables.
5. Move one of the shards for 'sensors' to test ShardMove -> Split.
6. Trigger Split on both shards of 'sensors' (an annotated example of the split call is sketched right after this comment block). This will also split co-located tables.
7. Move one of the split shards to test Split -> ShardMove.
8. Split an already-split shard a second time, from a different schema.
*/
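-- Illustrative sketch (not part of the test, kept entirely in comments so it cannot
-- affect the expected output): the splits in steps 6 and 8 are driven by
-- pg_catalog.citus_split_shard_by_split_points. The annotations below describe the
-- positional arguments as this file uses them; the meanings are inferred from the
-- calls in this test rather than quoted from the catalog definition.
--
--   SELECT pg_catalog.citus_split_shard_by_split_points(
--       8981000,                                -- shardid of the shard to split
--       ARRAY['-1073741824'],                   -- split points within the shard's hash range
--       ARRAY[:worker_1_node, :worker_2_node],  -- target node for each resulting child shard
--       'force_logical');                       -- non-blocking, logical-replication based mode
--
-- N split points yield N+1 child shards, so the node array carries one more entry
-- than the split point array.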
CREATE SCHEMA "citus_split_test_schema";
CREATE ROLE test_split_role WITH LOGIN;
GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role;
SET ROLE test_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981000;
SET citus.next_placement_id TO 8610000;
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc.
CREATE TABLE sensors(
measureid integer,
eventdatetime date,
measure_data jsonb,
meaure_quantity decimal(15, 2),
measure_status char(1),
measure_comment varchar(44),
PRIMARY KEY (measureid, eventdatetime, measure_data));
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;
SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc.
-- BEGIN: Create co-located distributed and reference tables.
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
SELECT create_reference_table('reference_table');
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
CREATE TABLE table_with_index_rep_identity(key int NOT NULL);
CREATE UNIQUE INDEX uqx ON table_with_index_rep_identity(key);
ALTER TABLE table_with_index_rep_identity REPLICA IDENTITY USING INDEX uqx;
CLUSTER table_with_index_rep_identity USING uqx;
SELECT create_distributed_table('table_with_index_rep_identity', 'key', colocate_with:='sensors');
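-- Descriptive note (an editorial reading of intent, not asserted by the test):
-- 'force_logical' splits stream changes through logical replication, and replicating
-- UPDATE/DELETE requires a replica identity. This table has no primary key, so the
-- unique index 'uqx' above is configured as its REPLICA IDENTITY to cover that case.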
-- END: Create co-located distributed and reference tables.
-- BEGIN : Create Foreign key constraints.
ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
-- END : Create Foreign key constraints.
-- BEGIN : Load data into tables.
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
SELECT COUNT(*) FROM sensors;
SELECT COUNT(*) FROM reference_table;
SELECT COUNT(*) FROM colocated_dist_table;
-- END: Load data into tables.
-- BEGIN : Display current state.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
ORDER BY logicalrelid, shardminvalue::BIGINT;
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like 'sensors_%'
ORDER BY 1, 2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema')
)
ORDER BY stxname ASC;
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like 'sensors_%'
ORDER BY 1, 2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema')
)
ORDER BY stxname ASC;
-- END : Display current state
-- BEGIN : Move one shard before we split it.
\c - postgres - :master_port
SET ROLE test_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981007;
SET citus.defer_drop_after_shard_move TO OFF;
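-- Note (behaviour inferred from the GUC name, not verified by this test): with
-- citus.defer_drop_after_shard_move set to OFF, the source placement is dropped as
-- part of the move instead of being left behind for deferred cleanup.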
SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical');
-- END : Move one shard before we split it.
-- BEGIN : Set node id variables
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- END : Set node id variables
-- BEGIN : Split two shards : One with move and One without move.
-- Perform 2 way split
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-1073741824'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
-- Perform 3 way split
SELECT pg_catalog.citus_split_shard_by_split_points(
8981001,
ARRAY['536870911', '1610612735'],
ARRAY[:worker_1_node, :worker_1_node, :worker_2_node],
'force_logical');
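-- Expected shape of the two splits above (the same boundaries appear in the expected
-- output): the 2-way split of 8981000 yields children covering
-- [-2147483648, -1073741824] on worker_1 and [-1073741823, -1] on worker_2; the
-- 3-way split of 8981001 yields [0, 536870911] and [536870912, 1610612735] on
-- worker_1 and [1610612736, 2147483647] on worker_2. Co-located tables are split
-- along the same boundaries.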
-- END : Split two shards : One with move and One without move.
-- BEGIN : Move a shard post split.
SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes');
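-- 'block_writes' blocks writes to the shard for the duration of the move, in
-- contrast to the logical-replication based 'force_logical' mode used elsewhere in
-- this file; presumably both transfer modes are exercised here on purpose.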
-- END : Move a shard post split.
-- BEGIN : Display current state.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
ORDER BY logicalrelid, shardminvalue::BIGINT;
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like 'sensors_%'
ORDER BY 1, 2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema')
)
ORDER BY stxname ASC;
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like 'sensors_%'
ORDER BY 1, 2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema')
)
ORDER BY stxname ASC;
-- END : Display current state
-- BEGIN: Should be able to change/drop constraints
\c - postgres - :master_port
SET ROLE test_split_role;
SET search_path TO "citus_split_test_schema";
ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed;
ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200;
DROP STATISTICS stats_on_sensors;
DROP INDEX index_on_sensors_renamed;
ALTER TABLE sensors DROP CONSTRAINT fkey_table_to_dist;
-- END: Should be able to change/drop constraints
-- BEGIN: Split second time on another schema
SET search_path TO public;
SET citus.next_shard_id TO 8981031;
SELECT pg_catalog.citus_split_shard_by_split_points(
8981007,
ARRAY['-2100000000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
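-- Splitting child shard 8981007 (covering [-2147483648, -1073741824]) at
-- '-2100000000' produces [-2147483648, -2100000000] on worker_1 and
-- [-2099999999, -1073741824] on worker_2, as reflected in the expected output.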
SET search_path TO "citus_split_test_schema";
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
ORDER BY logicalrelid, shardminvalue::BIGINT;
-- END: Split second time on another schema
-- BEGIN: Validate Data Count
SELECT COUNT(*) FROM sensors;
SELECT COUNT(*) FROM reference_table;
SELECT COUNT(*) FROM colocated_dist_table;
-- END: Validate Data Count
--BEGIN : Cleanup
\c - postgres - :master_port
DROP SCHEMA "citus_split_test_schema" CASCADE;
--END : Cleanup

View File

@@ -67,5 +67,5 @@ SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
SELECT * FROM pg_publication;
SELECT * FROM pg_subscription;
SELECT slot_name FROM pg_replication_slots;
SELECT * FROM pg_replication_slots;
SELECT * FROM table_to_split_100;

View File

@@ -76,9 +76,9 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset
-- Create subscription at worker2 with copy_data to 'false'
\c - postgres - :worker_2_port

View File

@@ -71,7 +71,7 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'citus') \gset
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port

View File

@@ -18,7 +18,7 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
]);
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset
-- Create subscription at worker1 with copy_data to 'false' a
BEGIN;

View File

@@ -16,8 +16,8 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'citus') \gset
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset
-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1