mirror of https://github.com/citusdata/citus.git
Dropped columns no longer cause the distribution column to diverge for partitioned tables

Before this commit, creating a partition after a DROP COLUMN on the parent
(when the dropped column's position was before the distribution key) led to
the partition having the wrong distribution column. (branch: pull/5131/head)

parent 0722ec95bc
commit 35964c6366
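
To make the failure mode concrete, here is a minimal sketch of the scenario, condensed from the regression test added below (table and column names follow the test; the catalog check at the end is what the test asserts):

CREATE TABLE sensors(
    col_to_drop_0 text,
    measureid integer,
    eventdatetime date,
    measure_data jsonb,
    PRIMARY KEY (measureid, eventdatetime, measure_data))
PARTITION BY RANGE(eventdatetime);
SELECT create_distributed_table('sensors', 'measureid');

-- dropping a column that precedes the distribution key shifts the
-- parent's attribute numbers relative to any partition created later
ALTER TABLE sensors DROP COLUMN col_to_drop_0;
CREATE TABLE sensors_2001 PARTITION OF sensors
    FOR VALUES FROM ('2001-01-01') TO ('2002-01-01');

-- before this fix, the new partition could be registered in
-- pg_dist_partition with the wrong distribution column;
-- after it, both rows report 'measureid'
SELECT logicalrelid::regclass,
       column_to_column_name(logicalrelid, partkey)
FROM pg_dist_partition
WHERE logicalrelid IN ('sensors'::regclass, 'sensors_2001'::regclass);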
@@ -1178,6 +1178,30 @@ CreateDistributedTableLike(TableConversionState *con)
	{
		newShardCount = con->shardCount;
	}

	Oid originalRelationId = con->relationId;
	if (con->originalDistributionKey != NULL && PartitionTable(originalRelationId))
	{
		/*
		 * Due to dropped columns, the partition tables might have different
		 * distribution keys than their parents, see issue #5123 for details.
		 *
		 * At this point, we get the partitioning information from the
		 * originalRelationId, but we get the distribution key for newRelationId.
		 *
		 * We have to do this, because the newRelationId is just a placeholder
		 * at this moment, but that's going to be the table in pg_dist_partition.
		 */
		Oid parentRelationId = PartitionParentOid(originalRelationId);
		Var *parentDistKey = DistPartitionKey(parentRelationId);
		char *parentDistKeyColumnName =
			ColumnToColumnName(parentRelationId, nodeToString(parentDistKey));

		newDistributionKey =
			FindColumnWithNameOnTargetRelation(parentRelationId, parentDistKeyColumnName,
											   con->newRelationId);
	}

	char partitionMethod = PartitionMethod(con->relationId);
	CreateDistributedTable(con->newRelationId, newDistributionKey, partitionMethod,
						   newShardCount, true, newColocateWith, false);
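
The CreateDistributedTableLike path above runs during table conversions (for example alter_table_set_access_method, which the regression test below exercises), where Citus recreates the table and re-distributes the replacement "like" the original. A sketch of how the test hits it:

-- converting a partition to columnar recreates the table under the
-- hood; the replacement must resolve the distribution key by name
-- on the new relation rather than reusing the parent's Var
SELECT alter_table_set_access_method('sensors_2004', 'columnar');

-- the recreated partition keeps 'measureid' as its distribution
-- column even though the parent carries dropped-column slots
SELECT column_to_column_name(logicalrelid, partkey)
FROM pg_dist_partition
WHERE logicalrelid = 'sensors_2004'::regclass;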
@@ -440,6 +440,24 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
	                                 colocateWithTableName,
	                                 viaDeprecatedAPI);


	/*
	 * Due to dropping columns, the parent's distribution key may not match the
	 * partition's distribution key. The input distributionColumn belongs to
	 * the parent. That's why we override the distribution column of partitions
	 * here. See issue #5123 for details.
	 */
	if (PartitionTable(relationId))
	{
		Oid parentRelationId = PartitionParentOid(relationId);
		char *distributionColumnName =
			ColumnToColumnName(parentRelationId, nodeToString(distributionColumn));

		distributionColumn =
			FindColumnWithNameOnTargetRelation(parentRelationId, distributionColumnName,
											   relationId);
	}

	/*
	 * ColocationIdForNewTable assumes caller acquires lock on relationId. In our case,
	 * our caller already acquired lock on relationId.
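
To see why the parent's Var cannot be reused directly: PostgreSQL keeps dropped columns as attisdropped placeholders on the parent, while a partition created afterwards numbers its columns from scratch, so the same column name maps to different attribute numbers. A hypothetical catalog query against the reproduction sketched earlier (the exact attnum values depend on the drop history):

-- after the drops above, measureid keeps a higher attnum on the
-- parent because dropped columns still occupy slots, while the
-- freshly created partition numbers it lower; a Var built for the
-- parent would therefore point at the wrong column on the partition
SELECT attrelid::regclass, attname, attnum
FROM pg_attribute
WHERE attrelid IN ('sensors'::regclass, 'sensors_2001'::regclass)
  AND attname = 'measureid';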
@@ -26,6 +26,7 @@
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_sync.h"
@@ -18,6 +18,7 @@
#include "access/htup_details.h"
#include "distributed/distribution_column.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/version_compat.h"
#include "nodes/makefuncs.h"
#include "nodes/nodes.h"
@@ -115,6 +116,53 @@ column_to_column_name(PG_FUNCTION_ARGS)
}


/*
 * FindColumnWithNameOnTargetRelation gets a source table and
 * column name. The function returns the column with the
 * same name on the target table.
 *
 * Note that due to dropping columns, the parent's distribution key may not
 * match the partition's distribution key. See issue #5123.
 *
 * The function throws an error if the input or output is not valid or does
 * not exist.
 */
Var *
FindColumnWithNameOnTargetRelation(Oid sourceRelationId, char *sourceColumnName,
								   Oid targetRelationId)
{
	if (sourceColumnName == NULL || sourceColumnName[0] == '\0')
	{
		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
						errmsg("cannot find the given column on table \"%s\"",
							   generate_qualified_relation_name(sourceRelationId))));
	}

	AttrNumber attributeNumberOnTarget = get_attnum(targetRelationId, sourceColumnName);
	if (attributeNumberOnTarget == InvalidAttrNumber)
	{
		ereport(ERROR, (errmsg("Column \"%s\" does not exist on "
							   "relation \"%s\"", sourceColumnName,
							   get_rel_name(targetRelationId))));
	}

	Index varNo = 1;
	Oid targetTypeId = InvalidOid;
	int32 targetTypMod = 0;
	Oid targetCollation = InvalidOid;
	Index varlevelsup = 0;

	/* this function throws an error in case anything goes wrong */
	get_atttypetypmodcoll(targetRelationId, attributeNumberOnTarget,
						  &targetTypeId, &targetTypMod, &targetCollation);
	Var *targetColumn =
		makeVar(varNo, attributeNumberOnTarget, targetTypeId, targetTypMod,
				targetCollation, varlevelsup);

	return targetColumn;
}


/*
 * BuildDistributionKeyFromColumnName builds a simple distribution key consisting
 * only out of a reference to the column of name columnName. Errors out if the
@@ -19,6 +19,9 @@


/* Remaining metadata utility functions */
extern Var * FindColumnWithNameOnTargetRelation(Oid sourceRelationId,
												char *sourceColumnName,
												Oid targetRelationId);
extern Var * BuildDistributionKeyFromColumnName(Relation distributedRelation,
												char *columnName);
extern char * ColumnToColumnName(Oid relationId, char *columnNodeString);
@@ -0,0 +1,428 @@
CREATE SCHEMA drop_column_partitioned_table;
SET search_path TO drop_column_partitioned_table;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 2580000;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
 start_metadata_sync_to_node
---------------------------------------------------------------------

(1 row)

-- create a partitioned table with some columns that
-- are going to be dropped within the tests
CREATE TABLE sensors(
    col_to_drop_0 text,
    col_to_drop_1 text,
    col_to_drop_2 date,
    col_to_drop_3 inet,
    col_to_drop_4 date,
    measureid integer,
    eventdatetime date,
    measure_data jsonb,
    PRIMARY KEY (measureid, eventdatetime, measure_data))
PARTITION BY RANGE(eventdatetime);
-- drop column even before attaching any partitions
ALTER TABLE sensors DROP COLUMN col_to_drop_1;
-- now attach the first partition and create the distributed table
CREATE TABLE sensors_2000 PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2001-01-01');
SELECT create_distributed_table('sensors', 'measureid');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- prepared statements should work fine even after columns are dropped
PREPARE drop_col_prepare_insert(int, date, jsonb) AS INSERT INTO sensors (measureid, eventdatetime, measure_data) VALUES ($1, $2, $3);
PREPARE drop_col_prepare_select(int, date) AS SELECT count(*) FROM sensors WHERE measureid = $1 AND eventdatetime = $2;
-- execute 7 times to make sure it is cached
EXECUTE drop_col_prepare_insert(1, '2000-10-01', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-02', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-03', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-04', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-05', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-06', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-07', row_to_json(row(1)));
EXECUTE drop_col_prepare_select(1, '2000-10-01');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(1, '2000-10-02');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(1, '2000-10-03');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(1, '2000-10-04');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(1, '2000-10-05');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(1, '2000-10-06');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(1, '2000-10-07');
 count
---------------------------------------------------------------------
     1
(1 row)

-- drop another column before attaching another partition
-- with .. PARTITION OF .. syntax
ALTER TABLE sensors DROP COLUMN col_to_drop_0;
CREATE TABLE sensors_2001 PARTITION OF sensors FOR VALUES FROM ('2001-01-01') TO ('2002-01-01');
-- drop another column before attaching another partition
-- with ALTER TABLE .. ATTACH PARTITION
ALTER TABLE sensors DROP COLUMN col_to_drop_2;
CREATE TABLE sensors_2002(
    col_to_drop_4 date, col_to_drop_3 inet, measureid integer, eventdatetime date, measure_data jsonb,
    PRIMARY KEY (measureid, eventdatetime, measure_data));
ALTER TABLE sensors ATTACH PARTITION sensors_2002 FOR VALUES FROM ('2002-01-01') TO ('2003-01-01');
-- drop another column before attaching another partition
-- that is already distributed
ALTER TABLE sensors DROP COLUMN col_to_drop_3;
CREATE TABLE sensors_2003(
    col_to_drop_4 date, measureid integer, eventdatetime date, measure_data jsonb,
    PRIMARY KEY (measureid, eventdatetime, measure_data));
SELECT create_distributed_table('sensors_2003', 'measureid');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

ALTER TABLE sensors ATTACH PARTITION sensors_2003 FOR VALUES FROM ('2003-01-01') TO ('2004-01-01');
CREATE TABLE sensors_2004(
    col_to_drop_4 date, measureid integer NOT NULL, eventdatetime date NOT NULL, measure_data jsonb NOT NULL);
ALTER TABLE sensors ATTACH PARTITION sensors_2004 FOR VALUES FROM ('2004-01-01') TO ('2005-01-01');
ALTER TABLE sensors DROP COLUMN col_to_drop_4;
SELECT alter_table_set_access_method('sensors_2004', 'columnar');
NOTICE: creating a new table for drop_column_partitioned_table.sensors_2004
NOTICE: moving the data of drop_column_partitioned_table.sensors_2004
NOTICE: dropping the old drop_column_partitioned_table.sensors_2004
NOTICE: renaming the new table to drop_column_partitioned_table.sensors_2004
 alter_table_set_access_method
---------------------------------------------------------------------

(1 row)

-- show that all partitions have the same distribution key
SELECT
    p.logicalrelid::regclass, column_to_column_name(p.logicalrelid, p.partkey)
FROM
    pg_dist_partition p
WHERE
    logicalrelid IN ('sensors'::regclass, 'sensors_2000'::regclass,
                     'sensors_2001'::regclass, 'sensors_2002'::regclass,
                     'sensors_2003'::regclass, 'sensors_2004'::regclass);
 logicalrelid | column_to_column_name
---------------------------------------------------------------------
 sensors      | measureid
 sensors_2000 | measureid
 sensors_2001 | measureid
 sensors_2002 | measureid
 sensors_2003 | measureid
 sensors_2004 | measureid
(6 rows)

-- show that all the tables prune to the same shard for the same distribution key
WITH
   sensors_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors', 3)),
   sensors_2000_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors_2000', 3)),
   sensors_2001_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors_2001', 3)),
   sensors_2002_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors_2002', 3)),
   sensors_2003_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors_2003', 3)),
   sensors_2004_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors_2004', 3)),
   all_shardids AS (SELECT * FROM sensors_shardid UNION SELECT * FROM sensors_2000_shardid UNION
                    SELECT * FROM sensors_2001_shardid UNION SELECT * FROM sensors_2002_shardid
                    UNION SELECT * FROM sensors_2003_shardid UNION SELECT * FROM sensors_2004_shardid)
SELECT logicalrelid, shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid IN (SELECT * FROM all_shardids);
 logicalrelid | shardid | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
 sensors      | 2580001 | -1073741824   | -1
 sensors_2000 | 2580005 | -1073741824   | -1
 sensors_2001 | 2580009 | -1073741824   | -1
 sensors_2002 | 2580013 | -1073741824   | -1
 sensors_2003 | 2580017 | -1073741824   | -1
 sensors_2004 | 2580025 | -1073741824   | -1
(6 rows)

VACUUM ANALYZE sensors, sensors_2000, sensors_2001, sensors_2002, sensors_2003;
-- show that both INSERT and SELECT can route to a single node when distribution
-- key is provided in the query
EXPLAIN (COSTS FALSE) INSERT INTO sensors VALUES (3, '2000-02-02', row_to_json(row(1)));
                              QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Insert on sensors_2580001
               ->  Result
(7 rows)

EXPLAIN (COSTS FALSE) INSERT INTO sensors_2000 VALUES (3, '2000-01-01', row_to_json(row(1)));
                              QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Insert on sensors_2000_2580005
               ->  Result
(7 rows)

EXPLAIN (COSTS FALSE) INSERT INTO sensors_2001 VALUES (3, '2001-01-01', row_to_json(row(1)));
                              QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Insert on sensors_2001_2580009
               ->  Result
(7 rows)

EXPLAIN (COSTS FALSE) INSERT INTO sensors_2002 VALUES (3, '2002-01-01', row_to_json(row(1)));
                              QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Insert on sensors_2002_2580013
               ->  Result
(7 rows)

EXPLAIN (COSTS FALSE) INSERT INTO sensors_2003 VALUES (3, '2003-01-01', row_to_json(row(1)));
                              QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Insert on sensors_2003_2580017
               ->  Result
(7 rows)

EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors WHERE measureid = 3 AND eventdatetime = '2000-02-02';
                              QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Aggregate
               ->  Index Only Scan using sensors_2000_pkey_2580005 on sensors_2000_2580005 sensors
                     Index Cond: ((measureid = 3) AND (eventdatetime = '2000-02-02'::date))
(8 rows)

EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2000 WHERE measureid = 3;
                              QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Aggregate
               ->  Bitmap Heap Scan on sensors_2000_2580005 sensors_2000
                     Recheck Cond: (measureid = 3)
                     ->  Bitmap Index Scan on sensors_2000_pkey_2580005
                           Index Cond: (measureid = 3)
(10 rows)

EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2001 WHERE measureid = 3;
                              QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Aggregate
               ->  Bitmap Heap Scan on sensors_2001_2580009 sensors_2001
                     Recheck Cond: (measureid = 3)
                     ->  Bitmap Index Scan on sensors_2001_pkey_2580009
                           Index Cond: (measureid = 3)
(10 rows)

EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2002 WHERE measureid = 3;
                              QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Aggregate
               ->  Bitmap Heap Scan on sensors_2002_2580013 sensors_2002
                     Recheck Cond: (measureid = 3)
                     ->  Bitmap Index Scan on sensors_2002_pkey_2580013
                           Index Cond: (measureid = 3)
(10 rows)

EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2003 WHERE measureid = 3;
                              QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Aggregate
               ->  Bitmap Heap Scan on sensors_2003_2580017 sensors_2003
                     Recheck Cond: (measureid = 3)
                     ->  Bitmap Index Scan on sensors_2003_pkey_2580017
                           Index Cond: (measureid = 3)
(10 rows)

-- execute 7 times to make sure it is re-cached
EXECUTE drop_col_prepare_insert(3, '2000-10-01', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(3, '2001-10-01', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(3, '2002-10-01', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(3, '2003-10-01', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(3, '2003-10-02', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(4, '2003-10-03', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(5, '2003-10-04', row_to_json(row(1)));
EXECUTE drop_col_prepare_select(3, '2000-10-01');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(3, '2001-10-01');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(3, '2002-10-01');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(3, '2003-10-01');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(3, '2003-10-02');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(4, '2003-10-03');
 count
---------------------------------------------------------------------
     1
(1 row)

EXECUTE drop_col_prepare_select(5, '2003-10-04');
 count
---------------------------------------------------------------------
     1
(1 row)

-- non-fast-path router planner queries should also work;
-- we switch to DEBUG2 to show that the distribution key is
-- detected and that the query is router-plannable
SET client_min_messages TO DEBUG2;
SELECT count(*) FROM (
    SELECT * FROM sensors WHERE measureid = 3
    UNION
    SELECT * FROM sensors_2000 WHERE measureid = 3
    UNION
    SELECT * FROM sensors_2001 WHERE measureid = 3
    UNION
    SELECT * FROM sensors_2002 WHERE measureid = 3
    UNION
    SELECT * FROM sensors_2003 WHERE measureid = 3
    UNION
    SELECT * FROM sensors_2004 WHERE measureid = 3
) as foo;
DEBUG: pathlist hook for columnar table am
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 3
 count
---------------------------------------------------------------------
     5
(1 row)

RESET client_min_messages;
-- show that all partitions have the same distribution key
-- even after alter_distributed_table changes the shard count
-- remove this comment once https://github.com/citusdata/citus/issues/5137 is fixed
--SELECT alter_distributed_table('sensors', shard_count:='3');
SELECT
    p.logicalrelid::regclass, column_to_column_name(p.logicalrelid, p.partkey)
FROM
    pg_dist_partition p
WHERE
    logicalrelid IN ('sensors'::regclass, 'sensors_2000'::regclass,
                     'sensors_2001'::regclass, 'sensors_2002'::regclass,
                     'sensors_2003'::regclass, 'sensors_2004'::regclass);
 logicalrelid | column_to_column_name
---------------------------------------------------------------------
 sensors      | measureid
 sensors_2000 | measureid
 sensors_2001 | measureid
 sensors_2002 | measureid
 sensors_2003 | measureid
 sensors_2004 | measureid
(6 rows)

\c - - - :worker_1_port
SET search_path TO drop_column_partitioned_table;
SELECT
    p.logicalrelid::regclass, column_to_column_name(p.logicalrelid, p.partkey)
FROM
    pg_dist_partition p
WHERE
    logicalrelid IN ('sensors'::regclass, 'sensors_2000'::regclass,
                     'sensors_2001'::regclass, 'sensors_2002'::regclass,
                     'sensors_2003'::regclass, 'sensors_2004'::regclass);
 logicalrelid | column_to_column_name
---------------------------------------------------------------------
 sensors      | measureid
 sensors_2000 | measureid
 sensors_2001 | measureid
 sensors_2002 | measureid
 sensors_2003 | measureid
 sensors_2004 | measureid
(6 rows)

\c - - - :master_port
SET client_min_messages TO WARNING;
DROP SCHEMA drop_column_partitioned_table CASCADE;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
 stop_metadata_sync_to_node
---------------------------------------------------------------------

(1 row)

@@ -52,7 +52,7 @@ test: subquery_prepared_statements
 test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins non_colocated_join_order
 test: cte_inline recursive_view_local_table values
 test: pg13 pg12
-test: tableam
+test: tableam drop_column_partitioned_table

 # ----------
 # Miscellaneous tests to check our query planning behavior
@@ -0,0 +1,187 @@
CREATE SCHEMA drop_column_partitioned_table;
SET search_path TO drop_column_partitioned_table;

SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 2580000;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);

-- create a partitioned table with some columns that
-- are going to be dropped within the tests
CREATE TABLE sensors(
    col_to_drop_0 text,
    col_to_drop_1 text,
    col_to_drop_2 date,
    col_to_drop_3 inet,
    col_to_drop_4 date,
    measureid integer,
    eventdatetime date,
    measure_data jsonb,
    PRIMARY KEY (measureid, eventdatetime, measure_data))
PARTITION BY RANGE(eventdatetime);

-- drop column even before attaching any partitions
ALTER TABLE sensors DROP COLUMN col_to_drop_1;

-- now attach the first partition and create the distributed table
CREATE TABLE sensors_2000 PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2001-01-01');
SELECT create_distributed_table('sensors', 'measureid');

-- prepared statements should work fine even after columns are dropped
PREPARE drop_col_prepare_insert(int, date, jsonb) AS INSERT INTO sensors (measureid, eventdatetime, measure_data) VALUES ($1, $2, $3);
PREPARE drop_col_prepare_select(int, date) AS SELECT count(*) FROM sensors WHERE measureid = $1 AND eventdatetime = $2;

-- execute 7 times to make sure it is cached
EXECUTE drop_col_prepare_insert(1, '2000-10-01', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-02', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-03', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-04', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-05', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-06', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(1, '2000-10-07', row_to_json(row(1)));
EXECUTE drop_col_prepare_select(1, '2000-10-01');
EXECUTE drop_col_prepare_select(1, '2000-10-02');
EXECUTE drop_col_prepare_select(1, '2000-10-03');
EXECUTE drop_col_prepare_select(1, '2000-10-04');
EXECUTE drop_col_prepare_select(1, '2000-10-05');
EXECUTE drop_col_prepare_select(1, '2000-10-06');
EXECUTE drop_col_prepare_select(1, '2000-10-07');

-- drop another column before attaching another partition
-- with .. PARTITION OF .. syntax
ALTER TABLE sensors DROP COLUMN col_to_drop_0;
CREATE TABLE sensors_2001 PARTITION OF sensors FOR VALUES FROM ('2001-01-01') TO ('2002-01-01');

-- drop another column before attaching another partition
-- with ALTER TABLE .. ATTACH PARTITION
ALTER TABLE sensors DROP COLUMN col_to_drop_2;

CREATE TABLE sensors_2002(
    col_to_drop_4 date, col_to_drop_3 inet, measureid integer, eventdatetime date, measure_data jsonb,
    PRIMARY KEY (measureid, eventdatetime, measure_data));
ALTER TABLE sensors ATTACH PARTITION sensors_2002 FOR VALUES FROM ('2002-01-01') TO ('2003-01-01');

-- drop another column before attaching another partition
-- that is already distributed
ALTER TABLE sensors DROP COLUMN col_to_drop_3;

CREATE TABLE sensors_2003(
    col_to_drop_4 date, measureid integer, eventdatetime date, measure_data jsonb,
    PRIMARY KEY (measureid, eventdatetime, measure_data));

SELECT create_distributed_table('sensors_2003', 'measureid');
ALTER TABLE sensors ATTACH PARTITION sensors_2003 FOR VALUES FROM ('2003-01-01') TO ('2004-01-01');

CREATE TABLE sensors_2004(
    col_to_drop_4 date, measureid integer NOT NULL, eventdatetime date NOT NULL, measure_data jsonb NOT NULL);

ALTER TABLE sensors ATTACH PARTITION sensors_2004 FOR VALUES FROM ('2004-01-01') TO ('2005-01-01');
ALTER TABLE sensors DROP COLUMN col_to_drop_4;
SELECT alter_table_set_access_method('sensors_2004', 'columnar');

-- show that all partitions have the same distribution key
SELECT
    p.logicalrelid::regclass, column_to_column_name(p.logicalrelid, p.partkey)
FROM
    pg_dist_partition p
WHERE
    logicalrelid IN ('sensors'::regclass, 'sensors_2000'::regclass,
                     'sensors_2001'::regclass, 'sensors_2002'::regclass,
                     'sensors_2003'::regclass, 'sensors_2004'::regclass);

-- show that all the tables prune to the same shard for the same distribution key
WITH
   sensors_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors', 3)),
   sensors_2000_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors_2000', 3)),
   sensors_2001_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors_2001', 3)),
   sensors_2002_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors_2002', 3)),
   sensors_2003_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors_2003', 3)),
   sensors_2004_shardid AS (SELECT * FROM get_shard_id_for_distribution_column('sensors_2004', 3)),
   all_shardids AS (SELECT * FROM sensors_shardid UNION SELECT * FROM sensors_2000_shardid UNION
                    SELECT * FROM sensors_2001_shardid UNION SELECT * FROM sensors_2002_shardid
                    UNION SELECT * FROM sensors_2003_shardid UNION SELECT * FROM sensors_2004_shardid)
SELECT logicalrelid, shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid IN (SELECT * FROM all_shardids);

VACUUM ANALYZE sensors, sensors_2000, sensors_2001, sensors_2002, sensors_2003;

-- show that both INSERT and SELECT can route to a single node when distribution
-- key is provided in the query
EXPLAIN (COSTS FALSE) INSERT INTO sensors VALUES (3, '2000-02-02', row_to_json(row(1)));
EXPLAIN (COSTS FALSE) INSERT INTO sensors_2000 VALUES (3, '2000-01-01', row_to_json(row(1)));
EXPLAIN (COSTS FALSE) INSERT INTO sensors_2001 VALUES (3, '2001-01-01', row_to_json(row(1)));
EXPLAIN (COSTS FALSE) INSERT INTO sensors_2002 VALUES (3, '2002-01-01', row_to_json(row(1)));
EXPLAIN (COSTS FALSE) INSERT INTO sensors_2003 VALUES (3, '2003-01-01', row_to_json(row(1)));

EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors WHERE measureid = 3 AND eventdatetime = '2000-02-02';
EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2000 WHERE measureid = 3;
EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2001 WHERE measureid = 3;
EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2002 WHERE measureid = 3;
EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2003 WHERE measureid = 3;

-- execute 7 times to make sure it is re-cached
EXECUTE drop_col_prepare_insert(3, '2000-10-01', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(3, '2001-10-01', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(3, '2002-10-01', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(3, '2003-10-01', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(3, '2003-10-02', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(4, '2003-10-03', row_to_json(row(1)));
EXECUTE drop_col_prepare_insert(5, '2003-10-04', row_to_json(row(1)));
EXECUTE drop_col_prepare_select(3, '2000-10-01');
EXECUTE drop_col_prepare_select(3, '2001-10-01');
EXECUTE drop_col_prepare_select(3, '2002-10-01');
EXECUTE drop_col_prepare_select(3, '2003-10-01');
EXECUTE drop_col_prepare_select(3, '2003-10-02');
EXECUTE drop_col_prepare_select(4, '2003-10-03');
EXECUTE drop_col_prepare_select(5, '2003-10-04');

-- non-fast-path router planner queries should also work;
-- we switch to DEBUG2 to show that the distribution key is
-- detected and that the query is router-plannable
SET client_min_messages TO DEBUG2;
SELECT count(*) FROM (
    SELECT * FROM sensors WHERE measureid = 3
    UNION
    SELECT * FROM sensors_2000 WHERE measureid = 3
    UNION
    SELECT * FROM sensors_2001 WHERE measureid = 3
    UNION
    SELECT * FROM sensors_2002 WHERE measureid = 3
    UNION
    SELECT * FROM sensors_2003 WHERE measureid = 3
    UNION
    SELECT * FROM sensors_2004 WHERE measureid = 3
) as foo;

RESET client_min_messages;

-- show that all partitions have the same distribution key
-- even after alter_distributed_table changes the shard count

-- remove this comment once https://github.com/citusdata/citus/issues/5137 is fixed
--SELECT alter_distributed_table('sensors', shard_count:='3');

SELECT
    p.logicalrelid::regclass, column_to_column_name(p.logicalrelid, p.partkey)
FROM
    pg_dist_partition p
WHERE
    logicalrelid IN ('sensors'::regclass, 'sensors_2000'::regclass,
                     'sensors_2001'::regclass, 'sensors_2002'::regclass,
                     'sensors_2003'::regclass, 'sensors_2004'::regclass);

\c - - - :worker_1_port
SET search_path TO drop_column_partitioned_table;
SELECT
    p.logicalrelid::regclass, column_to_column_name(p.logicalrelid, p.partkey)
FROM
    pg_dist_partition p
WHERE
    logicalrelid IN ('sensors'::regclass, 'sensors_2000'::regclass,
                     'sensors_2001'::regclass, 'sensors_2002'::regclass,
                     'sensors_2003'::regclass, 'sensors_2004'::regclass);

\c - - - :master_port
SET client_min_messages TO WARNING;
DROP SCHEMA drop_column_partitioned_table CASCADE;

SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);