1. Added an isolation test

2. Added DisableAndDropShardSplitSubscription
3. Added creation of replica identities and validated through isolation test
users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-30 13:36:10 +05:30
parent aef758a4a3
commit adf73387f4
25 changed files with 1151 additions and 72 deletions

View File

@ -69,6 +69,9 @@ static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalLis
List *workersForPlacementList);
static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList);
static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
List *workersForPlacementList,
bool includeReplicaIdentity);
static void CreateReplicaIdentities(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerNode);
@ -655,7 +658,8 @@ CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode)
*/
static void
CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
List *workersForPlacementList, bool
includeReplicaIdentity)
{
List *shardIntervalList = NIL;
List *ddlTaskExecList = NIL;
@ -678,7 +682,7 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
List *ddlCommandList = GetPostLoadTableCreationCommands(
shardInterval->relationId,
true /* includeIndexes */,
true /* includeReplicaIdentity */);
includeReplicaIdentity);
ddlCommandList = WorkerApplyShardDDLCommandList(
ddlCommandList,
shardInterval->shardId);
@ -725,7 +729,8 @@ CreateAndCopySplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkfl
/* Create auxiliary structures (indexes, stats, replicaindentities, triggers) */
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
workersForPlacementList,
true /* includeReplicaIdentity*/);
}
@ -1298,6 +1303,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
sourceShardToCopyNode,
workersForPlacementList);
CreateReplicaIdentities(shardGroupSplitIntervalListList, workersForPlacementList);
/* 3) Create Publications. */
CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
@ -1350,6 +1358,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
superUser,
databaseName);
/* Used for testing */
ConflictOnlyWithIsolationTesting();
/* 9) Wait for subscriptions to be ready */
WaitForShardSplitRelationSubscriptionsBecomeReady(
shardSplitSubscribersMetadataList);
@ -1361,7 +1372,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* 11) Create Auxilary structures */
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
workersForPlacementList,
false /* includeReplicaIdentity*/);
/* 12) Wait for subscribers to catchup till source LSN */
sourcePosition = GetRemoteLogPosition(sourceConnection);
@ -1702,6 +1714,12 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
splitChildShardIntervalList, shardGroupSplitIntervalListList)
{
int64 sourceShardId = sourceShardIntervalToCopy->shardId;
Oid relationId = sourceShardIntervalToCopy->relationId;
Var *partitionColumn = DistPartitionKey(relationId);
bool missingOK = false;
char *partitionColumnName =
get_attname(relationId, partitionColumn->varattno, missingOK);
ShardInterval *splitChildShardInterval = NULL;
WorkerNode *destinationWorkerNode = NULL;
@ -1722,8 +1740,9 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
splitChildShardInterval->maxValue));
appendStringInfo(splitChildrenRows,
"ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info",
"ROW(%lu, %s, %lu, %s, %s, %u)::citus.split_shard_info",
sourceShardId,
quote_literal_cstr(partitionColumnName),
splitChildShardInterval->shardId,
quote_literal_cstr(minValueString->data),
quote_literal_cstr(maxValueString->data),
@ -1868,3 +1887,50 @@ TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval)
dropShardQuery->data,
NULL /* pgResult */);
}
/*todo(saawasek): Add comments */
static void
CreateReplicaIdentities(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
/*
* Create Replica Identities for actual child shards.
*/
List *shardIntervalList = NIL;
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
/*
* Iterate on split shard interval list for given shard and create tasks
* for every single split shard in a shard group.
*/
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{
List *shardList = NIL;
shardList = lappend(shardList, shardInterval);
CreateReplicaIdentity(shardList, workerPlacementNode->workerName,
workerPlacementNode->workerPort);
}
}
/*todo: remove the global variable dummy map*/
HASH_SEQ_STATUS status;
hash_seq_init(&status, DummyShardInfoHashMap);
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint32 nodeId = entry->key.nodeId;
WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
false /* missingOk */);
List *dummyShardIntervalList = entry->shardSplitInfoList;
CreateReplicaIdentity(dummyShardIntervalList, shardToBeDroppedNode->workerName,
shardToBeDroppedNode->workerPort);
}
}

View File

