From 113d5d66159e040a3f5113b8c490b5e79361277b Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Tue, 31 Aug 2021 16:19:53 +0300 Subject: [PATCH] Adds support for column compression in table distribution --- .../distributed/deparser/citus_ruleutils.c | 11 ++++ src/test/regress/expected/pg14.out | 51 +++++++++++++++++++ src/test/regress/sql/pg14.sql | 20 ++++++++ 3 files changed, 82 insertions(+) diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 86584245b..12bdafeb2 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -22,6 +22,9 @@ #include "access/skey.h" #include "access/stratnum.h" #include "access/sysattr.h" +#if PG_VERSION_NUM >= PG_VERSION_14 +#include "access/toast_compression.h" +#endif #include "access/tupdesc.h" #include "catalog/dependency.h" #include "catalog/indexing.h" @@ -382,6 +385,14 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults, appendStringInfoString(&buffer, " NOT NULL"); } +#if PG_VERSION_NUM >= PG_VERSION_14 + if (CompressionMethodIsValid(attributeForm->attcompression)) + { + appendStringInfo(&buffer, " COMPRESSION %s", + GetCompressionMethodName(attributeForm->attcompression)); + } +#endif + if (attributeForm->attcollation != InvalidOid && attributeForm->attcollation != DEFAULT_COLLATION_OID) { diff --git a/src/test/regress/expected/pg14.out b/src/test/regress/expected/pg14.out index 2c67f0c13..8c820cbab 100644 --- a/src/test/regress/expected/pg14.out +++ b/src/test/regress/expected/pg14.out @@ -161,5 +161,56 @@ ALTER TABLE par DETACH PARTITION par_2 CONCURRENTLY; ERROR: ALTER TABLE .. DETACH PARTITION .. CONCURRENTLY commands are currently unsupported. ALTER TABLE par DETACH PARTITION par_2 FINALIZE; ERROR: ALTER TABLE .. DETACH PARTITION .. FINALIZE commands are currently unsupported. +-- test column compression propagation in distribution +SET citus.shard_replication_factor TO 1; +CREATE TABLE col_compression (a TEXT COMPRESSION pglz, b TEXT); +SELECT create_distributed_table('col_compression', 'a', shard_count:=4); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT attname || ' ' || attcompression AS column_compression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'col\_compression%' AND attnum > 0 ORDER BY 1; + column_compression +--------------------------------------------------------------------- + a p + b +(2 rows) + +SELECT result AS column_compression FROM run_command_on_workers($$SELECT ARRAY( +SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'pg14.col\_compression%' AND attnum > 0 ORDER BY 1 +)$$); + column_compression +--------------------------------------------------------------------- + {"a p","a p","b ","b "} + {"a p","a p","b ","b "} +(2 rows) + +-- test column compression propagation in rebalance +SELECT shardid INTO moving_shard FROM citus_shards WHERE table_name='col_compression'::regclass AND nodeport=:worker_1_port LIMIT 1; +SELECT citus_move_shard_placement((SELECT * FROM moving_shard), :'public_worker_1_host', :worker_1_port, :'public_worker_2_host', :worker_2_port); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT rebalance_table_shards('col_compression', rebalance_strategy := 'by_shard_count'); +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... + rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 1 orphaned shards +SELECT result AS column_compression FROM run_command_on_workers($$SELECT ARRAY( +SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'pg14.col\_compression%' AND attnum > 0 ORDER BY 1 +)$$) ORDER BY length(result); + column_compression +--------------------------------------------------------------------- + {"a p","a p","b ","b "} + {"a p","a p","b ","b "} +(2 rows) + set client_min_messages to error; drop schema pg14 cascade; diff --git a/src/test/regress/sql/pg14.sql b/src/test/regress/sql/pg14.sql index 141d931e7..12efe99b9 100644 --- a/src/test/regress/sql/pg14.sql +++ b/src/test/regress/sql/pg14.sql @@ -51,6 +51,26 @@ SELECT create_distributed_table('par','a'); ALTER TABLE par DETACH PARTITION par_2 CONCURRENTLY; ALTER TABLE par DETACH PARTITION par_2 FINALIZE; + +-- test column compression propagation in distribution +SET citus.shard_replication_factor TO 1; +CREATE TABLE col_compression (a TEXT COMPRESSION pglz, b TEXT); +SELECT create_distributed_table('col_compression', 'a', shard_count:=4); + +SELECT attname || ' ' || attcompression AS column_compression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'col\_compression%' AND attnum > 0 ORDER BY 1; +SELECT result AS column_compression FROM run_command_on_workers($$SELECT ARRAY( +SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'pg14.col\_compression%' AND attnum > 0 ORDER BY 1 +)$$); + +-- test column compression propagation in rebalance +SELECT shardid INTO moving_shard FROM citus_shards WHERE table_name='col_compression'::regclass AND nodeport=:worker_1_port LIMIT 1; +SELECT citus_move_shard_placement((SELECT * FROM moving_shard), :'public_worker_1_host', :worker_1_port, :'public_worker_2_host', :worker_2_port); +SELECT rebalance_table_shards('col_compression', rebalance_strategy := 'by_shard_count'); +CALL citus_cleanup_orphaned_shards(); +SELECT result AS column_compression FROM run_command_on_workers($$SELECT ARRAY( +SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'pg14.col\_compression%' AND attnum > 0 ORDER BY 1 +)$$) ORDER BY length(result); + set client_min_messages to error; drop schema pg14 cascade;