mirror of https://github.com/citusdata/citus.git
Fix metadata sync issue. Update regression test
parent
e29222458c
commit
55770d2816
|
@ -750,11 +750,11 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
|
||||||
SHARD_STATE_ACTIVE,
|
SHARD_STATE_ACTIVE,
|
||||||
0, /* shard length (zero for HashDistributed Table) */
|
0, /* shard length (zero for HashDistributed Table) */
|
||||||
workerPlacementNode->groupId);
|
workerPlacementNode->groupId);
|
||||||
}
|
|
||||||
|
|
||||||
if (ShouldSyncTableMetadata(shardInterval->relationId))
|
if (ShouldSyncTableMetadata(shardInterval->relationId))
|
||||||
{
|
{
|
||||||
syncedShardList = lappend(syncedShardList, shardInterval);
|
syncedShardList = lappend(syncedShardList, shardInterval);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,133 +1,418 @@
|
||||||
-- Split Shards by Split Points tests.
|
/*
|
||||||
-- Setup for Test.
|
Citus Shard Split Test.The test is model similar to 'shard_move_constraints'.
|
||||||
CREATE SCHEMA citus_split_shard_by_split_points;
|
Here is a high level overview of test plan:
|
||||||
SET search_path TO citus_split_shard_by_split_points;
|
1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table.
|
||||||
SET citus.shard_count TO 1;
|
2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors.
|
||||||
|
3. Create Foreign key constraints between the two co-located distributed tables.
|
||||||
|
4. Load data into the three tables.
|
||||||
|
5. Move one of the shards for 'sensors' to test ShardMove -> Split.
|
||||||
|
6. Trigger Split on both shards of 'sensors'. This will also split co-located tables.
|
||||||
|
7. Move one of the split shard to test Split -> ShardMove.
|
||||||
|
8. Split an already split shard second time on a different schema.
|
||||||
|
*/
|
||||||
|
CREATE SCHEMA "citus_split_test_schema";
|
||||||
|
SET search_path TO "citus_split_test_schema";
|
||||||
|
SET citus.next_shard_id TO 8981000;
|
||||||
|
SET citus.next_placement_id TO 8610000;
|
||||||
|
SET citus.shard_count TO 2;
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SET citus.next_shard_id TO 82060000;
|
-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc.
|
||||||
-- Create distributed table 'lineitem_streaming'
|
CREATE TABLE sensors(
|
||||||
CREATE TABLE lineitem_streaming (
|
measureid integer,
|
||||||
l_orderkey bigint not null,
|
eventdatetime date,
|
||||||
l_partkey integer not null,
|
measure_data jsonb,
|
||||||
l_suppkey integer not null,
|
meaure_quantity decimal(15, 2),
|
||||||
l_linenumber integer not null,
|
measure_status char(1),
|
||||||
l_quantity decimal(15, 2) not null,
|
measure_comment varchar(44),
|
||||||
l_extendedprice decimal(15, 2) not null,
|
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||||
l_discount decimal(15, 2) not null,
|
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||||
l_tax decimal(15, 2) not null,
|
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||||
l_returnflag char(1) not null,
|
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||||
l_linestatus char(1) not null,
|
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||||
l_shipdate date not null,
|
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;
|
||||||
l_commitdate date not null,
|
SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
|
||||||
l_receiptdate date not null,
|
|
||||||
l_shipinstruct char(25) not null,
|
|
||||||
l_shipmode char(10) not null,
|
|
||||||
l_comment varchar(44) not null);
|
|
||||||
SELECT create_distributed_table('lineitem_streaming', 'l_orderkey');
|
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Create distributed table 'orders_streaming'
|
-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc.
|
||||||
CREATE TABLE orders_streaming (
|
-- BEGIN: Create co-located distributed and reference tables.
|
||||||
o_orderkey bigint not null primary key,
|
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
|
||||||
o_custkey integer not null,
|
SELECT create_reference_table('reference_table');
|
||||||
o_orderstatus char(1) not null,
|
create_reference_table
|
||||||
o_totalprice decimal(15,2) not null,
|
---------------------------------------------------------------------
|
||||||
o_orderdate date not null,
|
|
||||||
o_orderpriority char(15) not null,
|
(1 row)
|
||||||
o_clerk char(15) not null,
|
|
||||||
o_shippriority integer not null,
|
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
|
||||||
o_comment varchar(79) not null);
|
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
|
||||||
SELECT create_distributed_table('orders_streaming', 'o_orderkey');
|
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Insert data into 'lineitem_streaming'
|
CREATE TABLE table_with_index_rep_identity(key int NOT NULL);
|
||||||
\COPY lineitem_streaming FROM STDIN WITH DELIMITER '|'
|
CREATE UNIQUE INDEX uqx ON table_with_index_rep_identity(key);
|
||||||
-- Insert data into 'orders_streaming'
|
ALTER TABLE table_with_index_rep_identity REPLICA IDENTITY USING INDEX uqx;
|
||||||
\COPY orders_streaming FROM STDIN WITH DELIMITER '|'
|
CLUSTER table_with_index_rep_identity USING uqx;
|
||||||
-- Initial Row Count in Shard
|
SELECT create_distributed_table('table_with_index_rep_identity', 'key', colocate_with:='sensors');
|
||||||
SELECT COUNT(*) FROM orders_streaming;
|
create_distributed_table
|
||||||
count
|
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
7
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT COUNT(*) FROM lineitem_streaming;
|
-- END: Create co-located distributed and reference tables.
|
||||||
count
|
-- BEGIN : Create Foreign key constraints.
|
||||||
---------------------------------------------------------------------
|
ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
|
||||||
22
|
-- END : Create Foreign key constraints.
|
||||||
(1 row)
|
-- BEGIN : Load data into tables.
|
||||||
|
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
|
||||||
-- Create Foreign constraint between two shards.
|
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||||
ALTER TABLE lineitem_streaming ADD CONSTRAINT test_constraint
|
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
|
||||||
FOREIGN KEY(l_orderkey) REFERENCES orders_streaming(o_orderkey);
|
-- END: Load data into tables.
|
||||||
-- Before Split, List shard and placement data.
|
-- BEGIN : Display current state.
|
||||||
SELECT shard.shardid, logicalrelid, shardstorage, shardminvalue, shardmaxvalue nodename, nodeport, placementid
|
-- TODO(niupre): Can we refactor this to be a function?
|
||||||
|
SELECT cls.relnamespace, shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid
|
||||||
FROM pg_dist_shard AS shard
|
FROM pg_dist_shard AS shard
|
||||||
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
|
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
|
||||||
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
|
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
|
||||||
WHERE node.noderole = 'primary' AND (logicalrelid = 'lineitem_streaming'::regclass OR logicalrelid = 'orders_streaming'::regclass)
|
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
|
||||||
|
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
|
||||||
ORDER BY logicalrelid, shardminvalue::BIGINT;
|
ORDER BY logicalrelid, shardminvalue::BIGINT;
|
||||||
shardid | logicalrelid | shardstorage | shardminvalue | nodename | nodeport | placementid
|
relnamespace | shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport | placementid
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
82060000 | lineitem_streaming | t | -2147483648 | 2147483647 | 57637 | 117
|
18296 | 8981000 | sensors | -2147483648 | -1 | localhost | 57637 | 8610000
|
||||||
82060001 | orders_streaming | t | -2147483648 | 2147483647 | 57637 | 118
|
18296 | 8981001 | sensors | 0 | 2147483647 | localhost | 57638 | 8610001
|
||||||
|
18296 | 8981003 | colocated_dist_table | -2147483648 | -1 | localhost | 57637 | 8610004
|
||||||
|
18296 | 8981004 | colocated_dist_table | 0 | 2147483647 | localhost | 57638 | 8610005
|
||||||
|
18296 | 8981005 | table_with_index_rep_identity | -2147483648 | -1 | localhost | 57637 | 8610006
|
||||||
|
18296 | 8981006 | table_with_index_rep_identity | 0 | 2147483647 | localhost | 57638 | 8610007
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO "citus_split_test_schema", public, pg_catalog;
|
||||||
|
SET citus.show_shards_for_app_name_prefixes = '*';
|
||||||
|
SELECT tbl.relname, fk."Constraint", fk."Definition"
|
||||||
|
FROM pg_catalog.pg_class tbl
|
||||||
|
JOIN public.table_fkeys fk on tbl.oid = fk.relid
|
||||||
|
WHERE tbl.relname like 'sensors_%'
|
||||||
|
ORDER BY 1, 2;
|
||||||
|
relname | Constraint | Definition
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
sensors_8981000 | fkey_table_to_dist_8981000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981003(measureid)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
|
||||||
|
tablename | indexdef
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
sensors_8981000 | CREATE INDEX hash_index_on_sensors_8981000 ON citus_split_test_schema.sensors_8981000 USING hash (((measure_data -> 'IsFailed'::text)))
|
||||||
|
sensors_8981000 | CREATE INDEX index_on_sensors_8981000 ON citus_split_test_schema.sensors_8981000 USING btree (lower((measureid)::text))
|
||||||
|
sensors_8981000 | CREATE INDEX index_with_include_on_sensors_8981000 ON citus_split_test_schema.sensors_8981000 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
|
||||||
|
sensors_8981000 | CREATE UNIQUE INDEX sensors_pkey_8981000 ON citus_split_test_schema.sensors_8981000 USING btree (measureid, eventdatetime, measure_data)
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
|
||||||
|
tablename | indexdef
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
table_with_index_rep_identity_8981005 | CREATE UNIQUE INDEX uqx_8981005 ON citus_split_test_schema.table_with_index_rep_identity_8981005 USING btree (key)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT stxname FROM pg_statistic_ext
|
||||||
|
WHERE stxnamespace IN (
|
||||||
|
SELECT oid
|
||||||
|
FROM pg_namespace
|
||||||
|
WHERE nspname IN ('citus_split_test_schema')
|
||||||
|
)
|
||||||
|
ORDER BY stxname ASC;
|
||||||
|
stxname
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
stats_on_sensors
|
||||||
|
stats_on_sensors_8981000
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- Trigger five way way Split on Shard.
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO "citus_split_test_schema", public, pg_catalog;
|
||||||
|
SET citus.show_shards_for_app_name_prefixes = '*';
|
||||||
|
SELECT tbl.relname, fk."Constraint", fk."Definition"
|
||||||
|
FROM pg_catalog.pg_class tbl
|
||||||
|
JOIN public.table_fkeys fk on tbl.oid = fk.relid
|
||||||
|
WHERE tbl.relname like 'sensors_%'
|
||||||
|
ORDER BY 1, 2;
|
||||||
|
relname | Constraint | Definition
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
sensors_8981001 | fkey_table_to_dist_8981001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981004(measureid)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
|
||||||
|
tablename | indexdef
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
sensors_8981001 | CREATE INDEX hash_index_on_sensors_8981001 ON citus_split_test_schema.sensors_8981001 USING hash (((measure_data -> 'IsFailed'::text)))
|
||||||
|
sensors_8981001 | CREATE INDEX index_on_sensors_8981001 ON citus_split_test_schema.sensors_8981001 USING btree (lower((measureid)::text))
|
||||||
|
sensors_8981001 | CREATE INDEX index_with_include_on_sensors_8981001 ON citus_split_test_schema.sensors_8981001 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
|
||||||
|
sensors_8981001 | CREATE UNIQUE INDEX sensors_pkey_8981001 ON citus_split_test_schema.sensors_8981001 USING btree (measureid, eventdatetime, measure_data)
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
|
||||||
|
tablename | indexdef
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
table_with_index_rep_identity_8981006 | CREATE UNIQUE INDEX uqx_8981006 ON citus_split_test_schema.table_with_index_rep_identity_8981006 USING btree (key)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT stxname FROM pg_statistic_ext
|
||||||
|
WHERE stxnamespace IN (
|
||||||
|
SELECT oid
|
||||||
|
FROM pg_namespace
|
||||||
|
WHERE nspname IN ('citus_split_test_schema')
|
||||||
|
)
|
||||||
|
ORDER BY stxname ASC;
|
||||||
|
stxname
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
stats_on_sensors
|
||||||
|
stats_on_sensors_8981001
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- END : Display current state
|
||||||
|
-- BEGIN : Move one shard before we split it.
|
||||||
|
\c - postgres - :master_port
|
||||||
|
SET search_path TO "citus_split_test_schema";
|
||||||
|
SET citus.next_shard_id TO 8981007;
|
||||||
|
SET citus.defer_drop_after_shard_move TO OFF;
|
||||||
|
SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical');
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- END : Move one shard before we split it.
|
||||||
|
-- BEGIN : Set node id variables
|
||||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||||
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
||||||
SELECT * FROM citus_split_shard_by_split_points(
|
-- END : Set node id variables
|
||||||
82060000,
|
-- BEGIN : Split two shards : One with move and One without move.
|
||||||
ARRAY['268435455', '536870911', '1073741823', '1610612735'],
|
-- Perform 2 way split
|
||||||
ARRAY[:worker_1_node, :worker_1_node, :worker_2_node, :worker_2_node, :worker_2_node],
|
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||||
|
8981000,
|
||||||
|
ARRAY['-1073741824'],
|
||||||
|
ARRAY[:worker_1_node, :worker_2_node],
|
||||||
'blocking');
|
'blocking');
|
||||||
citus_split_shard_by_split_points
|
citus_split_shard_by_split_points
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Row Count in Shard after Split (should be same as before)
|
-- Perform 3 way split
|
||||||
SELECT COUNT(*) FROM orders_streaming;
|
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||||
count
|
8981001,
|
||||||
|
ARRAY['536870911', '1610612735'],
|
||||||
|
ARRAY[:worker_1_node, :worker_1_node, :worker_2_node],
|
||||||
|
'blocking');
|
||||||
|
citus_split_shard_by_split_points
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
7
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT COUNT(*) FROM lineitem_streaming;
|
-- END : Split two shards : One with move and One without move.
|
||||||
count
|
-- BEGIN : Move a shard post split.
|
||||||
|
SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes');
|
||||||
|
citus_move_shard_placement
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
22
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- After Split, List shard and placement data.
|
-- END : Move a shard post split.
|
||||||
SELECT shard.shardid, logicalrelid, shardstorage, shardminvalue, shardmaxvalue nodename, nodeport, placementid
|
-- BEGIN : Display current state.
|
||||||
|
-- TODO(niupre): Can we refactor this to be a function?
|
||||||
|
SELECT cls.relnamespace, shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid
|
||||||
FROM pg_dist_shard AS shard
|
FROM pg_dist_shard AS shard
|
||||||
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
|
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
|
||||||
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
|
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
|
||||||
WHERE node.noderole = 'primary' AND (logicalrelid = 'lineitem_streaming'::regclass OR logicalrelid = 'orders_streaming'::regclass)
|
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
|
||||||
|
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
|
||||||
ORDER BY logicalrelid, shardminvalue::BIGINT;
|
ORDER BY logicalrelid, shardminvalue::BIGINT;
|
||||||
shardid | logicalrelid | shardstorage | shardminvalue | nodename | nodeport | placementid
|
relnamespace | shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport | placementid
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
82060002 | lineitem_streaming | t | -2147483648 | 268435455 | 57637 | 119
|
18296 | 8981007 | sensors | -2147483648 | -1073741824 | localhost | 57638 | 135
|
||||||
82060003 | lineitem_streaming | t | 268435456 | 536870911 | 57637 | 120
|
18296 | 8981008 | sensors | -1073741823 | -1 | localhost | 57638 | 121
|
||||||
82060004 | lineitem_streaming | t | 536870912 | 1073741823 | 57638 | 121
|
18296 | 8981013 | sensors | 0 | 536870911 | localhost | 57637 | 126
|
||||||
82060005 | lineitem_streaming | t | 1073741824 | 1610612735 | 57638 | 122
|
18296 | 8981014 | sensors | 536870912 | 1610612735 | localhost | 57637 | 127
|
||||||
82060006 | lineitem_streaming | t | 1610612736 | 2147483647 | 57638 | 123
|
18296 | 8981015 | sensors | 1610612736 | 2147483647 | localhost | 57638 | 128
|
||||||
82060007 | orders_streaming | t | -2147483648 | 268435455 | 57637 | 124
|
18296 | 8981009 | colocated_dist_table | -2147483648 | -1073741824 | localhost | 57638 | 136
|
||||||
82060008 | orders_streaming | t | 268435456 | 536870911 | 57637 | 125
|
18296 | 8981010 | colocated_dist_table | -1073741823 | -1 | localhost | 57638 | 123
|
||||||
82060009 | orders_streaming | t | 536870912 | 1073741823 | 57638 | 126
|
18296 | 8981016 | colocated_dist_table | 0 | 536870911 | localhost | 57637 | 129
|
||||||
82060010 | orders_streaming | t | 1073741824 | 1610612735 | 57638 | 127
|
18296 | 8981017 | colocated_dist_table | 536870912 | 1610612735 | localhost | 57637 | 130
|
||||||
82060011 | orders_streaming | t | 1610612736 | 2147483647 | 57638 | 128
|
18296 | 8981018 | colocated_dist_table | 1610612736 | 2147483647 | localhost | 57638 | 131
|
||||||
(10 rows)
|
18296 | 8981011 | table_with_index_rep_identity | -2147483648 | -1073741824 | localhost | 57638 | 137
|
||||||
|
18296 | 8981012 | table_with_index_rep_identity | -1073741823 | -1 | localhost | 57638 | 125
|
||||||
|
18296 | 8981019 | table_with_index_rep_identity | 0 | 536870911 | localhost | 57637 | 132
|
||||||
|
18296 | 8981020 | table_with_index_rep_identity | 536870912 | 1610612735 | localhost | 57637 | 133
|
||||||
|
18296 | 8981021 | table_with_index_rep_identity | 1610612736 | 2147483647 | localhost | 57638 | 134
|
||||||
|
(15 rows)
|
||||||
|
|
||||||
-- Cleanup for Test.
|
\c - - - :worker_1_port
|
||||||
\c - - - :master_port
|
SET search_path TO "citus_split_test_schema", public, pg_catalog;
|
||||||
SET client_min_messages TO WARNING;
|
SET citus.show_shards_for_app_name_prefixes = '*';
|
||||||
DROP SCHEMA citus_split_shard_by_split_points_blocking CASCADE;
|
SELECT tbl.relname, fk."Constraint", fk."Definition"
|
||||||
ERROR: schema "citus_split_shard_by_split_points_blocking" does not exist
|
FROM pg_catalog.pg_class tbl
|
||||||
|
JOIN public.table_fkeys fk on tbl.oid = fk.relid
|
||||||
|
WHERE tbl.relname like 'sensors_%'
|
||||||
|
ORDER BY 1, 2;
|
||||||
|
relname | Constraint | Definition
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
sensors_8981013 | fkey_table_to_dist_8981013 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981016(measureid)
|
||||||
|
sensors_8981014 | fkey_table_to_dist_8981014 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981017(measureid)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
|
||||||
|
tablename | indexdef
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
sensors_8981013 | CREATE INDEX hash_index_on_sensors_8981013 ON citus_split_test_schema.sensors_8981013 USING hash (((measure_data -> 'IsFailed'::text)))
|
||||||
|
sensors_8981013 | CREATE INDEX index_on_sensors_8981013 ON citus_split_test_schema.sensors_8981013 USING btree (lower((measureid)::text))
|
||||||
|
sensors_8981013 | CREATE INDEX index_with_include_on_sensors_8981013 ON citus_split_test_schema.sensors_8981013 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
|
||||||
|
sensors_8981013 | CREATE UNIQUE INDEX sensors_pkey_8981013 ON citus_split_test_schema.sensors_8981013 USING btree (measureid, eventdatetime, measure_data)
|
||||||
|
sensors_8981014 | CREATE INDEX hash_index_on_sensors_8981014 ON citus_split_test_schema.sensors_8981014 USING hash (((measure_data -> 'IsFailed'::text)))
|
||||||
|
sensors_8981014 | CREATE INDEX index_on_sensors_8981014 ON citus_split_test_schema.sensors_8981014 USING btree (lower((measureid)::text))
|
||||||
|
sensors_8981014 | CREATE INDEX index_with_include_on_sensors_8981014 ON citus_split_test_schema.sensors_8981014 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
|
||||||
|
sensors_8981014 | CREATE UNIQUE INDEX sensors_pkey_8981014 ON citus_split_test_schema.sensors_8981014 USING btree (measureid, eventdatetime, measure_data)
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
|
||||||
|
tablename | indexdef
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
table_with_index_rep_identity_8981019 | CREATE UNIQUE INDEX uqx_8981019 ON citus_split_test_schema.table_with_index_rep_identity_8981019 USING btree (key)
|
||||||
|
table_with_index_rep_identity_8981020 | CREATE UNIQUE INDEX uqx_8981020 ON citus_split_test_schema.table_with_index_rep_identity_8981020 USING btree (key)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT stxname FROM pg_statistic_ext
|
||||||
|
WHERE stxnamespace IN (
|
||||||
|
SELECT oid
|
||||||
|
FROM pg_namespace
|
||||||
|
WHERE nspname IN ('citus_split_test_schema')
|
||||||
|
)
|
||||||
|
ORDER BY stxname ASC;
|
||||||
|
stxname
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
stats_on_sensors
|
||||||
|
stats_on_sensors_8981013
|
||||||
|
stats_on_sensors_8981014
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO "citus_split_test_schema", public, pg_catalog;
|
||||||
|
SET citus.show_shards_for_app_name_prefixes = '*';
|
||||||
|
SELECT tbl.relname, fk."Constraint", fk."Definition"
|
||||||
|
FROM pg_catalog.pg_class tbl
|
||||||
|
JOIN public.table_fkeys fk on tbl.oid = fk.relid
|
||||||
|
WHERE tbl.relname like 'sensors_%'
|
||||||
|
ORDER BY 1, 2;
|
||||||
|
relname | Constraint | Definition
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
sensors_8981007 | fkey_table_to_dist_8981007 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981009(measureid)
|
||||||
|
sensors_8981008 | fkey_table_to_dist_8981008 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981010(measureid)
|
||||||
|
sensors_8981015 | fkey_table_to_dist_8981015 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981018(measureid)
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
|
||||||
|
tablename | indexdef
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
sensors_8981007 | CREATE INDEX hash_index_on_sensors_8981007 ON citus_split_test_schema.sensors_8981007 USING hash (((measure_data -> 'IsFailed'::text)))
|
||||||
|
sensors_8981007 | CREATE INDEX index_on_sensors_8981007 ON citus_split_test_schema.sensors_8981007 USING btree (lower((measureid)::text))
|
||||||
|
sensors_8981007 | CREATE INDEX index_with_include_on_sensors_8981007 ON citus_split_test_schema.sensors_8981007 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
|
||||||
|
sensors_8981007 | CREATE UNIQUE INDEX sensors_pkey_8981007 ON citus_split_test_schema.sensors_8981007 USING btree (measureid, eventdatetime, measure_data)
|
||||||
|
sensors_8981008 | CREATE INDEX hash_index_on_sensors_8981008 ON citus_split_test_schema.sensors_8981008 USING hash (((measure_data -> 'IsFailed'::text)))
|
||||||
|
sensors_8981008 | CREATE INDEX index_on_sensors_8981008 ON citus_split_test_schema.sensors_8981008 USING btree (lower((measureid)::text))
|
||||||
|
sensors_8981008 | CREATE INDEX index_with_include_on_sensors_8981008 ON citus_split_test_schema.sensors_8981008 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
|
||||||
|
sensors_8981008 | CREATE UNIQUE INDEX sensors_pkey_8981008 ON citus_split_test_schema.sensors_8981008 USING btree (measureid, eventdatetime, measure_data)
|
||||||
|
sensors_8981015 | CREATE INDEX hash_index_on_sensors_8981015 ON citus_split_test_schema.sensors_8981015 USING hash (((measure_data -> 'IsFailed'::text)))
|
||||||
|
sensors_8981015 | CREATE INDEX index_on_sensors_8981015 ON citus_split_test_schema.sensors_8981015 USING btree (lower((measureid)::text))
|
||||||
|
sensors_8981015 | CREATE INDEX index_with_include_on_sensors_8981015 ON citus_split_test_schema.sensors_8981015 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status)
|
||||||
|
sensors_8981015 | CREATE UNIQUE INDEX sensors_pkey_8981015 ON citus_split_test_schema.sensors_8981015 USING btree (measureid, eventdatetime, measure_data)
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
|
||||||
|
tablename | indexdef
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
table_with_index_rep_identity_8981011 | CREATE UNIQUE INDEX uqx_8981011 ON citus_split_test_schema.table_with_index_rep_identity_8981011 USING btree (key)
|
||||||
|
table_with_index_rep_identity_8981012 | CREATE UNIQUE INDEX uqx_8981012 ON citus_split_test_schema.table_with_index_rep_identity_8981012 USING btree (key)
|
||||||
|
table_with_index_rep_identity_8981021 | CREATE UNIQUE INDEX uqx_8981021 ON citus_split_test_schema.table_with_index_rep_identity_8981021 USING btree (key)
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
SELECT stxname FROM pg_statistic_ext
|
||||||
|
WHERE stxnamespace IN (
|
||||||
|
SELECT oid
|
||||||
|
FROM pg_namespace
|
||||||
|
WHERE nspname IN ('citus_split_test_schema')
|
||||||
|
)
|
||||||
|
ORDER BY stxname ASC;
|
||||||
|
stxname
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
stats_on_sensors
|
||||||
|
stats_on_sensors_8981007
|
||||||
|
stats_on_sensors_8981008
|
||||||
|
stats_on_sensors_8981015
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- END : Display current state
|
||||||
|
-- BEGIN: Should be able to change/drop constraints
|
||||||
|
\c - postgres - :master_port
|
||||||
|
SET search_path TO "citus_split_test_schema";
|
||||||
|
ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed;
|
||||||
|
ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200;
|
||||||
|
DROP STATISTICS stats_on_sensors;
|
||||||
|
DROP INDEX index_on_sensors_renamed;
|
||||||
|
ALTER TABLE sensors DROP CONSTRAINT fkey_table_to_dist;
|
||||||
|
-- END: Should be able to change/drop constraints
|
||||||
|
-- BEGIN: Split second time on another schema
|
||||||
|
SET search_path TO public;
|
||||||
|
SET citus.next_shard_id TO 8981031;
|
||||||
|
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||||
|
8981007,
|
||||||
|
ARRAY['-2100000000'],
|
||||||
|
ARRAY[:worker_1_node, :worker_2_node],
|
||||||
|
'blocking');
|
||||||
|
citus_split_shard_by_split_points
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET search_path TO "citus_split_test_schema";
|
||||||
|
SELECT cls.relnamespace, shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid
|
||||||
|
FROM pg_dist_shard AS shard
|
||||||
|
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
|
||||||
|
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
|
||||||
|
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
|
||||||
|
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
|
||||||
|
ORDER BY logicalrelid, shardminvalue::BIGINT;
|
||||||
|
relnamespace | shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
18296 | 8981031 | sensors | -2147483648 | -2100000000 | localhost | 57637 | 138
|
||||||
|
18296 | 8981032 | sensors | -2099999999 | -1073741824 | localhost | 57638 | 139
|
||||||
|
18296 | 8981008 | sensors | -1073741823 | -1 | localhost | 57638 | 121
|
||||||
|
18296 | 8981013 | sensors | 0 | 536870911 | localhost | 57637 | 126
|
||||||
|
18296 | 8981014 | sensors | 536870912 | 1610612735 | localhost | 57637 | 127
|
||||||
|
18296 | 8981015 | sensors | 1610612736 | 2147483647 | localhost | 57638 | 128
|
||||||
|
18296 | 8981033 | colocated_dist_table | -2147483648 | -2100000000 | localhost | 57637 | 140
|
||||||
|
18296 | 8981034 | colocated_dist_table | -2099999999 | -1073741824 | localhost | 57638 | 141
|
||||||
|
18296 | 8981010 | colocated_dist_table | -1073741823 | -1 | localhost | 57638 | 123
|
||||||
|
18296 | 8981016 | colocated_dist_table | 0 | 536870911 | localhost | 57637 | 129
|
||||||
|
18296 | 8981017 | colocated_dist_table | 536870912 | 1610612735 | localhost | 57637 | 130
|
||||||
|
18296 | 8981018 | colocated_dist_table | 1610612736 | 2147483647 | localhost | 57638 | 131
|
||||||
|
18296 | 8981035 | table_with_index_rep_identity | -2147483648 | -2100000000 | localhost | 57637 | 142
|
||||||
|
18296 | 8981036 | table_with_index_rep_identity | -2099999999 | -1073741824 | localhost | 57638 | 143
|
||||||
|
18296 | 8981012 | table_with_index_rep_identity | -1073741823 | -1 | localhost | 57638 | 125
|
||||||
|
18296 | 8981019 | table_with_index_rep_identity | 0 | 536870911 | localhost | 57637 | 132
|
||||||
|
18296 | 8981020 | table_with_index_rep_identity | 536870912 | 1610612735 | localhost | 57637 | 133
|
||||||
|
18296 | 8981021 | table_with_index_rep_identity | 1610612736 | 2147483647 | localhost | 57638 | 134
|
||||||
|
(18 rows)
|
||||||
|
|
||||||
|
-- END: Split second time on another schema
|
||||||
|
--BEGIN : Cleanup
|
||||||
|
\c - postgres - :master_port
|
||||||
|
DROP SCHEMA "citus_split_test_schema" CASCADE;
|
||||||
|
NOTICE: drop cascades to 4 other objects
|
||||||
|
DETAIL: drop cascades to table citus_split_test_schema.sensors
|
||||||
|
drop cascades to table citus_split_test_schema.reference_table
|
||||||
|
drop cascades to table citus_split_test_schema.colocated_dist_table
|
||||||
|
drop cascades to table citus_split_test_schema.table_with_index_rep_identity
|
||||||
|
--END : Cleanup
|
||||||
|
|
|
@ -4,6 +4,8 @@ test: multi_test_helpers multi_test_helpers_superuser columnar_test_helpers
|
||||||
test: multi_cluster_management
|
test: multi_cluster_management
|
||||||
test: multi_test_catalog_views
|
test: multi_test_catalog_views
|
||||||
test: tablespace
|
test: tablespace
|
||||||
|
# Helpers for foreign key catalogs.
|
||||||
|
test: foreign_key_to_reference_table
|
||||||
# Split tests go here.
|
# Split tests go here.
|
||||||
test: worker_split_binary_copy_test
|
test: worker_split_binary_copy_test
|
||||||
test: worker_split_text_copy_test
|
test: worker_split_text_copy_test
|
||||||
|
|
|
@ -1,123 +1,225 @@
|
||||||
-- Split Shards by Split Points tests.
|
/*
|
||||||
|
Citus Shard Split Test.The test is model similar to 'shard_move_constraints'.
|
||||||
|
Here is a high level overview of test plan:
|
||||||
|
1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table.
|
||||||
|
2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors.
|
||||||
|
3. Create Foreign key constraints between the two co-located distributed tables.
|
||||||
|
4. Load data into the three tables.
|
||||||
|
5. Move one of the shards for 'sensors' to test ShardMove -> Split.
|
||||||
|
6. Trigger Split on both shards of 'sensors'. This will also split co-located tables.
|
||||||
|
7. Move one of the split shard to test Split -> ShardMove.
|
||||||
|
8. Split an already split shard second time on a different schema.
|
||||||
|
*/
|
||||||
|
|
||||||
-- Setup for Test.
|
CREATE SCHEMA "citus_split_test_schema";
|
||||||
CREATE SCHEMA citus_split_shard_by_split_points;
|
SET search_path TO "citus_split_test_schema";
|
||||||
SET search_path TO citus_split_shard_by_split_points;
|
SET citus.next_shard_id TO 8981000;
|
||||||
SET citus.shard_count TO 1;
|
SET citus.next_placement_id TO 8610000;
|
||||||
|
SET citus.shard_count TO 2;
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SET citus.next_shard_id TO 82060000;
|
|
||||||
|
|
||||||
-- Create distributed table 'lineitem_streaming'
|
-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc.
|
||||||
CREATE TABLE lineitem_streaming (
|
CREATE TABLE sensors(
|
||||||
l_orderkey bigint not null,
|
measureid integer,
|
||||||
l_partkey integer not null,
|
eventdatetime date,
|
||||||
l_suppkey integer not null,
|
measure_data jsonb,
|
||||||
l_linenumber integer not null,
|
meaure_quantity decimal(15, 2),
|
||||||
l_quantity decimal(15, 2) not null,
|
measure_status char(1),
|
||||||
l_extendedprice decimal(15, 2) not null,
|
measure_comment varchar(44),
|
||||||
l_discount decimal(15, 2) not null,
|
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||||
l_tax decimal(15, 2) not null,
|
|
||||||
l_returnflag char(1) not null,
|
|
||||||
l_linestatus char(1) not null,
|
|
||||||
l_shipdate date not null,
|
|
||||||
l_commitdate date not null,
|
|
||||||
l_receiptdate date not null,
|
|
||||||
l_shipinstruct char(25) not null,
|
|
||||||
l_shipmode char(10) not null,
|
|
||||||
l_comment varchar(44) not null);
|
|
||||||
SELECT create_distributed_table('lineitem_streaming', 'l_orderkey');
|
|
||||||
|
|
||||||
|
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||||
|
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||||
|
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||||
|
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||||
|
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;
|
||||||
|
|
||||||
-- Create distributed table 'orders_streaming'
|
SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
|
||||||
CREATE TABLE orders_streaming (
|
-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc.
|
||||||
o_orderkey bigint not null primary key,
|
|
||||||
o_custkey integer not null,
|
|
||||||
o_orderstatus char(1) not null,
|
|
||||||
o_totalprice decimal(15,2) not null,
|
|
||||||
o_orderdate date not null,
|
|
||||||
o_orderpriority char(15) not null,
|
|
||||||
o_clerk char(15) not null,
|
|
||||||
o_shippriority integer not null,
|
|
||||||
o_comment varchar(79) not null);
|
|
||||||
SELECT create_distributed_table('orders_streaming', 'o_orderkey');
|
|
||||||
|
|
||||||
-- Insert data into 'lineitem_streaming'
|
-- BEGIN: Create co-located distributed and reference tables.
|
||||||
\COPY lineitem_streaming FROM STDIN WITH DELIMITER '|'
|
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
|
||||||
99|87114|4639|1|10|11011.10|0.02|0.01|A|F|1994-05-18|1994-06-03|1994-05-23|COLLECT COD|RAIL|kages. requ
|
SELECT create_reference_table('reference_table');
|
||||||
99|123766|3767|2|5|8948.80|0.02|0.07|R|F|1994-05-06|1994-05-28|1994-05-20|TAKE BACK RETURN|RAIL|ests cajole fluffily waters. blithe
|
|
||||||
99|134082|1622|3|42|46875.36|0.02|0.02|A|F|1994-04-19|1994-05-18|1994-04-20|NONE|RAIL|kages are fluffily furiously ir
|
|
||||||
99|108338|849|4|36|48467.88|0.09|0.02|A|F|1994-07-04|1994-04-17|1994-07-30|DELIVER IN PERSON|AIR|slyly. slyly e
|
|
||||||
100|62029|2030|1|28|27748.56|0.04|0.05|N|O|1998-05-08|1998-05-13|1998-06-07|COLLECT COD|TRUCK|sts haggle. slowl
|
|
||||||
100|115979|8491|2|22|43889.34|0.00|0.07|N|O|1998-06-24|1998-04-12|1998-06-29|DELIVER IN PERSON|SHIP|nto beans alongside of the fi
|
|
||||||
100|46150|8655|3|46|50422.90|0.03|0.04|N|O|1998-05-02|1998-04-10|1998-05-22|TAKE BACK RETURN|SHIP|ular accounts. even
|
|
||||||
100|38024|3031|4|14|13468.28|0.06|0.03|N|O|1998-05-22|1998-05-01|1998-06-03|COLLECT COD|MAIL|y. furiously ironic ideas gr
|
|
||||||
100|53439|955|5|37|51519.91|0.05|0.00|N|O|1998-03-06|1998-04-16|1998-03-31|TAKE BACK RETURN|TRUCK|nd the quickly s
|
|
||||||
101|118282|5816|1|49|63713.72|0.10|0.00|N|O|1996-06-21|1996-05-27|1996-06-29|DELIVER IN PERSON|REG AIR|ts
|
|
||||||
101|163334|883|2|36|50303.88|0.00|0.01|N|O|1996-05-19|1996-05-01|1996-06-04|DELIVER IN PERSON|AIR|tes. blithely pending dolphins x-ray f
|
|
||||||
101|138418|5958|3|12|17476.92|0.06|0.02|N|O|1996-03-29|1996-04-20|1996-04-12|COLLECT COD|MAIL|. quickly regular
|
|
||||||
102|88914|3931|1|37|70407.67|0.06|0.00|N|O|1997-07-24|1997-08-02|1997-08-07|TAKE BACK RETURN|SHIP|ully across the ideas. final deposit
|
|
||||||
102|169238|6787|2|34|44445.82|0.03|0.08|N|O|1997-08-09|1997-07-28|1997-08-26|TAKE BACK RETURN|SHIP|eposits cajole across
|
|
||||||
102|182321|4840|3|25|35083.00|0.01|0.01|N|O|1997-07-31|1997-07-24|1997-08-17|NONE|RAIL|bits. ironic accoun
|
|
||||||
102|61158|8677|4|15|16787.25|0.07|0.07|N|O|1997-06-02|1997-07-13|1997-06-04|DELIVER IN PERSON|SHIP|final packages. carefully even excu
|
|
||||||
103|194658|2216|1|6|10515.90|0.03|0.05|N|O|1996-10-11|1996-07-25|1996-10-28|NONE|FOB|cajole. carefully ex
|
|
||||||
103|10426|2928|2|37|49447.54|0.02|0.07|N|O|1996-09-17|1996-07-27|1996-09-20|TAKE BACK RETURN|MAIL|ies. quickly ironic requests use blithely
|
|
||||||
103|28431|8432|3|23|31266.89|0.01|0.04|N|O|1996-09-11|1996-09-18|1996-09-26|NONE|FOB|ironic accou
|
|
||||||
103|29022|4027|4|32|30432.64|0.01|0.07|N|O|1996-07-30|1996-08-06|1996-08-04|NONE|RAIL|kages doze. special, regular deposit
|
|
||||||
-1995148554|112942|2943|1|9|17594.46|0.04|0.04|N|O|1996-08-03|1996-05-31|1996-08-04|DELIVER IN PERSON|TRUCK|c realms print carefully car
|
|
||||||
-1686493264|15110|113|5|2|2050.22|0.03|0.08|R|F|1994-04-26|1994-03-15|1994-05-15|TAKE BACK RETURN|MAIL|e final, regular requests. carefully
|
|
||||||
\.
|
|
||||||
|
|
||||||
-- Insert data into 'orders_streaming'
|
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
|
||||||
\COPY orders_streaming FROM STDIN WITH DELIMITER '|'
|
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
|
||||||
99|890|F|108594.87|1994-03-13|4-NOT SPECIFIED|Clerk#000000973|0|e carefully ironic packages. pending
|
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
|
||||||
100|1471|O|198978.27|1998-02-28|4-NOT SPECIFIED|Clerk#000000577|0|heodolites detect slyly alongside of the ent
|
|
||||||
101|280|O|118448.39|1996-03-17|3-MEDIUM|Clerk#000000419|0|ding accounts above the slyly final asymptote
|
|
||||||
102|8|O|184806.58|1997-05-09|2-HIGH|Clerk#000000596|0| slyly according to the asymptotes. carefully final packages integrate furious
|
|
||||||
103|292|O|118745.16|1996-06-20|4-NOT SPECIFIED|Clerk#000000090|0|ges. carefully unusual instructions haggle quickly regular f
|
|
||||||
-1995148554|142|O|3553.15|1995-05-08|3-MEDIUM|Clerk#000000378|0|nts hinder fluffily ironic instructions. express, express excuses
|
|
||||||
-1686493264|878|O|177809.13|1997-09-05|3-MEDIUM|Clerk#000000379|0|y final packages. final foxes since the quickly even
|
|
||||||
\.
|
|
||||||
|
|
||||||
-- Initial Row Count in Shard
|
CREATE TABLE table_with_index_rep_identity(key int NOT NULL);
|
||||||
SELECT COUNT(*) FROM orders_streaming;
|
CREATE UNIQUE INDEX uqx ON table_with_index_rep_identity(key);
|
||||||
SELECT COUNT(*) FROM lineitem_streaming;
|
ALTER TABLE table_with_index_rep_identity REPLICA IDENTITY USING INDEX uqx;
|
||||||
|
CLUSTER table_with_index_rep_identity USING uqx;
|
||||||
|
SELECT create_distributed_table('table_with_index_rep_identity', 'key', colocate_with:='sensors');
|
||||||
|
-- END: Create co-located distributed and reference tables.
|
||||||
|
|
||||||
-- Create Foreign constraint between two shards.
|
-- BEGIN : Create Foreign key constraints.
|
||||||
ALTER TABLE lineitem_streaming ADD CONSTRAINT test_constraint
|
ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
|
||||||
FOREIGN KEY(l_orderkey) REFERENCES orders_streaming(o_orderkey);
|
-- END : Create Foreign key constraints.
|
||||||
|
|
||||||
-- Before Split, List shard and placement data.
|
-- BEGIN : Load data into tables.
|
||||||
SELECT shard.shardid, logicalrelid, shardstorage, shardminvalue, shardmaxvalue nodename, nodeport, placementid
|
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
|
||||||
|
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||||
|
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
|
||||||
|
-- END: Load data into tables.
|
||||||
|
|
||||||
|
-- BEGIN : Display current state.
|
||||||
|
-- TODO(niupre): Can we refactor this to be a function?
|
||||||
|
SELECT cls.relnamespace, shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid
|
||||||
FROM pg_dist_shard AS shard
|
FROM pg_dist_shard AS shard
|
||||||
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
|
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
|
||||||
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
|
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
|
||||||
WHERE node.noderole = 'primary' AND (logicalrelid = 'lineitem_streaming'::regclass OR logicalrelid = 'orders_streaming'::regclass)
|
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
|
||||||
|
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
|
||||||
ORDER BY logicalrelid, shardminvalue::BIGINT;
|
ORDER BY logicalrelid, shardminvalue::BIGINT;
|
||||||
|
|
||||||
-- Trigger five way way Split on Shard.
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO "citus_split_test_schema", public, pg_catalog;
|
||||||
|
SET citus.show_shards_for_app_name_prefixes = '*';
|
||||||
|
SELECT tbl.relname, fk."Constraint", fk."Definition"
|
||||||
|
FROM pg_catalog.pg_class tbl
|
||||||
|
JOIN public.table_fkeys fk on tbl.oid = fk.relid
|
||||||
|
WHERE tbl.relname like 'sensors_%'
|
||||||
|
ORDER BY 1, 2;
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
|
||||||
|
SELECT stxname FROM pg_statistic_ext
|
||||||
|
WHERE stxnamespace IN (
|
||||||
|
SELECT oid
|
||||||
|
FROM pg_namespace
|
||||||
|
WHERE nspname IN ('citus_split_test_schema')
|
||||||
|
)
|
||||||
|
ORDER BY stxname ASC;
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO "citus_split_test_schema", public, pg_catalog;
|
||||||
|
SET citus.show_shards_for_app_name_prefixes = '*';
|
||||||
|
SELECT tbl.relname, fk."Constraint", fk."Definition"
|
||||||
|
FROM pg_catalog.pg_class tbl
|
||||||
|
JOIN public.table_fkeys fk on tbl.oid = fk.relid
|
||||||
|
WHERE tbl.relname like 'sensors_%'
|
||||||
|
ORDER BY 1, 2;
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
|
||||||
|
SELECT stxname FROM pg_statistic_ext
|
||||||
|
WHERE stxnamespace IN (
|
||||||
|
SELECT oid
|
||||||
|
FROM pg_namespace
|
||||||
|
WHERE nspname IN ('citus_split_test_schema')
|
||||||
|
)
|
||||||
|
ORDER BY stxname ASC;
|
||||||
|
-- END : Display current state
|
||||||
|
|
||||||
|
-- BEGIN : Move one shard before we split it.
|
||||||
|
\c - postgres - :master_port
|
||||||
|
SET search_path TO "citus_split_test_schema";
|
||||||
|
SET citus.next_shard_id TO 8981007;
|
||||||
|
SET citus.defer_drop_after_shard_move TO OFF;
|
||||||
|
|
||||||
|
SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical');
|
||||||
|
-- END : Move one shard before we split it.
|
||||||
|
|
||||||
|
-- BEGIN : Set node id variables
|
||||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||||
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
||||||
|
-- END : Set node id variables
|
||||||
|
|
||||||
SELECT * FROM citus_split_shard_by_split_points(
|
-- BEGIN : Split two shards : One with move and One without move.
|
||||||
82060000,
|
-- Perform 2 way split
|
||||||
ARRAY['268435455', '536870911', '1073741823', '1610612735'],
|
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||||
ARRAY[:worker_1_node, :worker_1_node, :worker_2_node, :worker_2_node, :worker_2_node],
|
8981000,
|
||||||
|
ARRAY['-1073741824'],
|
||||||
|
ARRAY[:worker_1_node, :worker_2_node],
|
||||||
'blocking');
|
'blocking');
|
||||||
|
|
||||||
-- Row Count in Shard after Split (should be same as before)
|
-- Perform 3 way split
|
||||||
SELECT COUNT(*) FROM orders_streaming;
|
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||||
SELECT COUNT(*) FROM lineitem_streaming;
|
8981001,
|
||||||
|
ARRAY['536870911', '1610612735'],
|
||||||
|
ARRAY[:worker_1_node, :worker_1_node, :worker_2_node],
|
||||||
|
'blocking');
|
||||||
|
-- END : Split two shards : One with move and One without move.
|
||||||
|
|
||||||
-- After Split, List shard and placement data.
|
-- BEGIN : Move a shard post split.
|
||||||
SELECT shard.shardid, logicalrelid, shardstorage, shardminvalue, shardmaxvalue nodename, nodeport, placementid
|
SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes');
|
||||||
|
-- END : Move a shard post split.
|
||||||
|
|
||||||
|
-- BEGIN : Display current state.
|
||||||
|
-- TODO(niupre): Can we refactor this to be a function?
|
||||||
|
SELECT cls.relnamespace, shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid
|
||||||
FROM pg_dist_shard AS shard
|
FROM pg_dist_shard AS shard
|
||||||
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
|
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
|
||||||
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
|
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
|
||||||
WHERE node.noderole = 'primary' AND (logicalrelid = 'lineitem_streaming'::regclass OR logicalrelid = 'orders_streaming'::regclass)
|
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
|
||||||
|
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
|
||||||
ORDER BY logicalrelid, shardminvalue::BIGINT;
|
ORDER BY logicalrelid, shardminvalue::BIGINT;
|
||||||
|
|
||||||
-- Cleanup for Test.
|
\c - - - :worker_1_port
|
||||||
\c - - - :master_port
|
SET search_path TO "citus_split_test_schema", public, pg_catalog;
|
||||||
SET client_min_messages TO WARNING;
|
SET citus.show_shards_for_app_name_prefixes = '*';
|
||||||
DROP SCHEMA citus_split_shard_by_split_points_blocking CASCADE;
|
SELECT tbl.relname, fk."Constraint", fk."Definition"
|
||||||
|
FROM pg_catalog.pg_class tbl
|
||||||
|
JOIN public.table_fkeys fk on tbl.oid = fk.relid
|
||||||
|
WHERE tbl.relname like 'sensors_%'
|
||||||
|
ORDER BY 1, 2;
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
|
||||||
|
SELECT stxname FROM pg_statistic_ext
|
||||||
|
WHERE stxnamespace IN (
|
||||||
|
SELECT oid
|
||||||
|
FROM pg_namespace
|
||||||
|
WHERE nspname IN ('citus_split_test_schema')
|
||||||
|
)
|
||||||
|
ORDER BY stxname ASC;
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO "citus_split_test_schema", public, pg_catalog;
|
||||||
|
SET citus.show_shards_for_app_name_prefixes = '*';
|
||||||
|
SELECT tbl.relname, fk."Constraint", fk."Definition"
|
||||||
|
FROM pg_catalog.pg_class tbl
|
||||||
|
JOIN public.table_fkeys fk on tbl.oid = fk.relid
|
||||||
|
WHERE tbl.relname like 'sensors_%'
|
||||||
|
ORDER BY 1, 2;
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2;
|
||||||
|
SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2;
|
||||||
|
SELECT stxname FROM pg_statistic_ext
|
||||||
|
WHERE stxnamespace IN (
|
||||||
|
SELECT oid
|
||||||
|
FROM pg_namespace
|
||||||
|
WHERE nspname IN ('citus_split_test_schema')
|
||||||
|
)
|
||||||
|
ORDER BY stxname ASC;
|
||||||
|
-- END : Display current state
|
||||||
|
|
||||||
|
-- BEGIN: Should be able to change/drop constraints
|
||||||
|
\c - postgres - :master_port
|
||||||
|
SET search_path TO "citus_split_test_schema";
|
||||||
|
ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed;
|
||||||
|
ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200;
|
||||||
|
DROP STATISTICS stats_on_sensors;
|
||||||
|
DROP INDEX index_on_sensors_renamed;
|
||||||
|
ALTER TABLE sensors DROP CONSTRAINT fkey_table_to_dist;
|
||||||
|
-- END: Should be able to change/drop constraints
|
||||||
|
|
||||||
|
-- BEGIN: Split second time on another schema
|
||||||
|
SET search_path TO public;
|
||||||
|
SET citus.next_shard_id TO 8981031;
|
||||||
|
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||||
|
8981007,
|
||||||
|
ARRAY['-2100000000'],
|
||||||
|
ARRAY[:worker_1_node, :worker_2_node],
|
||||||
|
'blocking');
|
||||||
|
|
||||||
|
SET search_path TO "citus_split_test_schema";
|
||||||
|
SELECT cls.relnamespace, shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid
|
||||||
|
FROM pg_dist_shard AS shard
|
||||||
|
INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid
|
||||||
|
INNER JOIN pg_dist_node node ON placement.groupid = node.groupid
|
||||||
|
INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid
|
||||||
|
WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass)
|
||||||
|
ORDER BY logicalrelid, shardminvalue::BIGINT;
|
||||||
|
-- END: Split second time on another schema
|
||||||
|
|
||||||
|
--BEGIN : Cleanup
|
||||||
|
\c - postgres - :master_port
|
||||||
|
DROP SCHEMA "citus_split_test_schema" CASCADE;
|
||||||
|
--END : Cleanup
|
||||||
|
|
Loading…
Reference in New Issue