Address review comments.

1) Add partitioning test
2) Refactor some methods
users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-08-02 19:09:32 +05:30
parent a430b0bffc
commit d6258fe0db
23 changed files with 1304 additions and 128 deletions

View File

@ -606,6 +606,7 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
key.port = nodePort;
strlcpy(key.user, userName, NAMEDATALEN);
strlcpy(key.database, database, NAMEDATALEN);
+ key.replicationConnParam = false;
ConnectionHashEntry *entry =
(ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
@ -1299,6 +1300,7 @@ WarmUpConnParamsHash(void)
key.port = workerNode->workerPort;
strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN);
strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
+ key.replicationConnParam = false;
FindOrCreateConnParamsEntry(&key);
}
}

View File

@ -425,7 +425,6 @@ SplitShard(SplitMode splitMode,
}
else
{
- /*TODO(saawasek): Discussing about existing bug with the assumption of move shard*/
NonBlockingShardSplit(
splitOperation,
shardIntervalToSplit,
@ -851,6 +850,11 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
}
+ /*
+  * CreateSplitCopyTask creates a task for copying data.
+  * In the case of a non-blocking split, a snapshotted copy task is created with the given 'snapshotName'.
+  * 'snapshotName' is NULL for a blocking split.
+  */
static Task *
CreateSplitCopyTask(StringInfo splitCopyUdfCommand, char *snapshotName, int taskId, uint64
jobId)
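For context, 'snapshotName' refers to a snapshot exported by the template replication slot (see CreateTemplateReplicationSlot below). A minimal sketch of how an exported snapshot is consumed on another connection, using PostgreSQL's standard mechanism and a hypothetical snapshot name:

    BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
    SET TRANSACTION SNAPSHOT '00000003-00000002-1'; -- hypothetical exported snapshot name
    -- the split copy of pre-existing rows runs against this frozen view of the data
    COMMIT;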
@ -1403,7 +1407,12 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* 16) Drop Publications */
DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
- /* 17) TODO(saawasek): Try dropping replication slots explicitly */
+ /*
+  * 17) Drop replication slots.
+  * Drop the template and subscriber replication slots.
+  */
+ DropShardReplicationSlot(sourceConnection, ShardSplitTemplateReplicationSlotName(
+ 	shardIntervalToSplit->shardId));
+ DropShardSplitReplicationSlots(sourceConnection, replicationSlotInfoList);
/*
* 18) Drop old shards and delete related metadata. Have to do that before
@ -1416,6 +1425,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
+ /* 19) Create the partitioning hierarchy for the split children */
+ CreatePartitioningHierarchy(shardGroupSplitIntervalListList,
+ 	workersForPlacementList);
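CreatePartitioningHierarchy presumably re-attaches split child shards to their parent partitioned shards on the workers. A sketch of the kind of statement involved, using shard names and partition bounds from the regression test below (illustrative only):

    ALTER TABLE citus_split_test_schema_partitioned.sensors_8999000
        ATTACH PARTITION citus_split_test_schema_partitioned.sensors_old_8999002
        FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');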
/*
* 20) Create foreign keys if exists after the metadata changes happening in
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
@ -1424,11 +1436,11 @@ NonBlockingShardSplit(SplitOperation splitOperation,
CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
workersForPlacementList);
- /* 21) Drop dummy shards.
-  * TODO(saawasek):Refactor and pass hashmap.Currently map is global variable */
+ /*
+  * 21) Drop dummy shards.
+  */
DropDummyShards(mapOfDummyShardToPlacement);
/* 22) Close source connection */
CloseConnection(sourceConnection);
@ -1443,7 +1455,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* Do a best effort cleanup of shards created on workers in the above block */
TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
- /*TODO(saawasek): Add checks to open new connection if sourceConnection is not valid anymore.*/
DropAllShardSplitLeftOvers(sourceShardToCopyNode,
shardSplitHashMapForPublication);
@ -1621,8 +1632,8 @@ CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
* Try to drop leftover template replication slot if any from previous operation
* and create new one.
*/
- char *snapShotName = DropExistingIfAnyAndCreateTemplateReplicationSlot(shardInterval,
- 	sourceConnection);
+ char *snapShotName = CreateTemplateReplicationSlot(shardInterval,
+ 	sourceConnection);
*templateSlotConnection = sourceConnection;
return snapShotName;
@ -1894,7 +1905,9 @@ DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval)
}
- /*todo(saawasek): Add comments */
+ /*
+  * CreateReplicaIdentities creates replica identities for split children and dummy shards.
+  */
static void
CreateReplicaIdentities(HTAB *mapOfDummyShardToPlacement,
List *shardGroupSplitIntervalListList,
@ -1924,7 +1937,7 @@ CreateReplicaIdentities(HTAB *mapOfDummyShardToPlacement,
}
}
- /*todo: remove the global variable dummy map*/
+ /* Create replica identities for dummy shards */
HASH_SEQ_STATUS status;
hash_seq_init(&status, mapOfDummyShardToPlacement);

View File

@ -305,7 +305,7 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
uint32_t nodeId = entry->key.nodeId;
uint32_t tableOwnerId = entry->key.tableOwnerId;
char *derivedSlotName =
- 	encode_replication_slot(nodeId, tableOwnerId);
+ 	EncodeReplicationSlot(nodeId, tableOwnerId);
List *shardSplitInfoList = entry->shardSplitInfoList;
ShardSplitInfo *splitShardInfo = NULL;
@ -448,8 +448,8 @@ ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc
char *tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false);
values[1] = CStringGetTextDatum(tableOwnerName);
- char *slotName = encode_replication_slot(entry->key.nodeId,
- 	entry->key.tableOwnerId);
+ char *slotName = EncodeReplicationSlot(entry->key.nodeId,
+ 	entry->key.tableOwnerId);
values[2] = CStringGetTextDatum(slotName);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls);

View File

@ -2069,12 +2069,16 @@ GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds,
}
- /*TODO(saawasek):Refactor this for ShardMove too.*/
+ /*
+  * CreateShardSplitSubscription creates the subscriptions used for shard split
+  * over the given connection. The subscription is created with 'copy_data'
+  * set to false and with the given replication slot name.
+  */
void
- CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
- 	int sourceNodePort, char *userName, char *databaseName,
- 	char *publicationName, char *slotName,
- 	Oid ownerId)
+ CreateShardSplitSubscription(MultiConnection *connection, char *sourceNodeName,
+ 	int sourceNodePort, char *userName, char *databaseName,
+ 	char *publicationName, char *slotName,
+ 	Oid ownerId)
{
StringInfo createSubscriptionCommand = makeStringInfo();
StringInfo conninfo = makeStringInfo();
@ -2090,7 +2094,7 @@ CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
psprintf(
"CREATE USER %s SUPERUSER IN ROLE %s",
ShardSubscriptionRole(ownerId, SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX),
- GetUserNameFromId(ownerId, false)
+ quote_identifier(GetUserNameFromId(ownerId, false))
)));
appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' "
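In sketch form, the statement being assembled here is a plain logical-replication subscription. The names below are hypothetical, and every option except copy_data and slot_name (which the comment above specifies) is an assumption:

    CREATE SUBSCRIPTION citus_shard_split_subscription_10
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION citus_shard_split_publication_18_10
        WITH (copy_data = false,
              create_slot = false, -- assumed: the slot already exists, copied from the template
              slot_name = 'citus_split_18_10');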

View File

@ -14,10 +14,12 @@
#include "nodes/pg_list.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_split.h"
#include "distributed/shared_library_init.h"
#include "distributed/listutils.h"
#include "distributed/shardsplit_logical_replication.h"
#include "distributed/resource_lock.h"
@ -79,6 +81,11 @@ CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList,
forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
splitChildShardIntervalList, shardGroupSplitIntervalListList)
{
+ /*
+  * Skip partitioned parent tables: they hold no rows themselves, so only
+  * their leaf partition shards need entries in the publication map.
+  */
+ if (PartitionedTable(sourceShardIntervalToCopy->relationId))
+ {
+ 	continue;
+ }
ShardInterval *splitChildShardInterval = NULL;
WorkerNode *destinationWorkerNode = NULL;
forboth_ptr(splitChildShardInterval, splitChildShardIntervalList,
@ -232,6 +239,11 @@ ShardSplitPublicationName(uint32_t nodeId, Oid ownerId)
}
+ /*
+  * CreateTargetNodeConnectionsForShardSplit creates connections on target nodes.
+  * These connections are used for subscription management. They are closed
+  * at the end of the non-blocking split workflow.
+  */
List *
CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList, int
connectionFlags, char *user, char *databaseName)
@ -240,7 +252,9 @@ CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList
ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = NULL;
foreach_ptr(shardSplitSubscriberMetadata, shardSplitSubscribersMetadataList)
{
- /*TODO(saawasek):For slot equals not null */
+ /* slotinfo is expected to be already populated */
+ Assert(shardSplitSubscriberMetadata->slotInfo != NULL);
uint32 targetWorkerNodeId = shardSplitSubscriberMetadata->slotInfo->targetNodeId;
WorkerNode *targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false);
@ -355,14 +369,15 @@ CreateShardSplitSubscriptions(List *targetNodeConnectionList,
{
uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
Oid ownerId = shardSplitPubSubMetadata->tableOwnerId;
- CreateShardSubscription(targetConnection,
- 	sourceWorkerNode->workerName,
- 	sourceWorkerNode->workerPort,
- 	superUser,
- 	databaseName,
- 	ShardSplitPublicationName(publicationForNodeId, ownerId),
- 	shardSplitPubSubMetadata->slotInfo->slotName,
- 	ownerId);
+ CreateShardSplitSubscription(targetConnection,
+ 	sourceWorkerNode->workerName,
+ 	sourceWorkerNode->workerPort,
+ 	superUser,
+ 	databaseName,
+ 	ShardSplitPublicationName(publicationForNodeId, ownerId),
+ 	shardSplitPubSubMetadata->slotInfo->slotName,
+ 	ownerId);
}
}
@ -411,36 +426,26 @@ WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
}
+ /*
+  * CreateTemplateReplicationSlot creates a replication slot that acts as a template
+  * slot for logically replicating split children in the 'catchup' phase of a non-blocking split.
+  * It returns a snapshot name which is used to do a snapshotted shard copy in the 'copy' phase
+  * of the non-blocking split workflow.
+  */
char *
- DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
- 	MultiConnection *sourceConnection)
+ CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
+ 	MultiConnection *sourceConnection)
{
- /*
-  * To ensure SPLIT is idempotent drop any existing slot from
-  * previous failed operation.
-  */
- StringInfo dropReplicationSlotCommand = makeStringInfo();
- appendStringInfo(dropReplicationSlotCommand, "SELECT pg_drop_replication_slot('%s')",
- 	ShardSplitTemplateReplicationSlotName(
- 	shardIntervalToSplit->shardId));
- /* The Drop command can fail so ignore the response / result and proceed anyways */
- PGresult *result = NULL;
- int response = ExecuteOptionalRemoteCommand(sourceConnection,
- 	dropReplicationSlotCommand->data,
- 	&result);
- PQclear(result);
- ForgetResults(sourceConnection);
StringInfo createReplicationSlotCommand = makeStringInfo();
appendStringInfo(createReplicationSlotCommand,
"CREATE_REPLICATION_SLOT %s LOGICAL citus EXPORT_SNAPSHOT;",
ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId));
- response = ExecuteOptionalRemoteCommand(sourceConnection,
- 	createReplicationSlotCommand->data, &result);
+ PGresult *result = NULL;
+ int response = ExecuteOptionalRemoteCommand(sourceConnection,
+ 	createReplicationSlotCommand->data,
+ 	&result);
if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
{
@ -498,8 +503,9 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlot
StringInfo createReplicationSlotCommand = makeStringInfo();
appendStringInfo(createReplicationSlotCommand,
"SELECT * FROM pg_copy_logical_replication_slot ('%s','%s')",
templateSlotName, slotName);
"SELECT * FROM pg_catalog.pg_copy_logical_replication_slot (%s, %s)",
quote_literal_cstr(templateSlotName), quote_literal_cstr(
slotName));
ExecuteCriticalRemoteCommand(sourceNodeConnection,
createReplicationSlotCommand->data);
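With the quoting fix, the generated command looks like the following sketch. pg_copy_logical_replication_slot clones the template slot, so each per-subscriber slot starts streaming from the same position; the template slot name here is hypothetical:

    SELECT * FROM pg_catalog.pg_copy_logical_replication_slot(
        'citus_shard_split_template_slot_8970000', -- hypothetical template slot name
        'citus_split_18_10');                      -- per-node/per-owner slot, see EncodeReplicationSlot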
@ -680,12 +686,9 @@ DropShardSplitPublications(MultiConnection *sourceConnection,
/*
- * DropShardSplitSubsriptions drops subscriptions from the subscriber node that
- * are used to split shards for the given table owners. Note that, it drops the
- * replication slots on the publisher node if it can drop the slots as well
- * with the DROP SUBSCRIPTION command. Otherwise, only the subscriptions will
- * be deleted with DROP SUBSCRIPTION via the connection. In the latter case,
- * replication slots will be dropped separately by calling DropShardSplitReplicationSlots.
+ * DropShardSplitSubsriptions disables and drops subscriptions from the subscriber node
+ * that are used to split shards. Note that it does not drop the replication slots on
+ * the publisher node; those are dropped separately by calling
+ * DropShardSplitReplicationSlots.
*/
void
DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList)
@ -706,7 +709,15 @@ DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList)
}
- /*todo(saawasek): add comments */
+ /*
+  * DisableAndDropShardSplitSubscription disables the subscription, resets the slot name
+  * to 'none' and then drops the subscription on the given connection. It does not drop
+  * the replication slot; the caller is responsible for cleaning up the replication slot.
+  *
+  * Directly executing 'DROP SUBSCRIPTION' attempts to drop the replication slot at the
+  * source node. When the subscription is local (source and target are the same node),
+  * directly dropping the subscription can lead to a self-deadlock. To avoid this, we
+  * first disable the subscription, reset the slot name and then drop the subscription.
+  */
void
DisableAndDropShardSplitSubscription(MultiConnection *connection, char *subscriptionName)
{
@ -730,6 +741,21 @@ DisableAndDropShardSplitSubscription(MultiConnection *connection, char *subscrip
}
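The sequence described above corresponds to the following statements on the subscriber node (subscription name hypothetical):

    ALTER SUBSCRIPTION citus_shard_split_subscription_10 DISABLE;
    -- detach the remote slot so that DROP SUBSCRIPTION does not try to drop it at the source
    ALTER SUBSCRIPTION citus_shard_split_subscription_10 SET (slot_name = NONE);
    DROP SUBSCRIPTION citus_shard_split_subscription_10;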
+ /*
+  * DropShardSplitReplicationSlots drops replication slots on the source node.
+  */
+ void
+ DropShardSplitReplicationSlots(MultiConnection *sourceConnection,
+ 	List *replicationSlotInfoList)
+ {
+ 	ReplicationSlotInfo *replicationSlotInfo = NULL;
+ 	foreach_ptr(replicationSlotInfo, replicationSlotInfoList)
+ 	{
+ 		DropShardReplicationSlot(sourceConnection, replicationSlotInfo->slotName);
+ 	}
+ }
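Each DropShardReplicationSlot call presumably reduces to the standard slot-drop function on the source node; a sketch with a slot name following the format documented below:

    SELECT pg_drop_replication_slot('citus_split_18_10');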
/*
* CloseShardSplitSubscriberConnections closes connection of subscriber nodes.
* 'ShardSplitSubscriberMetadata' holds connection for a subscriber node. The method

View File

@ -188,14 +188,14 @@ ReleaseSharedMemoryOfShardSplitInfo()
/*
- * encode_replication_slot returns an encoded replication slot name
+ * EncodeReplicationSlot returns an encoded replication slot name
* in the following format.
* Slot Name = citus_split_nodeId_tableOwnerOid
* Max supported length of replication slot name is 64 bytes.
*/
char *
- encode_replication_slot(uint32_t nodeId,
- 	uint32_t tableOwnerId)
+ EncodeReplicationSlot(uint32_t nodeId,
+ 	uint32_t tableOwnerId)
{
StringInfo slotName = makeStringInfo();
appendStringInfo(slotName, "%s%u_%u", SHARD_SPLIT_REPLICATION_SLOT_PREFIX, nodeId,
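For example, node id 18 and table owner oid 10 encode to citus_split_18_10; the format string rendered in SQL:

    SELECT format('citus_split_%s_%s', 18, 10) AS slot_name; -- citus_split_18_10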

View File

@ -43,11 +43,12 @@ extern void DropShardReplicationSlot(MultiConnection *connection,
extern char * ShardSubscriptionRole(Oid ownerId, char *operationPrefix);
extern char * ShardSubscriptionName(Oid ownerId, char *operationPrefix);
- extern void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
- 	int sourceNodePort, char *userName,
- 	char *databaseName,
- 	char *publicationName, char *slotName,
- 	Oid ownerId);
+ extern void CreateShardSplitSubscription(MultiConnection *connection,
+ 	char *sourceNodeName,
+ 	int sourceNodePort, char *userName,
+ 	char *databaseName,
+ 	char *publicationName, char *slotName,
+ 	Oid ownerId);
extern void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
Bitmapset *tableOwnerIds,

View File

@ -92,10 +92,11 @@ extern void DropAllShardSplitLeftOvers(WorkerNode *sourceNode,
extern void DropShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication);
extern void DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList);
- extern char * DropExistingIfAnyAndCreateTemplateReplicationSlot(
- 	ShardInterval *shardIntervalToSplit,
- 	MultiConnection *sourceConnection);
+ extern void DropShardSplitReplicationSlots(MultiConnection *sourceConnection,
+ 	List *replicationSlotInfoList);
+ extern char * CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
+ 	MultiConnection *sourceConnection);
extern void DisableAndDropShardSplitSubscription(MultiConnection *connection,
char *subscriptionName);

View File

@ -80,5 +80,5 @@ extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(void);
extern HTAB * PopulateSourceToDestinationShardMapForSlot(char *slotName, MemoryContext
cxt);
- extern char * encode_replication_slot(uint32_t nodeId, uint32_t tableOwnerId);
+ extern char * EncodeReplicationSlot(uint32_t nodeId, uint32_t tableOwnerId);
#endif /* SHARDSPLIT_SHARED_MEMORY_H */

View File

@ -0,0 +1,674 @@
CREATE SCHEMA "citus_split_test_schema_partitioned";
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.next_shard_id TO 8970000;
SET citus.next_placement_id TO 8770000;
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc.
CREATE TABLE sensors(
measureid integer,
eventdatetime date,
measure_data jsonb,
PRIMARY KEY (measureid, eventdatetime, measure_data))
PARTITION BY RANGE(eventdatetime);
-- Create Partitions of table 'sensors'.
CREATE TABLE sensors_old PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
CREATE TABLE sensors_2020_01_01 PARTITION OF sensors FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
CREATE TABLE sensors_news PARTITION OF sensors FOR VALUES FROM ('2020-05-01') TO ('2025-01-01');
-- Create index on parent and child partitions.
CREATE INDEX index_on_parent ON sensors(lower(measureid::text));
CREATE INDEX index_on_child ON sensors_2020_01_01(lower(measure_data::text));
ALTER INDEX index_on_parent ALTER COLUMN 1 SET STATISTICS 1000;
ALTER INDEX index_on_child ALTER COLUMN 1 SET STATISTICS 1000;
-- Create statistics on parent and child partitions.
CREATE STATISTICS s1 (dependencies) ON measureid, eventdatetime FROM sensors;
CREATE STATISTICS s2 (dependencies) ON measureid, eventdatetime FROM sensors_2020_01_01;
CLUSTER sensors_2020_01_01 USING index_on_child;
SELECT create_distributed_table('sensors', 'measureid');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- create colocated distributed tables
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
SELECT create_distributed_table('colocated_dist_table', 'measureid');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
CREATE TABLE colocated_partitioned_table(
measureid integer,
eventdatetime date,
PRIMARY KEY (measureid, eventdatetime))
PARTITION BY RANGE(eventdatetime);
CREATE TABLE colocated_partitioned_table_2020_01_01 PARTITION OF colocated_partitioned_table FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
SELECT create_distributed_table('colocated_partitioned_table', 'measureid');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CLUSTER colocated_partitioned_table_2020_01_01 USING colocated_partitioned_table_2020_01_01_pkey;
-- create reference tables
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
INNER JOIN pg_catalog.pg_namespace ns ON cls.relnamespace = ns.oid
WHERE node.noderole = 'primary' AND ns.nspname = 'citus_split_test_schema_partitioned'
ORDER BY logicalrelid, shardminvalue::BIGINT;
shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport
---------------------------------------------------------------------
8970000 | sensors | -2147483648 | 2147483647 | localhost | 57637
8970001 | sensors_old | -2147483648 | 2147483647 | localhost | 57637
8970002 | sensors_2020_01_01 | -2147483648 | 2147483647 | localhost | 57637
8970003 | sensors_news | -2147483648 | 2147483647 | localhost | 57637
8970004 | colocated_dist_table | -2147483648 | 2147483647 | localhost | 57637
8970005 | colocated_partitioned_table | -2147483648 | 2147483647 | localhost | 57637
8970006 | colocated_partitioned_table_2020_01_01 | -2147483648 | 2147483647 | localhost | 57637
8970007 | reference_table | | | localhost | 57637
8970007 | reference_table | | | localhost | 57638
(9 rows)
-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc.
-- BEGIN: Create constraints for tables.
-- from parent to regular dist
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
-- from parent to parent
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_parent FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table(measureid, eventdatetime);
-- from parent to child
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_child FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table_2020_01_01(measureid, eventdatetime);
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_ref FOREIGN KEY (measureid) REFERENCES reference_table(measureid);
-- from child to regular dist
ALTER TABLE sensors_2020_01_01 ADD CONSTRAINT fkey_from_child_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
-- from child to parent
ALTER TABLE sensors_2020_01_01 ADD CONSTRAINT fkey_from_child_to_parent FOREIGN KEY (measureid,eventdatetime) REFERENCES colocated_partitioned_table(measureid,eventdatetime);
-- from child to child
ALTER TABLE sensors_2020_01_01 ADD CONSTRAINT fkey_from_child_to_child FOREIGN KEY (measureid,eventdatetime) REFERENCES colocated_partitioned_table_2020_01_01(measureid,eventdatetime);
ALTER TABLE sensors_2020_01_01 ADD CONSTRAINT fkey_from_child_to_ref FOREIGN KEY (measureid) REFERENCES reference_table(measureid);
-- No support for foreign keys, unique constraints, or exclusion constraints in columnar tables.
-- Please see: https://github.com/citusdata/citus/tree/main/src/backend/columnar/README.md
-- END: Create constraints for tables.
-- BEGIN: Load data into tables
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
-- END: Load data into tables
-- BEGIN: Show the current state on workers
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
sensors_2020_01_01_8970002 | fkey_from_child_to_child_8970002 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8970006(eventdatetime, measureid)
sensors_2020_01_01_8970002 | fkey_from_child_to_dist_8970002 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8970004(measureid)
sensors_2020_01_01_8970002 | fkey_from_child_to_parent_8970002 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8970005(eventdatetime, measureid)
sensors_2020_01_01_8970002 | fkey_from_child_to_ref_8970002 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8970002 | fkey_from_parent_to_child_8970000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8970006(eventdatetime, measureid)
sensors_2020_01_01_8970002 | fkey_from_parent_to_dist_8970000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8970004(measureid)
sensors_2020_01_01_8970002 | fkey_from_parent_to_parent_8970000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8970005(eventdatetime, measureid)
sensors_2020_01_01_8970002 | fkey_from_parent_to_ref_8970000 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8970002 | sensors_2020_01_01_8970002_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8970006(eventdatetime, measureid)
sensors_8970000 | fkey_from_parent_to_child_8970000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8970006(eventdatetime, measureid)
sensors_8970000 | fkey_from_parent_to_dist_8970000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8970004(measureid)
sensors_8970000 | fkey_from_parent_to_parent_8970000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8970005(eventdatetime, measureid)
sensors_8970000 | fkey_from_parent_to_ref_8970000 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_8970000 | sensors_8970000_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8970006(eventdatetime, measureid)
sensors_news_8970003 | fkey_from_parent_to_child_8970000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8970006(eventdatetime, measureid)
sensors_news_8970003 | fkey_from_parent_to_dist_8970000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8970004(measureid)
sensors_news_8970003 | fkey_from_parent_to_parent_8970000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8970005(eventdatetime, measureid)
sensors_news_8970003 | fkey_from_parent_to_ref_8970000 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_old_8970001 | fkey_from_parent_to_child_8970000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8970006(eventdatetime, measureid)
sensors_old_8970001 | fkey_from_parent_to_dist_8970000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8970004(measureid)
sensors_old_8970001 | fkey_from_parent_to_parent_8970000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8970005(eventdatetime, measureid)
sensors_old_8970001 | fkey_from_parent_to_ref_8970000 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
(22 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
colocated_dist_table_8970004 | CREATE UNIQUE INDEX colocated_dist_table_pkey_8970004 ON citus_split_test_schema_partitioned.colocated_dist_table_8970004 USING btree (measureid)
colocated_partitioned_table_2020_01_01_8970006 | CREATE UNIQUE INDEX colocated_partitioned_table_2020_01_01_pkey_8970006 ON citus_split_test_schema_partitioned.colocated_partitioned_table_2020_01_01_8970006 USING btree (measureid, eventdatetime)
colocated_partitioned_table_8970005 | CREATE UNIQUE INDEX colocated_partitioned_table_pkey_8970005 ON ONLY citus_split_test_schema_partitioned.colocated_partitioned_table_8970005 USING btree (measureid, eventdatetime)
reference_table_8970007 | CREATE UNIQUE INDEX reference_table_pkey_8970007 ON citus_split_test_schema_partitioned.reference_table_8970007 USING btree (measureid)
sensors_2020_01_01_8970002 | CREATE INDEX index_on_child_8970002 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8970002 USING btree (lower((measure_data)::text))
sensors_2020_01_01_8970002 | CREATE INDEX sensors_2020_01_01_lower_idx_8970002 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8970002 USING btree (lower((measureid)::text))
sensors_2020_01_01_8970002 | CREATE UNIQUE INDEX sensors_2020_01_01_pkey_8970002 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8970002 USING btree (measureid, eventdatetime, measure_data)
sensors_8970000 | CREATE INDEX index_on_parent_8970000 ON ONLY citus_split_test_schema_partitioned.sensors_8970000 USING btree (lower((measureid)::text))
sensors_8970000 | CREATE UNIQUE INDEX sensors_pkey_8970000 ON ONLY citus_split_test_schema_partitioned.sensors_8970000 USING btree (measureid, eventdatetime, measure_data)
sensors_news_8970003 | CREATE INDEX sensors_news_lower_idx_8970003 ON citus_split_test_schema_partitioned.sensors_news_8970003 USING btree (lower((measureid)::text))
sensors_news_8970003 | CREATE UNIQUE INDEX sensors_news_pkey_8970003 ON citus_split_test_schema_partitioned.sensors_news_8970003 USING btree (measureid, eventdatetime, measure_data)
sensors_old_8970001 | CREATE INDEX sensors_old_lower_idx_8970001 ON citus_split_test_schema_partitioned.sensors_old_8970001 USING btree (lower((measureid)::text))
sensors_old_8970001 | CREATE UNIQUE INDEX sensors_old_pkey_8970001 ON citus_split_test_schema_partitioned.sensors_old_8970001 USING btree (measureid, eventdatetime, measure_data)
(13 rows)
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
s1
s1_8970000
s2
s2_8970002
(4 rows)
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
(0 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
reference_table_8970007 | CREATE UNIQUE INDEX reference_table_pkey_8970007 ON citus_split_test_schema_partitioned.reference_table_8970007 USING btree (measureid)
(1 row)
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
s1
s2
(2 rows)
-- END: Show the current state on workers
-- BEGIN: Split a shard along its co-located shards
\c - - - :master_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.next_shard_id TO 8999000;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT pg_catalog.citus_split_shard_by_split_points(
8970000,
ARRAY['-2120000000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
-- END: Split a shard along its co-located shards
-- BEGIN: Validate Shard Info and Data
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
INNER JOIN pg_catalog.pg_namespace ns ON cls.relnamespace = ns.oid
WHERE node.noderole = 'primary' AND ns.nspname = 'citus_split_test_schema_partitioned'
ORDER BY logicalrelid, shardminvalue::BIGINT;
shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport
---------------------------------------------------------------------
8999000 | sensors | -2147483648 | -2120000000 | localhost | 57637
8999001 | sensors | -2119999999 | 2147483647 | localhost | 57638
8999002 | sensors_old | -2147483648 | -2120000000 | localhost | 57637
8999003 | sensors_old | -2119999999 | 2147483647 | localhost | 57638
8999004 | sensors_2020_01_01 | -2147483648 | -2120000000 | localhost | 57637
8999005 | sensors_2020_01_01 | -2119999999 | 2147483647 | localhost | 57638
8999006 | sensors_news | -2147483648 | -2120000000 | localhost | 57637
8999007 | sensors_news | -2119999999 | 2147483647 | localhost | 57638
8999008 | colocated_dist_table | -2147483648 | -2120000000 | localhost | 57637
8999009 | colocated_dist_table | -2119999999 | 2147483647 | localhost | 57638
8999010 | colocated_partitioned_table | -2147483648 | -2120000000 | localhost | 57637
8999011 | colocated_partitioned_table | -2119999999 | 2147483647 | localhost | 57638
8999012 | colocated_partitioned_table_2020_01_01 | -2147483648 | -2120000000 | localhost | 57637
8999013 | colocated_partitioned_table_2020_01_01 | -2119999999 | 2147483647 | localhost | 57638
8970007 | reference_table | | | localhost | 57637
8970007 | reference_table | | | localhost | 57638
(16 rows)
SELECT count(*) FROM reference_table;
count
---------------------------------------------------------------------
1001
(1 row)
SELECT count(*) FROM colocated_partitioned_table;
count
---------------------------------------------------------------------
1001
(1 row)
SELECT count(*) FROM colocated_dist_table;
count
---------------------------------------------------------------------
1001
(1 row)
SELECT count(*) FROM sensors;
count
---------------------------------------------------------------------
1001
(1 row)
-- END: Validate Shard Info and Data
-- BEGIN: Show the updated state on workers
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
sensors_2020_01_01_8999004 | fkey_from_child_to_child_8999004 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999012(eventdatetime, measureid)
sensors_2020_01_01_8999004 | fkey_from_child_to_dist_8999004 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999008(measureid)
sensors_2020_01_01_8999004 | fkey_from_child_to_parent_8999004 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999010(eventdatetime, measureid)
sensors_2020_01_01_8999004 | fkey_from_child_to_ref_8999004 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8999004 | fkey_from_parent_to_child_8999000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999012(eventdatetime, measureid)
sensors_2020_01_01_8999004 | fkey_from_parent_to_dist_8999000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999008(measureid)
sensors_2020_01_01_8999004 | fkey_from_parent_to_parent_8999000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999010(eventdatetime, measureid)
sensors_2020_01_01_8999004 | fkey_from_parent_to_ref_8999000 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8999004 | sensors_2020_01_01_8999004_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999012(eventdatetime, measureid)
sensors_8999000 | fkey_from_parent_to_child_8999000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999012(eventdatetime, measureid)
sensors_8999000 | fkey_from_parent_to_dist_8999000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999008(measureid)
sensors_8999000 | fkey_from_parent_to_parent_8999000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999010(eventdatetime, measureid)
sensors_8999000 | fkey_from_parent_to_ref_8999000 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_8999000 | sensors_8999000_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999012(eventdatetime, measureid)
sensors_news_8999006 | fkey_from_parent_to_child_8999000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999012(eventdatetime, measureid)
sensors_news_8999006 | fkey_from_parent_to_dist_8999000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999008(measureid)
sensors_news_8999006 | fkey_from_parent_to_parent_8999000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999010(eventdatetime, measureid)
sensors_news_8999006 | fkey_from_parent_to_ref_8999000 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_old_8999002 | fkey_from_parent_to_child_8999000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999012(eventdatetime, measureid)
sensors_old_8999002 | fkey_from_parent_to_dist_8999000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999008(measureid)
sensors_old_8999002 | fkey_from_parent_to_parent_8999000 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999010(eventdatetime, measureid)
sensors_old_8999002 | fkey_from_parent_to_ref_8999000 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
(22 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
colocated_dist_table_8999008 | CREATE UNIQUE INDEX colocated_dist_table_pkey_8999008 ON citus_split_test_schema_partitioned.colocated_dist_table_8999008 USING btree (measureid)
colocated_partitioned_table_2020_01_01_8999012 | CREATE UNIQUE INDEX colocated_partitioned_table_2020_01_01_pkey_8999012 ON citus_split_test_schema_partitioned.colocated_partitioned_table_2020_01_01_8999012 USING btree (measureid, eventdatetime)
colocated_partitioned_table_8999010 | CREATE UNIQUE INDEX colocated_partitioned_table_pkey_8999010 ON ONLY citus_split_test_schema_partitioned.colocated_partitioned_table_8999010 USING btree (measureid, eventdatetime)
reference_table_8970007 | CREATE UNIQUE INDEX reference_table_pkey_8970007 ON citus_split_test_schema_partitioned.reference_table_8970007 USING btree (measureid)
sensors_2020_01_01_8999004 | CREATE INDEX index_on_child_8999004 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999004 USING btree (lower((measure_data)::text))
sensors_2020_01_01_8999004 | CREATE INDEX sensors_2020_01_01_lower_idx_8999004 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999004 USING btree (lower((measureid)::text))
sensors_2020_01_01_8999004 | CREATE UNIQUE INDEX sensors_2020_01_01_pkey_8999004 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999004 USING btree (measureid, eventdatetime, measure_data)
sensors_8999000 | CREATE INDEX index_on_parent_8999000 ON ONLY citus_split_test_schema_partitioned.sensors_8999000 USING btree (lower((measureid)::text))
sensors_8999000 | CREATE UNIQUE INDEX sensors_pkey_8999000 ON ONLY citus_split_test_schema_partitioned.sensors_8999000 USING btree (measureid, eventdatetime, measure_data)
sensors_news_8999006 | CREATE INDEX sensors_news_lower_idx_8999006 ON citus_split_test_schema_partitioned.sensors_news_8999006 USING btree (lower((measureid)::text))
sensors_news_8999006 | CREATE UNIQUE INDEX sensors_news_pkey_8999006 ON citus_split_test_schema_partitioned.sensors_news_8999006 USING btree (measureid, eventdatetime, measure_data)
sensors_old_8999002 | CREATE INDEX sensors_old_lower_idx_8999002 ON citus_split_test_schema_partitioned.sensors_old_8999002 USING btree (lower((measureid)::text))
sensors_old_8999002 | CREATE UNIQUE INDEX sensors_old_pkey_8999002 ON citus_split_test_schema_partitioned.sensors_old_8999002 USING btree (measureid, eventdatetime, measure_data)
(13 rows)
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
s1
s1_8999000
s2
s2_8999004
(4 rows)
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
sensors_2020_01_01_8999005 | fkey_from_child_to_child_8999005 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_2020_01_01_8999005 | fkey_from_child_to_dist_8999005 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999009(measureid)
sensors_2020_01_01_8999005 | fkey_from_child_to_parent_8999005 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999011(eventdatetime, measureid)
sensors_2020_01_01_8999005 | fkey_from_child_to_ref_8999005 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8999005 | fkey_from_parent_to_child_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_2020_01_01_8999005 | fkey_from_parent_to_dist_8999001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999009(measureid)
sensors_2020_01_01_8999005 | fkey_from_parent_to_parent_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999011(eventdatetime, measureid)
sensors_2020_01_01_8999005 | fkey_from_parent_to_ref_8999001 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8999005 | sensors_2020_01_01_8999005_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_8999001 | fkey_from_parent_to_child_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_8999001 | fkey_from_parent_to_dist_8999001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999009(measureid)
sensors_8999001 | fkey_from_parent_to_parent_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999011(eventdatetime, measureid)
sensors_8999001 | fkey_from_parent_to_ref_8999001 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_8999001 | sensors_8999001_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_news_8999007 | fkey_from_parent_to_child_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_news_8999007 | fkey_from_parent_to_dist_8999001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999009(measureid)
sensors_news_8999007 | fkey_from_parent_to_parent_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999011(eventdatetime, measureid)
sensors_news_8999007 | fkey_from_parent_to_ref_8999001 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_old_8999003 | fkey_from_parent_to_child_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_old_8999003 | fkey_from_parent_to_dist_8999001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999009(measureid)
sensors_old_8999003 | fkey_from_parent_to_parent_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999011(eventdatetime, measureid)
sensors_old_8999003 | fkey_from_parent_to_ref_8999001 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
(22 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
colocated_dist_table_8999009 | CREATE UNIQUE INDEX colocated_dist_table_pkey_8999009 ON citus_split_test_schema_partitioned.colocated_dist_table_8999009 USING btree (measureid)
colocated_partitioned_table_2020_01_01_8999013 | CREATE UNIQUE INDEX colocated_partitioned_table_2020_01_01_pkey_8999013 ON citus_split_test_schema_partitioned.colocated_partitioned_table_2020_01_01_8999013 USING btree (measureid, eventdatetime)
colocated_partitioned_table_8999011 | CREATE UNIQUE INDEX colocated_partitioned_table_pkey_8999011 ON ONLY citus_split_test_schema_partitioned.colocated_partitioned_table_8999011 USING btree (measureid, eventdatetime)
reference_table_8970007 | CREATE UNIQUE INDEX reference_table_pkey_8970007 ON citus_split_test_schema_partitioned.reference_table_8970007 USING btree (measureid)
sensors_2020_01_01_8999005 | CREATE INDEX index_on_child_8999005 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999005 USING btree (lower((measure_data)::text))
sensors_2020_01_01_8999005 | CREATE INDEX sensors_2020_01_01_lower_idx_8999005 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999005 USING btree (lower((measureid)::text))
sensors_2020_01_01_8999005 | CREATE UNIQUE INDEX sensors_2020_01_01_pkey_8999005 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999005 USING btree (measureid, eventdatetime, measure_data)
sensors_8999001 | CREATE INDEX index_on_parent_8999001 ON ONLY citus_split_test_schema_partitioned.sensors_8999001 USING btree (lower((measureid)::text))
sensors_8999001 | CREATE UNIQUE INDEX sensors_pkey_8999001 ON ONLY citus_split_test_schema_partitioned.sensors_8999001 USING btree (measureid, eventdatetime, measure_data)
sensors_news_8999007 | CREATE INDEX sensors_news_lower_idx_8999007 ON citus_split_test_schema_partitioned.sensors_news_8999007 USING btree (lower((measureid)::text))
sensors_news_8999007 | CREATE UNIQUE INDEX sensors_news_pkey_8999007 ON citus_split_test_schema_partitioned.sensors_news_8999007 USING btree (measureid, eventdatetime, measure_data)
sensors_old_8999003 | CREATE INDEX sensors_old_lower_idx_8999003 ON citus_split_test_schema_partitioned.sensors_old_8999003 USING btree (lower((measureid)::text))
sensors_old_8999003 | CREATE UNIQUE INDEX sensors_old_pkey_8999003 ON citus_split_test_schema_partitioned.sensors_old_8999003 USING btree (measureid, eventdatetime, measure_data)
(13 rows)
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
s1
s1_8999001
s2
s2_8999005
(4 rows)
-- END: Show the updated state on workers
-- BEGIN: Split a partition table directly
\c - - - :master_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.next_shard_id TO 8999100;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT pg_catalog.citus_split_shard_by_split_points(
8999002, -- sensors_old
ARRAY['-2127770000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
-- END: Split a partition table directly
-- BEGIN: Validate Shard Info and Data
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
INNER JOIN pg_catalog.pg_namespace ns ON cls.relnamespace = ns.oid
WHERE node.noderole = 'primary' AND ns.nspname = 'citus_split_test_schema_partitioned'
ORDER BY logicalrelid, shardminvalue::BIGINT;
shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport
---------------------------------------------------------------------
8999100 | sensors | -2147483648 | -2127770000 | localhost | 57637
8999101 | sensors | -2127769999 | -2120000000 | localhost | 57638
8999001 | sensors | -2119999999 | 2147483647 | localhost | 57638
8999102 | sensors_old | -2147483648 | -2127770000 | localhost | 57637
8999103 | sensors_old | -2127769999 | -2120000000 | localhost | 57638
8999003 | sensors_old | -2119999999 | 2147483647 | localhost | 57638
8999104 | sensors_2020_01_01 | -2147483648 | -2127770000 | localhost | 57637
8999105 | sensors_2020_01_01 | -2127769999 | -2120000000 | localhost | 57638
8999005 | sensors_2020_01_01 | -2119999999 | 2147483647 | localhost | 57638
8999106 | sensors_news | -2147483648 | -2127770000 | localhost | 57637
8999107 | sensors_news | -2127769999 | -2120000000 | localhost | 57638
8999007 | sensors_news | -2119999999 | 2147483647 | localhost | 57638
8999108 | colocated_dist_table | -2147483648 | -2127770000 | localhost | 57637
8999109 | colocated_dist_table | -2127769999 | -2120000000 | localhost | 57638
8999009 | colocated_dist_table | -2119999999 | 2147483647 | localhost | 57638
8999110 | colocated_partitioned_table | -2147483648 | -2127770000 | localhost | 57637
8999111 | colocated_partitioned_table | -2127769999 | -2120000000 | localhost | 57638
8999011 | colocated_partitioned_table | -2119999999 | 2147483647 | localhost | 57638
8999112 | colocated_partitioned_table_2020_01_01 | -2147483648 | -2127770000 | localhost | 57637
8999113 | colocated_partitioned_table_2020_01_01 | -2127769999 | -2120000000 | localhost | 57638
8999013 | colocated_partitioned_table_2020_01_01 | -2119999999 | 2147483647 | localhost | 57638
8970007 | reference_table | | | localhost | 57637
8970007 | reference_table | | | localhost | 57638
(23 rows)
SELECT count(*) FROM reference_table;
count
---------------------------------------------------------------------
1001
(1 row)
SELECT count(*) FROM colocated_partitioned_table;
count
---------------------------------------------------------------------
1001
(1 row)
SELECT count(*) FROM colocated_dist_table;
count
---------------------------------------------------------------------
1001
(1 row)
SELECT count(*) FROM sensors;
count
---------------------------------------------------------------------
1001
(1 row)
-- END: Validate Shard Info and Data
-- BEGIN: Show the updated state on workers
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
sensors_2020_01_01_8999104 | fkey_from_child_to_child_8999104 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999112(eventdatetime, measureid)
sensors_2020_01_01_8999104 | fkey_from_child_to_dist_8999104 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999108(measureid)
sensors_2020_01_01_8999104 | fkey_from_child_to_parent_8999104 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999110(eventdatetime, measureid)
sensors_2020_01_01_8999104 | fkey_from_child_to_ref_8999104 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8999104 | fkey_from_parent_to_child_8999100 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999112(eventdatetime, measureid)
sensors_2020_01_01_8999104 | fkey_from_parent_to_dist_8999100 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999108(measureid)
sensors_2020_01_01_8999104 | fkey_from_parent_to_parent_8999100 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999110(eventdatetime, measureid)
sensors_2020_01_01_8999104 | fkey_from_parent_to_ref_8999100 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8999104 | sensors_2020_01_01_8999104_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999112(eventdatetime, measureid)
sensors_8999100 | fkey_from_parent_to_child_8999100 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999112(eventdatetime, measureid)
sensors_8999100 | fkey_from_parent_to_dist_8999100 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999108(measureid)
sensors_8999100 | fkey_from_parent_to_parent_8999100 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999110(eventdatetime, measureid)
sensors_8999100 | fkey_from_parent_to_ref_8999100 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_8999100 | sensors_8999100_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999112(eventdatetime, measureid)
sensors_news_8999106 | fkey_from_parent_to_child_8999100 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999112(eventdatetime, measureid)
sensors_news_8999106 | fkey_from_parent_to_dist_8999100 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999108(measureid)
sensors_news_8999106 | fkey_from_parent_to_parent_8999100 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999110(eventdatetime, measureid)
sensors_news_8999106 | fkey_from_parent_to_ref_8999100 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_old_8999102 | fkey_from_parent_to_child_8999100 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999112(eventdatetime, measureid)
sensors_old_8999102 | fkey_from_parent_to_dist_8999100 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999108(measureid)
sensors_old_8999102 | fkey_from_parent_to_parent_8999100 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999110(eventdatetime, measureid)
sensors_old_8999102 | fkey_from_parent_to_ref_8999100 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
(22 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
colocated_dist_table_8999108 | CREATE UNIQUE INDEX colocated_dist_table_pkey_8999108 ON citus_split_test_schema_partitioned.colocated_dist_table_8999108 USING btree (measureid)
colocated_partitioned_table_2020_01_01_8999112 | CREATE UNIQUE INDEX colocated_partitioned_table_2020_01_01_pkey_8999112 ON citus_split_test_schema_partitioned.colocated_partitioned_table_2020_01_01_8999112 USING btree (measureid, eventdatetime)
colocated_partitioned_table_8999110 | CREATE UNIQUE INDEX colocated_partitioned_table_pkey_8999110 ON ONLY citus_split_test_schema_partitioned.colocated_partitioned_table_8999110 USING btree (measureid, eventdatetime)
reference_table_8970007 | CREATE UNIQUE INDEX reference_table_pkey_8970007 ON citus_split_test_schema_partitioned.reference_table_8970007 USING btree (measureid)
sensors_2020_01_01_8999104 | CREATE INDEX index_on_child_8999104 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999104 USING btree (lower((measure_data)::text))
sensors_2020_01_01_8999104 | CREATE INDEX sensors_2020_01_01_lower_idx_8999104 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999104 USING btree (lower((measureid)::text))
sensors_2020_01_01_8999104 | CREATE UNIQUE INDEX sensors_2020_01_01_pkey_8999104 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999104 USING btree (measureid, eventdatetime, measure_data)
sensors_8999100 | CREATE INDEX index_on_parent_8999100 ON ONLY citus_split_test_schema_partitioned.sensors_8999100 USING btree (lower((measureid)::text))
sensors_8999100 | CREATE UNIQUE INDEX sensors_pkey_8999100 ON ONLY citus_split_test_schema_partitioned.sensors_8999100 USING btree (measureid, eventdatetime, measure_data)
sensors_news_8999106 | CREATE INDEX sensors_news_lower_idx_8999106 ON citus_split_test_schema_partitioned.sensors_news_8999106 USING btree (lower((measureid)::text))
sensors_news_8999106 | CREATE UNIQUE INDEX sensors_news_pkey_8999106 ON citus_split_test_schema_partitioned.sensors_news_8999106 USING btree (measureid, eventdatetime, measure_data)
sensors_old_8999102 | CREATE INDEX sensors_old_lower_idx_8999102 ON citus_split_test_schema_partitioned.sensors_old_8999102 USING btree (lower((measureid)::text))
sensors_old_8999102 | CREATE UNIQUE INDEX sensors_old_pkey_8999102 ON citus_split_test_schema_partitioned.sensors_old_8999102 USING btree (measureid, eventdatetime, measure_data)
(13 rows)
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
s1
s1_8999100
s2
s2_8999104
(4 rows)
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
sensors_2020_01_01_8999005 | fkey_from_child_to_child_8999005 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_2020_01_01_8999005 | fkey_from_child_to_dist_8999005 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999009(measureid)
sensors_2020_01_01_8999005 | fkey_from_child_to_parent_8999005 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999011(eventdatetime, measureid)
sensors_2020_01_01_8999005 | fkey_from_child_to_ref_8999005 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8999005 | fkey_from_parent_to_child_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_2020_01_01_8999005 | fkey_from_parent_to_dist_8999001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999009(measureid)
sensors_2020_01_01_8999005 | fkey_from_parent_to_parent_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999011(eventdatetime, measureid)
sensors_2020_01_01_8999005 | fkey_from_parent_to_ref_8999001 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8999005 | sensors_2020_01_01_8999005_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_2020_01_01_8999105 | fkey_from_child_to_child_8999105 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999113(eventdatetime, measureid)
sensors_2020_01_01_8999105 | fkey_from_child_to_dist_8999105 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999109(measureid)
sensors_2020_01_01_8999105 | fkey_from_child_to_parent_8999105 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999111(eventdatetime, measureid)
sensors_2020_01_01_8999105 | fkey_from_child_to_ref_8999105 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8999105 | fkey_from_parent_to_child_8999101 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999113(eventdatetime, measureid)
sensors_2020_01_01_8999105 | fkey_from_parent_to_dist_8999101 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999109(measureid)
sensors_2020_01_01_8999105 | fkey_from_parent_to_parent_8999101 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999111(eventdatetime, measureid)
sensors_2020_01_01_8999105 | fkey_from_parent_to_ref_8999101 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_2020_01_01_8999105 | sensors_2020_01_01_8999105_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999113(eventdatetime, measureid)
sensors_8999001 | fkey_from_parent_to_child_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_8999001 | fkey_from_parent_to_dist_8999001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999009(measureid)
sensors_8999001 | fkey_from_parent_to_parent_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999011(eventdatetime, measureid)
sensors_8999001 | fkey_from_parent_to_ref_8999001 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_8999001 | sensors_8999001_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_8999101 | fkey_from_parent_to_child_8999101 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999113(eventdatetime, measureid)
sensors_8999101 | fkey_from_parent_to_dist_8999101 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999109(measureid)
sensors_8999101 | fkey_from_parent_to_parent_8999101 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999111(eventdatetime, measureid)
sensors_8999101 | fkey_from_parent_to_ref_8999101 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_8999101 | sensors_8999101_measureid_eventdatetime_fkey | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999113(eventdatetime, measureid)
sensors_news_8999007 | fkey_from_parent_to_child_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_news_8999007 | fkey_from_parent_to_dist_8999001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999009(measureid)
sensors_news_8999007 | fkey_from_parent_to_parent_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999011(eventdatetime, measureid)
sensors_news_8999007 | fkey_from_parent_to_ref_8999001 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_news_8999107 | fkey_from_parent_to_child_8999101 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999113(eventdatetime, measureid)
sensors_news_8999107 | fkey_from_parent_to_dist_8999101 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999109(measureid)
sensors_news_8999107 | fkey_from_parent_to_parent_8999101 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999111(eventdatetime, measureid)
sensors_news_8999107 | fkey_from_parent_to_ref_8999101 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_old_8999003 | fkey_from_parent_to_child_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999013(eventdatetime, measureid)
sensors_old_8999003 | fkey_from_parent_to_dist_8999001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999009(measureid)
sensors_old_8999003 | fkey_from_parent_to_parent_8999001 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999011(eventdatetime, measureid)
sensors_old_8999003 | fkey_from_parent_to_ref_8999001 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
sensors_old_8999103 | fkey_from_parent_to_child_8999101 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_2020_01_01_8999113(eventdatetime, measureid)
sensors_old_8999103 | fkey_from_parent_to_dist_8999101 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8999109(measureid)
sensors_old_8999103 | fkey_from_parent_to_parent_8999101 | FOREIGN KEY (eventdatetime, measureid) REFERENCES colocated_partitioned_table_8999111(eventdatetime, measureid)
sensors_old_8999103 | fkey_from_parent_to_ref_8999101 | FOREIGN KEY (measureid) REFERENCES reference_table_8970007(measureid)
(44 rows)
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
tablename | indexdef
---------------------------------------------------------------------
colocated_dist_table_8999009 | CREATE UNIQUE INDEX colocated_dist_table_pkey_8999009 ON citus_split_test_schema_partitioned.colocated_dist_table_8999009 USING btree (measureid)
colocated_dist_table_8999109 | CREATE UNIQUE INDEX colocated_dist_table_pkey_8999109 ON citus_split_test_schema_partitioned.colocated_dist_table_8999109 USING btree (measureid)
colocated_partitioned_table_2020_01_01_8999013 | CREATE UNIQUE INDEX colocated_partitioned_table_2020_01_01_pkey_8999013 ON citus_split_test_schema_partitioned.colocated_partitioned_table_2020_01_01_8999013 USING btree (measureid, eventdatetime)
colocated_partitioned_table_2020_01_01_8999113 | CREATE UNIQUE INDEX colocated_partitioned_table_2020_01_01_pkey_8999113 ON citus_split_test_schema_partitioned.colocated_partitioned_table_2020_01_01_8999113 USING btree (measureid, eventdatetime)
colocated_partitioned_table_8999011 | CREATE UNIQUE INDEX colocated_partitioned_table_pkey_8999011 ON ONLY citus_split_test_schema_partitioned.colocated_partitioned_table_8999011 USING btree (measureid, eventdatetime)
colocated_partitioned_table_8999111 | CREATE UNIQUE INDEX colocated_partitioned_table_pkey_8999111 ON ONLY citus_split_test_schema_partitioned.colocated_partitioned_table_8999111 USING btree (measureid, eventdatetime)
reference_table_8970007 | CREATE UNIQUE INDEX reference_table_pkey_8970007 ON citus_split_test_schema_partitioned.reference_table_8970007 USING btree (measureid)
sensors_2020_01_01_8999005 | CREATE INDEX index_on_child_8999005 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999005 USING btree (lower((measure_data)::text))
sensors_2020_01_01_8999005 | CREATE INDEX sensors_2020_01_01_lower_idx_8999005 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999005 USING btree (lower((measureid)::text))
sensors_2020_01_01_8999005 | CREATE UNIQUE INDEX sensors_2020_01_01_pkey_8999005 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999005 USING btree (measureid, eventdatetime, measure_data)
sensors_2020_01_01_8999105 | CREATE INDEX index_on_child_8999105 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999105 USING btree (lower((measure_data)::text))
sensors_2020_01_01_8999105 | CREATE INDEX sensors_2020_01_01_lower_idx_8999105 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999105 USING btree (lower((measureid)::text))
sensors_2020_01_01_8999105 | CREATE UNIQUE INDEX sensors_2020_01_01_pkey_8999105 ON citus_split_test_schema_partitioned.sensors_2020_01_01_8999105 USING btree (measureid, eventdatetime, measure_data)
sensors_8999001 | CREATE INDEX index_on_parent_8999001 ON ONLY citus_split_test_schema_partitioned.sensors_8999001 USING btree (lower((measureid)::text))
sensors_8999001 | CREATE UNIQUE INDEX sensors_pkey_8999001 ON ONLY citus_split_test_schema_partitioned.sensors_8999001 USING btree (measureid, eventdatetime, measure_data)
sensors_8999101 | CREATE INDEX index_on_parent_8999101 ON ONLY citus_split_test_schema_partitioned.sensors_8999101 USING btree (lower((measureid)::text))
sensors_8999101 | CREATE UNIQUE INDEX sensors_pkey_8999101 ON ONLY citus_split_test_schema_partitioned.sensors_8999101 USING btree (measureid, eventdatetime, measure_data)
sensors_news_8999007 | CREATE INDEX sensors_news_lower_idx_8999007 ON citus_split_test_schema_partitioned.sensors_news_8999007 USING btree (lower((measureid)::text))
sensors_news_8999007 | CREATE UNIQUE INDEX sensors_news_pkey_8999007 ON citus_split_test_schema_partitioned.sensors_news_8999007 USING btree (measureid, eventdatetime, measure_data)
sensors_news_8999107 | CREATE INDEX sensors_news_lower_idx_8999107 ON citus_split_test_schema_partitioned.sensors_news_8999107 USING btree (lower((measureid)::text))
sensors_news_8999107 | CREATE UNIQUE INDEX sensors_news_pkey_8999107 ON citus_split_test_schema_partitioned.sensors_news_8999107 USING btree (measureid, eventdatetime, measure_data)
sensors_old_8999003 | CREATE INDEX sensors_old_lower_idx_8999003 ON citus_split_test_schema_partitioned.sensors_old_8999003 USING btree (lower((measureid)::text))
sensors_old_8999003 | CREATE UNIQUE INDEX sensors_old_pkey_8999003 ON citus_split_test_schema_partitioned.sensors_old_8999003 USING btree (measureid, eventdatetime, measure_data)
sensors_old_8999103 | CREATE INDEX sensors_old_lower_idx_8999103 ON citus_split_test_schema_partitioned.sensors_old_8999103 USING btree (lower((measureid)::text))
sensors_old_8999103 | CREATE UNIQUE INDEX sensors_old_pkey_8999103 ON citus_split_test_schema_partitioned.sensors_old_8999103 USING btree (measureid, eventdatetime, measure_data)
(25 rows)
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
s1
s1_8999001
s1_8999101
s2
s2_8999005
s2_8999105
(6 rows)
-- END: Show the updated state on workers
--BEGIN : Cleanup
\c - postgres - :master_port
DROP SCHEMA "citus_split_test_schema_partitioned" CASCADE;
NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table citus_split_test_schema_partitioned.sensors
drop cascades to table citus_split_test_schema_partitioned.colocated_dist_table
drop cascades to table citus_split_test_schema_partitioned.colocated_partitioned_table
drop cascades to table citus_split_test_schema_partitioned.reference_table
--END : Cleanup

View File

@ -218,8 +218,6 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
ARRAY['-1073741824'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: replication slot "citus_shard_split_template_slot_8981000" does not exist
CONTEXT: while executing command on localhost:xxxxx
citus_split_shard_by_split_points
---------------------------------------------------------------------
@ -231,8 +229,6 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
ARRAY['536870911', '1610612735'],
ARRAY[:worker_1_node, :worker_1_node, :worker_2_node],
'force_logical');
WARNING: replication slot "citus_shard_split_template_slot_8981001" does not exist
CONTEXT: while executing command on localhost:xxxxx
citus_split_shard_by_split_points
---------------------------------------------------------------------
@ -396,8 +392,6 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
ARRAY['-2100000000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: replication slot "citus_shard_split_template_slot_8981007" does not exist
CONTEXT: while executing command on localhost:xxxxx
citus_split_shard_by_split_points
---------------------------------------------------------------------

View File

@ -1,6 +1,5 @@
unused step name: s2-insert-2
unused step name: s2-select
Parsed test spec with 3 sessions
Parsed test spec with 4 sessions
starting permutation: s1-load-cache s2-print-cluster s3-acquire-advisory-lock s1-begin s2-begin s1-non-blocking-shard-split s2-insert s2-end s2-print-cluster s3-release-advisory-lock s1-end s2-print-cluster
create_distributed_table
@ -46,7 +45,6 @@ step s1-begin:
step s2-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
@ -178,7 +176,6 @@ pg_advisory_lock
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
@ -280,7 +277,6 @@ pg_advisory_lock
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
@ -330,6 +326,141 @@ id|value
(0 rows)
starting permutation: s1-load-cache s2-print-cluster s3-acquire-advisory-lock s1-begin s2-begin s1-non-blocking-shard-split s2-insert s2-end s2-print-cluster s4-begin s3-release-advisory-lock s4-insert s1-end s4-end s2-print-cluster
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-load-cache:
-- Indirect way to load cache.
TRUNCATE to_split_table;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500001|t | 0
(1 row)
id|value
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
ARRAY['-1073741824'],
ARRAY[2, 2],
'force_logical');
<waiting ...>
step s2-insert:
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (123456789, 1);
get_shard_id_for_distribution_column
---------------------------------------------------------------------
1500001
(1 row)
step s2-end:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500001|t | 1
(1 row)
id|value
---------------------------------------------------------------------
123456789| 1
(1 row)
step s4-begin:
BEGIN;
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-non-blocking-shard-split: <... completed>
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
step s4-insert:
INSERT INTO to_split_table VALUES (900, 1);
<waiting ...>
step s1-end:
COMMIT;
step s4-insert: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s4-end:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('to_split_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM to_split_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57638|1500002|t | 0
57638|1500003|t | 1
(2 rows)
id|value
---------------------------------------------------------------------
123456789| 1
(1 row)
starting permutation: s2-print-cluster s3-acquire-advisory-lock s1-begin s2-begin s1-non-blocking-shard-split s2-insert s2-end s2-print-cluster s3-release-advisory-lock s1-end s2-print-cluster
create_distributed_table
---------------------------------------------------------------------
@ -370,7 +501,6 @@ step s1-begin:
step s2-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
@ -498,7 +628,6 @@ pg_advisory_lock
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
@ -596,7 +725,6 @@ pg_advisory_lock
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,

View File

@ -25,7 +25,6 @@ step s1-begin:
step s2-begin:
BEGIN;
s2: WARNING: replication slot "citus_shard_split_template_slot_1500002" does not exist
step s2-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500002,
@ -109,7 +108,6 @@ step s1-begin:
step s2-begin:
BEGIN;
s2: WARNING: replication slot "citus_shard_split_template_slot_1500002" does not exist
step s2-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500002,
@ -192,7 +190,6 @@ step s1-begin:
step s2-begin:
BEGIN;
s2: WARNING: replication slot "citus_shard_split_template_slot_1500002" does not exist
step s2-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500002,

View File

@ -1,4 +1,3 @@
unused step name: s2-insert-2
unused step name: s2-select
Parsed test spec with 3 sessions
@ -46,7 +45,6 @@ step s1-begin:
step s2-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
@ -178,7 +176,6 @@ pg_advisory_lock
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
@ -280,7 +277,6 @@ pg_advisory_lock
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
@ -370,7 +366,6 @@ step s1-begin:
step s2-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
@ -498,7 +493,6 @@ pg_advisory_lock
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,
@ -596,7 +590,6 @@ pg_advisory_lock
step s1-begin:
BEGIN;
s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist
step s1-non-blocking-shard-split:
SELECT pg_catalog.citus_split_shard_by_split_points(
1500001,

View File

@ -222,3 +222,12 @@ SELECT * FROM table_second_9;
100 | a
(1 row)
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DROP PUBLICATION pub1;
DROP PUBLICATION pub2;
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;
DROP SUBSCRIPTION sub2;

View File

@ -1,5 +1,9 @@
// we use 15 as the partition key value throughout the test
// so setting the corresponding shard here is useful
// Test scenario for nonblocking split and concurrent INSERT/UPDATE/DELETE
// session s1 - Executes non-blocking shard split
// session s2 - Does concurrent writes
// session s3 - Holds advisory locks
// session s4 - Tries to insert while writes are blocked
//
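// (All permutations below rely on the advisory lock (44000, 55152) as a barrier:
// s3 takes it first, the split waits on it mid-flight, and releasing it lets the
// split run to completion.)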
setup
{
SET citus.shard_count to 1;
@ -57,12 +61,6 @@ step "s2-insert"
INSERT INTO to_split_table VALUES (123456789, 1);
}
step "s2-insert-2"
{
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (1234567819, 1);
}
step "s2-update"
{
UPDATE to_split_table SET value = 111 WHERE id = 123456789;
@ -112,20 +110,49 @@ step "s3-release-advisory-lock"
SELECT pg_advisory_unlock(44000, 55152);
}
##// nonblocking tests lie below ###
session "s4"
// move placement first
// the following tests show the non-blocking modifications while the shard is being moved
// in fact, the shard move blocks the writes only for a very short duration of time
// by using an advisory lock and allowing the other commands to continue running, we prevent
// the modifications from blocking during that short window
//permutation "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-insert" "s3-release-advisory-lock" "s1-end" "s1-select" "s1-get-shard-distribution"
step "s4-begin"
{
BEGIN;
}
step "s4-insert"
{
INSERT INTO to_split_table VALUES (900, 1);
}
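// (s4-insert targets the shard being split; the s4 permutation below schedules it
// inside the split's brief write-blocking window, so it waits and then errors out
// once the old shard's metadata entry is gone.)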
step "s4-end"
{
COMMIT;
}
// Concurrent Insert:
// s3 holds advisory lock -> s1 starts non-blocking shard split and waits for advisory lock ->
// s2 inserts a row successfully demonstrating nonblocking split -> s3 releases the advisory lock
// -> s1 completes split -> result is reflected in new shards
permutation "s1-load-cache" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
// Concurrent Update:
// s2 inserts a row to be updated later -> s3 holds advisory lock -> s1 starts non-blocking shard split and waits for advisory lock ->
// s2 updates the row -> s3 releases the advisory lock
// -> s1 completes split -> result is reflected in new shards
permutation "s1-load-cache" "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-update" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
// Concurrent Delete:
// s2 inserts a row to be deleted later -> s3 holds advisory lock -> s1 starts non-blocking shard split and waits for advisory lock ->
// s2 deletes the row -> s3 releases the advisory lock
// -> s1 completes split -> result is reflected in new shards
permutation "s1-load-cache" "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-delete" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
// Demonstrating a blocking insert when writes are blocked by the nonblocking split workflow:
// s3 holds advisory lock -> s1 starts non-blocking shard split and waits for advisory lock ->
// s2 inserts the row successfully -> s4 begins -> s3 releases the advisory lock, so s1 moves ahead to block writes
// -> s4 inserts (waits, as writes are blocked) -> s1 commits -> s4 fails as the shard metadata gets updated (see the note below)
permutation "s1-load-cache" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s4-begin" "s3-release-advisory-lock" "s4-insert" "s1-end" "s4-end" "s2-print-cluster"
// Same flow without loading cache
permutation "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
permutation "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-update" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
permutation "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-delete" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"

View File

@ -1,3 +1,9 @@
// Test scenario for nonblocking split and concurrent INSERT/UPDATE/DELETE.
// Test uses Index as replica identity.
// session s1 - Does concurrent writes on reference table
// session s2 - Executes non-blocking shard split
// session s3 - Holds advisory locks
setup
{
SELECT setval('pg_dist_shardid_seq', 1500000);
@ -113,9 +119,24 @@ step "s3-release-advisory-lock"
}
// Run shard split while concurrently performing an DML and index creation on the
// Run nonblocking shard split while concurrently performing a DML on the
// reference table which the distributed table have a foreign key to.
// All modifications should block on shard split.
// Modifications should not be blocked.
// Concurrent Insert:
// s2 adds fkey constraint -> s3 holds advisory lock -> s2 starts non-blocking shard split and waits for advisory lock ->
// s1 inserts a row into the reference table successfully, demonstrating the nonblocking split -> s3 releases the advisory lock
// -> s2 completes split -> result is reflected in new shards
permutation "s2-add-fkey" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s2-non-blocking-shard-split" "s1-insert" "s1-commit" "s3-release-advisory-lock" "s2-commit" "s2-print-cluster"
// Concurrent Update:
// s2 adds fkey constraint -> s3 holds advisory lock -> s2 starts non-blocking shard split and waits for advisory lock ->
// s1 updates a row of the reference table -> s3 releases the advisory lock
// -> s2 completes split -> result is reflected in new shards
permutation "s2-add-fkey" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s2-non-blocking-shard-split" "s1-update" "s1-commit" "s3-release-advisory-lock" "s2-commit" "s2-print-cluster"
// Concurrent Delete:
// s2 adds fkey constraint -> s3 holds advisory lock -> s2 starts non-blocking shard split and waits for advisory lock ->
// s1 deletes a row of the reference table -> s3 releases the advisory lock
// -> s2 completes split -> result is reflected in new shards
permutation "s2-add-fkey" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s2-non-blocking-shard-split" "s1-delete" "s1-commit" "s3-release-advisory-lock" "s2-commit" "s2-print-cluster"

View File

@ -1,3 +1,8 @@
// Test scenario for nonblocking split and concurrent INSERT/UPDATE/DELETE.
// Test uses Index as replica identity.
// session s1 - Executes non-blocking shard split
// session s2 - Does concurrent writes
// session s3 - Holds advisory locks
setup
{
SET citus.shard_count to 1;
@ -58,12 +63,6 @@ step "s2-insert"
INSERT INTO to_split_table VALUES (123456789, 1);
}
step "s2-insert-2"
{
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
INSERT INTO to_split_table VALUES (1234567819, 1);
}
step "s2-update"
{
UPDATE to_split_table SET value = 111 WHERE id = 123456789;
@ -115,18 +114,25 @@ step "s3-release-advisory-lock"
##// nonblocking tests lie below ###
// move placement first
// the following tests show the non-blocking modifications while the shard is being moved
// in fact, the shard move blocks the writes only for a very short duration of time
// by using an advisory lock and allowing the other commands to continue running, we prevent
// the modifications from blocking during that short window
//permutation "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-insert" "s3-release-advisory-lock" "s1-end" "s1-select" "s1-get-shard-distribution"
// Concurrent Insert:
// s3 holds advisory lock -> s1 starts non-blocking shard split and waits for advisory lock ->
// s2 inserts a row successfully demonstrating nonblocking split -> s3 releases the advisory lock
// -> s1 completes split -> result is reflected in new shards
permutation "s1-load-cache" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
// Concurrent Update:
// s2 inserts a row to be updated later -> s3 holds advisory lock -> s1 starts non-blocking shard split and waits for advisory lock ->
// s2 updates the row -> s3 releases the advisory lock
// -> s1 completes split -> result is reflected in new shards
permutation "s1-load-cache" "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-update" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
// Concurrent Delete:
// s2 inserts a row to be deleted later -> s3 holds advisory lock -> s1 starts non-blocking shard split and waits for advisory lock ->
// s2 deletes the row -> s3 releases the advisory lock
// -> s1 completes split -> result is reflected in new shards
permutation "s1-load-cache" "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-delete" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
// Same flow without loading cache
permutation "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
permutation "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-update" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"
permutation "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-delete" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"

View File

@ -21,4 +21,4 @@ test: citus_split_shard_by_split_points_failure
# use citus_split_shard_columnar_partitioned instead.
test: citus_split_shard_columnar_partitioned
test: citus_non_blocking_split_shards
# test: citus_sameer
test: citus_non_blocking_split_shard_partitioned

View File

@ -0,0 +1,269 @@
CREATE SCHEMA "citus_split_test_schema_partitioned";
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.next_shard_id TO 8970000;
SET citus.next_placement_id TO 8770000;
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc.
CREATE TABLE sensors(
measureid integer,
eventdatetime date,
measure_data jsonb,
PRIMARY KEY (measureid, eventdatetime, measure_data))
PARTITION BY RANGE(eventdatetime);
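-- (The primary key includes eventdatetime because PostgreSQL requires unique
-- constraints on a partitioned table to include the partition key columns.)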
-- Create Partitions of table 'sensors'.
CREATE TABLE sensors_old PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
CREATE TABLE sensors_2020_01_01 PARTITION OF sensors FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
CREATE TABLE sensors_news PARTITION OF sensors FOR VALUES FROM ('2020-05-01') TO ('2025-01-01');
-- Create index on parent and child partitions.
CREATE INDEX index_on_parent ON sensors(lower(measureid::text));
CREATE INDEX index_on_child ON sensors_2020_01_01(lower(measure_data::text));
ALTER INDEX index_on_parent ALTER COLUMN 1 SET STATISTICS 1000;
ALTER INDEX index_on_child ALTER COLUMN 1 SET STATISTICS 1000;
-- Create statistics on parent and child partitions.
CREATE STATISTICS s1 (dependencies) ON measureid, eventdatetime FROM sensors;
CREATE STATISTICS s2 (dependencies) ON measureid, eventdatetime FROM sensors_2020_01_01;
CLUSTER sensors_2020_01_01 USING index_on_child;
SELECT create_distributed_table('sensors', 'measureid');
-- create colocated distributed tables
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
SELECT create_distributed_table('colocated_dist_table', 'measureid');
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
CREATE TABLE colocated_partitioned_table(
measureid integer,
eventdatetime date,
PRIMARY KEY (measureid, eventdatetime))
PARTITION BY RANGE(eventdatetime);
CREATE TABLE colocated_partitioned_table_2020_01_01 PARTITION OF colocated_partitioned_table FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
SELECT create_distributed_table('colocated_partitioned_table', 'measureid');
CLUSTER colocated_partitioned_table_2020_01_01 USING colocated_partitioned_table_2020_01_01_pkey;
-- create reference tables
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
SELECT create_reference_table('reference_table');
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
INNER JOIN pg_catalog.pg_namespace ns ON cls.relnamespace = ns.oid
WHERE node.noderole = 'primary' AND ns.nspname = 'citus_split_test_schema_partitioned'
ORDER BY logicalrelid, shardminvalue::BIGINT;
-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc.
-- BEGIN: Create constraints for tables.
-- from parent to regular dist
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
-- from parent to parent
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_parent FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table(measureid, eventdatetime);
-- from parent to child
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_child FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table_2020_01_01(measureid, eventdatetime);
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_ref FOREIGN KEY (measureid) REFERENCES reference_table(measureid);
-- from child to regular dist
ALTER TABLE sensors_2020_01_01 ADD CONSTRAINT fkey_from_child_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
-- from child to parent
ALTER TABLE sensors_2020_01_01 ADD CONSTRAINT fkey_from_child_to_parent FOREIGN KEY (measureid,eventdatetime) REFERENCES colocated_partitioned_table(measureid,eventdatetime);
-- from child to child
ALTER TABLE sensors_2020_01_01 ADD CONSTRAINT fkey_from_child_to_child FOREIGN KEY (measureid,eventdatetime) REFERENCES colocated_partitioned_table_2020_01_01(measureid,eventdatetime);
ALTER TABLE sensors_2020_01_01 ADD CONSTRAINT fkey_from_child_to_ref FOREIGN KEY (measureid) REFERENCES reference_table(measureid);
-- No support for foreign keys, unique constraints, or exclusion constraints in columnar tables.
-- Please see: https://github.com/citusdata/citus/tree/main/src/backend/columnar/README.md
-- END: Create constraints for tables.
-- BEGIN: Load data into tables
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
-- END: Load data into tables
-- BEGIN: Show the current state on workers
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
-- END: Show the current state on workers
-- BEGIN: Split a shard along its co-located shards
\c - - - :master_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.next_shard_id TO 8999000;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
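-- (\gset stores each selected nodeid into the psql variables :worker_1_node and
-- :worker_2_node, which the split call below uses as the target nodes.)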
SELECT pg_catalog.citus_split_shard_by_split_points(
8970000,
ARRAY['-2120000000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
-- END: Split a shard along its co-located shards
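-- (Sketch, assuming the usual inclusive-upper-bound convention for split points:
-- the children of shard 8970000 should cover [-2147483648, -2120000000] and
-- [-2119999999, 2147483647]; the shardminvalue/shardmaxvalue listing below confirms it.)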
-- BEGIN: Validate Shard Info and Data
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
INNER JOIN pg_catalog.pg_namespace ns ON cls.relnamespace = ns.oid
WHERE node.noderole = 'primary' AND ns.nspname = 'citus_split_test_schema_partitioned'
ORDER BY logicalrelid, shardminvalue::BIGINT;
SELECT count(*) FROM reference_table;
SELECT count(*) FROM colocated_partitioned_table;
SELECT count(*) FROM colocated_dist_table;
SELECT count(*) FROM sensors;
-- END: Validate Shard Info and Data
-- BEGIN: Show the updated state on workers
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
-- END: Show the updated state on workers
-- BEGIN: Split a shard of the partition table 'sensors_old' directly
\c - - - :master_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.next_shard_id TO 8999100;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT pg_catalog.citus_split_shard_by_split_points(
8999002, -- sensors_old
ARRAY['-2127770000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
-- END: Split a shard of the partition table 'sensors_old' directly
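-- (Although shard 8999002 belongs to the partition sensors_old, the split operates
-- on the whole co-location group, so the parent 'sensors', its other partitions,
-- and all colocated tables get new shards too, as the listing below shows.)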
-- BEGIN: Validate Shard Info and Data
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
INNER JOIN pg_catalog.pg_namespace ns ON cls.relnamespace = ns.oid
WHERE node.noderole = 'primary' AND ns.nspname = 'citus_split_test_schema_partitioned'
ORDER BY logicalrelid, shardminvalue::BIGINT;
SELECT count(*) FROM reference_table;
SELECT count(*) FROM colocated_partitioned_table;
SELECT count(*) FROM colocated_dist_table;
SELECT count(*) FROM sensors;
-- END: Validate Shard Info and Data
-- BEGIN: Show the updated state on workers
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema_partitioned";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname, fk."Constraint", fk."Definition"
FROM pg_catalog.pg_class tbl
JOIN public.table_fkeys fk on tbl.oid = fk.relid
WHERE tbl.relname like '%_89%'
ORDER BY 1, 2;
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like '%_89%' ORDER BY 1,2;
SELECT stxname FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('citus_split_test_schema_partitioned')
)
ORDER BY stxname ASC;
-- END: Show the updated state on workers
--BEGIN : Cleanup
\c - postgres - :master_port
DROP SCHEMA "citus_split_test_schema_partitioned" CASCADE;
--END : Cleanup

View File

@ -150,3 +150,14 @@ SELECT * FROM table_second_8;
SELECT wait_for_expected_rowcount_at_table('table_second_9', 1);
SELECT * FROM table_second_9;
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DROP PUBLICATION pub1;
DROP PUBLICATION pub2;
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;
DROP SUBSCRIPTION sub2;
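-- (By default DROP SUBSCRIPTION also drops the subscription's replication slot on
-- the publisher; client_min_messages is raised to ERROR above, presumably to keep
-- the resulting messages out of the expected output.)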