@ -12,6 +12,7 @@
#include "miscadmin.h"
#include "postmaster/postmaster.h"
#include "common/hashfn.h"
#include "distributed/distribution_column.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_utils.h"
#include "distributed/shardsplit_shared_memory.h"
@ -34,12 +35,14 @@ static HTAB *ShardInfoHashMap = NULL;
/* Function declarations */
static void ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
uint64 *sourceShardId,
char **partitionColumnName,
uint64 *childShardId,
int32 *minValue,
int32 *maxValue,
int32 *nodeId);
static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit,
char *partitionColumnName,
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
@ -111,16 +114,19 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
while (array_iterate(shardInfo_iterator, &shardInfoDatum, &isnull))
{
uint64 sourceShardId = 0;
char *partitionColumnName = NULL;
uint64 childShardId = 0;
int32 minValue = 0;
int32 maxValue = 0;
int32 nodeId = 0;
ParseShardSplitInfoFromDatum(shardInfoDatum, &sourceShardId, &childShardId,
ParseShardSplitInfoFromDatum(shardInfoDatum, &sourceShardId,
&partitionColumnName, &childShardId,
&minValue, &maxValue, &nodeId);
ShardSplitInfo *shardSplitInfo = CreateShardSplitInfo(
sourceShardId,
partitionColumnName,
childShardId,
minValue,
maxValue,
@ -177,6 +183,7 @@ SetupHashMapForShardInfo()
* with appropriate OIs' for source and destination relation.
*
* sourceShardIdToSplit - Existing shardId which has a valid entry in cache and catalogue
* partitionColumnName - Name of column to use for partitioning
* desSplitChildShardId - New split child shard which doesn't have an entry in metacache yet.
* minValue - Minimum hash value for desSplitChildShardId
* maxValue - Maximum hash value for desSplitChildShardId
@ -185,6 +192,7 @@ SetupHashMapForShardInfo()
*/
ShardSplitInfo *
CreateShardSplitInfo(uint64 sourceShardIdToSplit,
char *partitionColumnName,
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
@ -204,23 +212,6 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
"worker nodes hosting source shards.", sourceShardIdToSplit));
}
CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry(
shardIntervalToSplit->relationId);
if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED))
{
Relation distributedRel = RelationIdGetRelation(cachedTableEntry->relationId);
ereport(ERROR, (errmsg(
"Citus does only support Hash distributed tables to be split."),
errdetail("Table '%s' is not Hash distributed",
RelationGetRelationName(distributedRel))
));
RelationClose(distributedRel);
}
Assert(shardIntervalToSplit->minValueExists);
Assert(shardIntervalToSplit->maxValueExists);
/* Oid of distributed table */
Oid citusTableOid = shardIntervalToSplit->relationId;
Oid sourceShardToSplitOid = GetTableLocalShardOid(citusTableOid,
@ -244,7 +235,9 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
}
/* determine the partition column in the tuple descriptor */
Var *partitionColumn = cachedTableEntry->partitionColumn;
Var *partitionColumn = BuildDistributionKeyFromColumnName(sourceShardToSplitOid,
partitionColumnName,
AccessShareLock);
if (partitionColumn == NULL)
{
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
@ -370,6 +363,7 @@ NodeShardMappingHashCompare(const void *left, const void *right, Size keysize)
static void
ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
uint64 *sourceShardId,
char **partitionColumnName,
uint64 *childShardId,
int32 *minValue,
int32 *maxValue,
@ -385,6 +379,15 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
}
*sourceShardId = DatumGetUInt64(sourceShardIdDatum);
Datum partitionColumnDatum = GetAttributeByName(dataTuple, "distribution_column",
&isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"distribution_column for split_shard_info can't be null")));
}
*partitionColumnName = TextDatumGetCString(partitionColumnDatum);
Datum childShardIdDatum = GetAttributeByName(dataTuple, "child_shard_id", &isnull);
if (isnull)
{

View File

@ -88,8 +88,6 @@ static void CreateForeignConstraintsToReferenceTable(List *shardList,
MultiConnection *targetConnection);
static List * PrepareReplicationSubscriptionList(List *shardList);
static Bitmapset * TableOwnerIds(List *shardList);
static void CreateReplicaIdentity(List *shardList, char *nodeName, int32
nodePort);
static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId);
static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId,
uint64 shardId);
@ -115,7 +113,6 @@ static void CreatePartitioningHierarchy(List *shardList, char *targetNodeName,
int targetNodePort);
static void CreateColocatedForeignKeys(List *shardList, char *targetNodeName,
int targetNodePort);
static void ConflictOnlyWithIsolationTesting(void);
static void DropShardMovePublications(MultiConnection *connection,
Bitmapset *tableOwnerIds);
static void DropShardMoveSubscriptions(MultiConnection *connection,
@ -456,7 +453,7 @@ TableOwnerIds(List *shardList)
* CreateReplicaIdentity gets a shardList and creates all the replica identities
* on the shards in the given node.
*/
static void
void
CreateReplicaIdentity(List *shardList, char *nodeName, int32 nodePort)
{
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
@ -1048,7 +1045,7 @@ CreateForeignConstraintsToReferenceTable(List *shardList,
* Note that since the cost of calling this function is pretty low, we prefer
* to use it in non-assert builds as well not to diverge in the behaviour.
*/
static void
extern void
ConflictOnlyWithIsolationTesting()
{
LOCKTAG tag;

View File

@ -12,6 +12,7 @@
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/listutils.h"
#include "replication/logical.h"
#include "utils/typcache.h"
/*
* Dynamically-loaded modules are required to include this macro call to check for
@ -208,24 +209,22 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
int partitionColumnIndex,
Oid distributedTableOid)
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableOid);
if (cacheEntry == NULL)
{
ereport(ERROR, errmsg(
"Expected valid Citus Cache entry to be present. But found null"));
}
TupleDesc relationTupleDes = RelationGetDescr(sourceShardRelation);
Form_pg_attribute partitionColumn = TupleDescAttr(relationTupleDes,
partitionColumnIndex);
bool isNull = false;
Datum partitionColumnValue = heap_getattr(tuple,
partitionColumnIndex + 1,
relationTupleDes,
&isNull);
FmgrInfo *hashFunction = cacheEntry->hashFunction;
TypeCacheEntry *typeEntry = lookup_type_cache(partitionColumn->atttypid,
TYPECACHE_HASH_PROC_FINFO);
/* get hashed value of the distribution value */
Datum hashedValueDatum = FunctionCall1(hashFunction, partitionColumnValue);
Datum hashedValueDatum = FunctionCall1(&(typeEntry->hash_proc_finfo),
partitionColumnValue);
return DatumGetInt32(hashedValueDatum);
}

View File

@ -603,7 +603,7 @@ DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection)
char *subscriptionName = NULL;
foreach_ptr(subscriptionName, subscriptionNameList)
{
DropShardSubscription(cleanupConnection, subscriptionName);
DisableAndDropShardSplitSubscription(cleanupConnection, subscriptionName);
}
}
@ -709,7 +709,8 @@ DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList)
uint32 tableOwnerId = subscriberMetadata->tableOwnerId;
MultiConnection *targetNodeConnection = subscriberMetadata->targetNodeConnection;
DropShardSubscription(targetNodeConnection, ShardSubscriptionName(tableOwnerId,
DisableAndDropShardSplitSubscription(targetNodeConnection, ShardSubscriptionName(
tableOwnerId,
SHARD_SPLIT_SUBSCRIPTION_PREFIX));
DropShardUser(targetNodeConnection, ShardSubscriptionRole(tableOwnerId,
@ -718,6 +719,30 @@ DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList)
}
/*todo(saawasek): add comments */
void
DisableAndDropShardSplitSubscription(MultiConnection *connection, char *subscriptionName)
{
StringInfo alterSubscriptionSlotCommand = makeStringInfo();
StringInfo alterSubscriptionDisableCommand = makeStringInfo();
appendStringInfo(alterSubscriptionDisableCommand,
"ALTER SUBSCRIPTION %s DISABLE",
quote_identifier(subscriptionName));
ExecuteCriticalRemoteCommand(connection,
alterSubscriptionDisableCommand->data);
appendStringInfo(alterSubscriptionSlotCommand,
"ALTER SUBSCRIPTION %s SET (slot_name = NONE)",
quote_identifier(subscriptionName));
ExecuteCriticalRemoteCommand(connection, alterSubscriptionSlotCommand->data);
ExecuteCriticalRemoteCommand(connection, psprintf(
"DROP SUBSCRIPTION %s",
quote_identifier(subscriptionName)));
}
/*
* CloseShardSplitSubscriberConnections closes connection of subscriber nodes.
* 'ShardSplitSubscriberMetadata' holds connection for a subscriber node. The method

View File

@ -1,5 +1,6 @@
CREATE TYPE citus.split_shard_info AS (
source_shard_id bigint,
distribution_column text,
child_shard_id bigint,
shard_min_value text,
shard_max_value text,

View File

@ -1,5 +1,6 @@
CREATE TYPE citus.split_shard_info AS (
source_shard_id bigint,
distribution_column text,
child_shard_id bigint,
shard_min_value text,
shard_max_value text,

View File

@ -26,6 +26,9 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
int sourceNodePort, char *targetNodeName,
int targetNodePort);
extern void ConflictOnlyWithIsolationTesting(void);
extern void CreateReplicaIdentity(List *shardList, char *nodeName, int32
nodePort);
extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection);
extern List * GetQueryResultStringList(MultiConnection *connection, char *query);

View File

@ -96,6 +96,8 @@ extern char * DropExistingIfAnyAndCreateTemplateReplicationSlot(
ShardInterval *shardIntervalToSplit,
MultiConnection *
sourceConnection);
extern void DisableAndDropShardSplitSubscription(MultiConnection *connection,
char *subscriptionName);
/* Wrapper functions which wait for a subscriber to be ready and catchup */
extern void WaitForShardSplitRelationSubscriptionsBecomeReady(

View File

@ -7,3 +7,4 @@ test: isolation_cluster_management
test: isolation_logical_replication_single_shard_commands
test: isolation_logical_replication_multi_shard_commands
test: isolation_non_blocking_shard_split

View File

@ -11,9 +11,9 @@ Here is a high level overview of test plan:
8. Split an already split shard second time on a different schema.
*/
CREATE SCHEMA "citus_split_test_schema";
CREATE ROLE test_split_role WITH LOGIN;
GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role;
SET ROLE test_split_role;
CREATE ROLE test_shard_split_role WITH LOGIN;
GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_shard_split_role;
SET ROLE test_shard_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981000;
SET citus.next_placement_id TO 8610000;
@ -196,7 +196,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node
-- END : Display current state
-- BEGIN : Move one shard before we split it.
\c - postgres - :master_port
SET ROLE test_split_role;
SET ROLE test_shard_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981007;
SET citus.defer_drop_after_shard_move TO OFF;
@ -380,7 +380,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node
-- END : Display current state
-- BEGIN: Should be able to change/drop constraints
\c - postgres - :master_port
SET ROLE test_split_role;
SET ROLE test_shard_split_role;
SET search_path TO "citus_split_test_schema";
ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed;
ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200;

View File

@ -0,0 +1,124 @@
/*
Citus Shard Split Test.The test is model similar to 'shard_move_constraints'.
Here is a high level overview of test plan:
1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table.
2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors.
3. Create Foreign key constraints between the two co-located distributed tables.
4. Load data into the three tables.
5. Move one of the shards for 'sensors' to test ShardMove -> Split.
6. Trigger Split on both shards of 'sensors'. This will also split co-located tables.
7. Move one of the split shard to test Split -> ShardMove.
8. Split an already split shard second time on a different schema.
*/
CREATE SCHEMA "citus_split_test_schema";
CREATE ROLE test_split_role WITH LOGIN;
GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role;
SET ROLE test_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981000;
SET citus.next_placement_id TO 8610000;
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc.
CREATE TABLE sensors(
measureid integer,
eventdatetime date,
measure_data jsonb,
meaure_quantity decimal(15, 2),
measure_status char(1),
measure_comment varchar(44),
PRIMARY KEY (measureid, eventdatetime, measure_data));
SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc.
-- BEGIN : Move one shard before we split it.
\c - postgres - :master_port
SET ROLE test_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981007;
SET citus.defer_drop_after_shard_move TO OFF;
SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- END : Move one shard before we split it.
-- BEGIN : Set node id variables
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- END : Set node id variables
-- BEGIN : Split two shards : One with move and One without move.
-- Perform 2 way split
SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port);
table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size
---------------------------------------------------------------------
sensors | 8981000 | citus_split_test_schema.sensors_8981000 | distributed | 1390009 | localhost | 57638 | 40960
sensors | 8981001 | citus_split_test_schema.sensors_8981001 | distributed | 1390009 | localhost | 57638 | 40960
(2 rows)
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-1073741824'],
ARRAY[:worker_2_node, :worker_2_node],
'force_logical');
WARNING: replication slot "citus_shard_split_template_slot_8981000" does not exist
CONTEXT: while executing command on localhost:xxxxx
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port);
table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size
---------------------------------------------------------------------
sensors | 8981001 | citus_split_test_schema.sensors_8981001 | distributed | 1390009 | localhost | 57638 | 40960
sensors | 8981007 | citus_split_test_schema.sensors_8981007 | distributed | 1390009 | localhost | 57638 | 24576
sensors | 8981008 | citus_split_test_schema.sensors_8981008 | distributed | 1390009 | localhost | 57638 | 24576
(3 rows)
\c - - - :worker_2_port
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_template_slot_8981000
citus_shard_split_18_20648
(2 rows)
\c - - - :master_port
SELECT pg_catalog.citus_split_shard_by_split_points(
8981001,
ARRAY['536870911', '1610612735'],
ARRAY[:worker_1_node, :worker_1_node, :worker_2_node],
'force_logical');
WARNING: replication slot "citus_shard_split_template_slot_8981001" does not exist
CONTEXT: while executing command on localhost:xxxxx
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port);
table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size
---------------------------------------------------------------------
citus_split_test_schema.sensors | 102042 | citus_split_test_schema.sensors_102042 | distributed | 1390009 | localhost | 57637 | 8192
citus_split_test_schema.sensors | 102043 | citus_split_test_schema.sensors_102043 | distributed | 1390009 | localhost | 57637 | 16384
citus_split_test_schema.sensors | 102044 | citus_split_test_schema.sensors_102044 | distributed | 1390009 | localhost | 57638 | 16384
citus_split_test_schema.sensors | 8981007 | citus_split_test_schema.sensors_8981007 | distributed | 1390009 | localhost | 57638 | 24576
citus_split_test_schema.sensors | 8981008 | citus_split_test_schema.sensors_8981008 | distributed | 1390009 | localhost | 57638 | 24576
(5 rows)
\c - - - :worker_2_port
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_template_slot_8981001
citus_shard_split_16_20648
citus_shard_split_18_20648
(3 rows)

