Adds support for column compression in table distribution

pull/5209/head
Halil Ozan Akgul 2021-08-31 16:19:53 +03:00 committed by Sait Talha Nisanci
parent 6fbdeb38a8
commit 113d5d6615
3 changed files with 82 additions and 0 deletions

View File

@ -22,6 +22,9 @@
#include "access/skey.h" #include "access/skey.h"
#include "access/stratnum.h" #include "access/stratnum.h"
#include "access/sysattr.h" #include "access/sysattr.h"
#if PG_VERSION_NUM >= PG_VERSION_14
#include "access/toast_compression.h"
#endif
#include "access/tupdesc.h" #include "access/tupdesc.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
@ -382,6 +385,14 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults,
appendStringInfoString(&buffer, " NOT NULL"); appendStringInfoString(&buffer, " NOT NULL");
} }
#if PG_VERSION_NUM >= PG_VERSION_14
if (CompressionMethodIsValid(attributeForm->attcompression))
{
appendStringInfo(&buffer, " COMPRESSION %s",
GetCompressionMethodName(attributeForm->attcompression));
}
#endif
if (attributeForm->attcollation != InvalidOid && if (attributeForm->attcollation != InvalidOid &&
attributeForm->attcollation != DEFAULT_COLLATION_OID) attributeForm->attcollation != DEFAULT_COLLATION_OID)
{ {

View File

@ -161,5 +161,56 @@ ALTER TABLE par DETACH PARTITION par_2 CONCURRENTLY;
ERROR: ALTER TABLE .. DETACH PARTITION .. CONCURRENTLY commands are currently unsupported. ERROR: ALTER TABLE .. DETACH PARTITION .. CONCURRENTLY commands are currently unsupported.
ALTER TABLE par DETACH PARTITION par_2 FINALIZE; ALTER TABLE par DETACH PARTITION par_2 FINALIZE;
ERROR: ALTER TABLE .. DETACH PARTITION .. FINALIZE commands are currently unsupported. ERROR: ALTER TABLE .. DETACH PARTITION .. FINALIZE commands are currently unsupported.
-- test column compression propagation in distribution
SET citus.shard_replication_factor TO 1;
CREATE TABLE col_compression (a TEXT COMPRESSION pglz, b TEXT);
SELECT create_distributed_table('col_compression', 'a', shard_count:=4);
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT attname || ' ' || attcompression AS column_compression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'col\_compression%' AND attnum > 0 ORDER BY 1;
column_compression
---------------------------------------------------------------------
a p
b
(2 rows)
SELECT result AS column_compression FROM run_command_on_workers($$SELECT ARRAY(
SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'pg14.col\_compression%' AND attnum > 0 ORDER BY 1
)$$);
column_compression
---------------------------------------------------------------------
{"a p","a p","b ","b "}
{"a p","a p","b ","b "}
(2 rows)
-- test column compression propagation in rebalance
SELECT shardid INTO moving_shard FROM citus_shards WHERE table_name='col_compression'::regclass AND nodeport=:worker_1_port LIMIT 1;
SELECT citus_move_shard_placement((SELECT * FROM moving_shard), :'public_worker_1_host', :worker_1_port, :'public_worker_2_host', :worker_2_port);
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT rebalance_table_shards('col_compression', rebalance_strategy := 'by_shard_count');
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 1 orphaned shards
SELECT result AS column_compression FROM run_command_on_workers($$SELECT ARRAY(
SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'pg14.col\_compression%' AND attnum > 0 ORDER BY 1
)$$) ORDER BY length(result);
column_compression
---------------------------------------------------------------------
{"a p","a p","b ","b "}
{"a p","a p","b ","b "}
(2 rows)
set client_min_messages to error; set client_min_messages to error;
drop schema pg14 cascade; drop schema pg14 cascade;

View File

@ -51,6 +51,26 @@ SELECT create_distributed_table('par','a');
ALTER TABLE par DETACH PARTITION par_2 CONCURRENTLY; ALTER TABLE par DETACH PARTITION par_2 CONCURRENTLY;
ALTER TABLE par DETACH PARTITION par_2 FINALIZE; ALTER TABLE par DETACH PARTITION par_2 FINALIZE;
-- test column compression propagation in distribution
SET citus.shard_replication_factor TO 1;
CREATE TABLE col_compression (a TEXT COMPRESSION pglz, b TEXT);
SELECT create_distributed_table('col_compression', 'a', shard_count:=4);
SELECT attname || ' ' || attcompression AS column_compression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'col\_compression%' AND attnum > 0 ORDER BY 1;
SELECT result AS column_compression FROM run_command_on_workers($$SELECT ARRAY(
SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'pg14.col\_compression%' AND attnum > 0 ORDER BY 1
)$$);
-- test column compression propagation in rebalance
SELECT shardid INTO moving_shard FROM citus_shards WHERE table_name='col_compression'::regclass AND nodeport=:worker_1_port LIMIT 1;
SELECT citus_move_shard_placement((SELECT * FROM moving_shard), :'public_worker_1_host', :worker_1_port, :'public_worker_2_host', :worker_2_port);
SELECT rebalance_table_shards('col_compression', rebalance_strategy := 'by_shard_count');
CALL citus_cleanup_orphaned_shards();
SELECT result AS column_compression FROM run_command_on_workers($$SELECT ARRAY(
SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'pg14.col\_compression%' AND attnum > 0 ORDER BY 1
)$$) ORDER BY length(result);
set client_min_messages to error; set client_min_messages to error;
drop schema pg14 cascade; drop schema pg14 cascade;