View File

@ -0,0 +1,647 @@
unused step name: s2-insert-2
unused step name: s2-select
Parsed test spec with 3 sessions
starting permutation: s1-load-cache s2-print-cluster s3-acquire-advisory-lock s1-begin s2-begin s1-non-blocking-shard-split s2-insert s2-end s2-print-cluster s3-release-advisory-lock s1-end s2-print-cluster
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-load-cache:
-- Indirect way to load cache.
TRUNCATE to_split_table;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500001|t | 0
(1 row)
id|value
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
ARRAY['-1073741824'],
ARRAY[2, 2],
'force_logical');
<waiting ...>
step s2-insert:
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (123456789, 1);
get_shard_id_for_distribution_column
---------------------------------------------------------------------
1500001
(1 row)
step s2-end:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500001|t | 1
(1 row)
id|value
---------------------------------------------------------------------
123456789| 1
(1 row)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-non-blocking-shard-split: <... completed>
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
step s1-end:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57638|1500002|t | 0
57638|1500003|t | 1
(2 rows)
id|value
---------------------------------------------------------------------
123456789| 1
(1 row)
starting permutation: s1-load-cache s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-update s3-release-advisory-lock s1-end s2-print-cluster
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-load-cache:
-- Indirect way to load cache.
TRUNCATE to_split_table;
step s2-insert:
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (123456789, 1);
get_shard_id_for_distribution_column
---------------------------------------------------------------------
1500001
(1 row)
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500001|t | 1
(1 row)
id|value
---------------------------------------------------------------------
123456789| 1
(1 row)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
ARRAY['-1073741824'],
ARRAY[2, 2],
'force_logical');
<waiting ...>
step s2-update:
UPDATE to_split_table SET value = 111 WHERE id = 123456789;
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-non-blocking-shard-split: <... completed>
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
step s1-end:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57638|1500002|t | 0
57638|1500003|t | 1
(2 rows)
id|value
---------------------------------------------------------------------
123456789| 111
(1 row)
starting permutation: s1-load-cache s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-delete s3-release-advisory-lock s1-end s2-print-cluster
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-load-cache:
-- Indirect way to load cache.
TRUNCATE to_split_table;
step s2-insert:
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (123456789, 1);
get_shard_id_for_distribution_column
---------------------------------------------------------------------
1500001
(1 row)
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500001|t | 1
(1 row)
id|value
---------------------------------------------------------------------
123456789| 1
(1 row)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
ARRAY['-1073741824'],
ARRAY[2, 2],
'force_logical');
<waiting ...>
step s2-delete:
DELETE FROM to_split_table WHERE id = 123456789;
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-non-blocking-shard-split: <... completed>
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
step s1-end:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57638|1500002|t | 0
57638|1500003|t | 0
(2 rows)
id|value
---------------------------------------------------------------------
(0 rows)
starting permutation: s2-print-cluster s3-acquire-advisory-lock s1-begin s2-begin s1-non-blocking-shard-split s2-insert s2-end s2-print-cluster s3-release-advisory-lock s1-end s2-print-cluster
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500001|t | 0
(1 row)
id|value
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
ARRAY['-1073741824'],
ARRAY[2, 2],
'force_logical');
<waiting ...>
step s2-insert:
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (123456789, 1);
get_shard_id_for_distribution_column
---------------------------------------------------------------------
1500001
(1 row)
step s2-end:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500001|t | 1
(1 row)
id|value
---------------------------------------------------------------------
123456789| 1
(1 row)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-non-blocking-shard-split: <... completed>
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
step s1-end:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57638|1500002|t | 0
57638|1500003|t | 1
(2 rows)
id|value
---------------------------------------------------------------------
123456789| 1
(1 row)
starting permutation: s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-update s3-release-advisory-lock s1-end s2-print-cluster
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s2-insert:
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (123456789, 1);
get_shard_id_for_distribution_column
---------------------------------------------------------------------
1500001
(1 row)
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500001|t | 1
(1 row)
id|value
---------------------------------------------------------------------
123456789| 1
(1 row)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
ARRAY['-1073741824'],
ARRAY[2, 2],
'force_logical');
<waiting ...>
step s2-update:
UPDATE to_split_table SET value = 111 WHERE id = 123456789;
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-non-blocking-shard-split: <... completed>
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
step s1-end:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57638|1500002|t | 0
57638|1500003|t | 1
(2 rows)
id|value
---------------------------------------------------------------------
123456789| 111
(1 row)
starting permutation: s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-delete s3-release-advisory-lock s1-end s2-print-cluster
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s2-insert:
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (123456789, 1);
get_shard_id_for_distribution_column
---------------------------------------------------------------------
1500001
(1 row)
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500001|t | 1
(1 row)
id|value
---------------------------------------------------------------------
123456789| 1
(1 row)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
ARRAY['-1073741824'],
ARRAY[2, 2],
'force_logical');
<waiting ...>
step s2-delete:
DELETE FROM to_split_table WHERE id = 123456789;
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-non-blocking-shard-split: <... completed>
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
step s1-end:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57638|1500002|t | 0
57638|1500003|t | 0
(2 rows)
id|value
---------------------------------------------------------------------
(0 rows)

View File

@ -65,10 +65,10 @@ SET search_path TO split_shard_replication_setup_schema;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow.
count

View File

@ -63,8 +63,8 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
count
---------------------------------------------------------------------

View File

@ -11,8 +11,8 @@ SET client_min_messages TO ERROR;
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
]);
count
---------------------------------------------------------------------

View File

@ -9,8 +9,8 @@ SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow.
count

View File

@ -0,0 +1,131 @@
// we use 15 as the partition key value through out the test
// so setting the corresponding shard here is useful
setup
{
SET citus.shard_count to 1;
SET citus.shard_replication_factor to 1;
SELECT setval('pg_dist_shardid_seq', 1500000);
CREATE TABLE to_split_table (id int PRIMARY KEY, value int);
SELECT create_distributed_table('to_split_table', 'id');
}
teardown
{
DROP TABLE to_split_table;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
// cache all placements
step "s1-load-cache"
{
-- Indirect way to load cache.
TRUNCATE to_split_table;
}
step "s1-non-blocking-shard-split"
{
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
ARRAY['-1073741824'],
ARRAY[2, 2],
'force_logical');
}
step "s1-end"
{
COMMIT;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-insert"
{
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (123456789, 1);
}
step "s2-insert-2"
{
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (1234567819, 1);
}
step "s2-update"
{
UPDATE to_split_table SET value = 111 WHERE id = 123456789;
}
step "s2-delete"
{
DELETE FROM to_split_table WHERE id = 123456789;
}
step "s2-select"
{
SELECT count(*) FROM to_split_table WHERE id = 123456789;
}
step "s2-end"
{
COMMIT;
}
step "s2-print-cluster"
{
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
}
session "s3"
// this advisory lock with (almost) random values are only used
// for testing purposes. For details, check Citus' logical replication
// source code
step "s3-acquire-advisory-lock"
{
SELECT pg_advisory_lock(44000, 55152);
}
step "s3-release-advisory-lock"
{
SELECT pg_advisory_unlock(44000, 55152);
}
##// nonblocking tests lie below ###
// move placement first
// the following tests show the non-blocking modifications while shard is being moved
// in fact, the shard move blocks the writes for a very short duration of time
// by using an advisory and allowing the other commands continue to run, we prevent
// the modifications to block on that blocking duration
//permutation "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-insert" "s3-release-advisory-lock" "s1-end" "s1-select" "s1-get-shard-distribution"
permutation "s1-load-cache" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
permutation "s1-load-cache" "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-update" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
permutation "s1-load-cache" "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-delete" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
permutation "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
permutation "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-update" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
permutation "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-delete" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"

View File

@ -21,3 +21,4 @@ test: citus_split_shard_by_split_points_failure
# use citus_split_shard_columnar_partitioned instead.
test: citus_split_shard_columnar_partitioned
test: citus_non_blocking_split_shards
# test: citus_sameer

View File

@ -13,9 +13,9 @@ Here is a high level overview of test plan:
CREATE SCHEMA "citus_split_test_schema";
CREATE ROLE test_split_role WITH LOGIN;
GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role;
SET ROLE test_split_role;
CREATE ROLE test_shard_split_role WITH LOGIN;
GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_shard_split_role;
SET ROLE test_shard_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981000;
@ -119,7 +119,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node
-- BEGIN : Move one shard before we split it.
\c - postgres - :master_port
SET ROLE test_split_role;
SET ROLE test_shard_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981007;
SET citus.defer_drop_after_shard_move TO OFF;
@ -200,7 +200,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node
-- BEGIN: Should be able to change/drop constraints
\c - postgres - :master_port
SET ROLE test_split_role;
SET ROLE test_shard_split_role;
SET search_path TO "citus_split_test_schema";
ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed;
ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200;

View File

@ -0,0 +1,78 @@
/*
Citus Shard Split Test.The test is model similar to 'shard_move_constraints'.
Here is a high level overview of test plan:
1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table.
2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors.
3. Create Foreign key constraints between the two co-located distributed tables.
4. Load data into the three tables.
5. Move one of the shards for 'sensors' to test ShardMove -> Split.
6. Trigger Split on both shards of 'sensors'. This will also split co-located tables.
7. Move one of the split shard to test Split -> ShardMove.
8. Split an already split shard second time on a different schema.
*/
CREATE SCHEMA "citus_split_test_schema";
CREATE ROLE test_split_role WITH LOGIN;
GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role;
SET ROLE test_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981000;
SET citus.next_placement_id TO 8610000;
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc.
CREATE TABLE sensors(
measureid integer,
eventdatetime date,
measure_data jsonb,
meaure_quantity decimal(15, 2),
measure_status char(1),
measure_comment varchar(44),
PRIMARY KEY (measureid, eventdatetime, measure_data));
SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc.
-- BEGIN : Move one shard before we split it.
\c - postgres - :master_port
SET ROLE test_split_role;
SET search_path TO "citus_split_test_schema";
SET citus.next_shard_id TO 8981007;
SET citus.defer_drop_after_shard_move TO OFF;
SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical');
-- END : Move one shard before we split it.
-- BEGIN : Set node id variables
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- END : Set node id variables
-- BEGIN : Split two shards : One with move and One without move.
-- Perform 2 way split
SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port);
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-1073741824'],
ARRAY[:worker_2_node, :worker_2_node],
'force_logical');
SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port);
\c - - - :worker_2_port
SELECT slot_name FROM pg_replication_slots;
\c - - - :master_port
SELECT pg_catalog.citus_split_shard_by_split_points(
8981001,
ARRAY['536870911', '1610612735'],
ARRAY[:worker_2_node, :worker_2_node, :worker_2_node],
'force_logical');
SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port);
\c - - - :worker_2_port
SELECT slot_name FROM pg_replication_slots;

View File

@ -67,10 +67,10 @@ CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset

View File

@ -67,8 +67,8 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset

View File

@ -14,8 +14,8 @@ CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_s
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
]);
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset

View File

@ -12,8 +12,8 @@ SET search_path TO split_shard_replication_setup_schema;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset