Columnar: chunk_group metadata table

pull/4659/head
Hadi Moshayedi 2021-02-07 11:52:00 -08:00
parent c2480343c7
commit c8d61a31e2
21 changed files with 282 additions and 6 deletions

View File

@ -86,6 +86,7 @@ static Oid ColumnarStripeIndexRelationId(void);
static Oid ColumnarOptionsRelationId(void);
static Oid ColumnarOptionsIndexRegclass(void);
static Oid ColumnarChunkRelationId(void);
static Oid ColumnarChunkGroupRelationId(void);
static Oid ColumnarChunkIndexRelationId(void);
static Oid ColumnarNamespaceId(void);
static ModifyState * StartModifyRelation(Relation rel);
@ -139,6 +140,13 @@ typedef FormData_columnar_options *Form_columnar_options;
#define Anum_columnar_stripe_chunk_row_count 7
#define Anum_columnar_stripe_row_count 8
/*
 * constants for columnar.chunk_group
 *
 * Natts_* is the number of columns and each Anum_* is the 1-based position
 * of a column. The positions follow the column order of the chunk_group
 * table definition (storageid, stripeid, chunkid, row_count), which is also
 * the order in which SaveChunkGroups fills its values array, so the three
 * must be kept in sync.
 */
#define Natts_columnar_chunkgroup 4
#define Anum_columnar_chunkgroup_storageid 1
#define Anum_columnar_chunkgroup_stripe 2
#define Anum_columnar_chunkgroup_chunk 3
#define Anum_columnar_chunkgroup_row_count 4
/* constants for columnar.chunk */
#define Natts_columnar_chunk 14
#define Anum_columnar_chunk_storageid 1
@ -462,6 +470,44 @@ SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunk
}
/*
 * SaveChunkGroups records the chunk groups of a stripe in columnar.chunk_group.
 *
 * chunkGroupRowCounts is an integer List holding one row count per chunk
 * group. Entries are written in list order and are assigned consecutive
 * chunk ids starting from 0. The storage id of every written row is taken
 * from the metapage of the given relfilenode.
 */
void
SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
				List *chunkGroupRowCounts)
{
	ColumnarMetapage *metapage = ReadMetapage(relfilenode, false);
	Relation chunkGroupRel = table_open(ColumnarChunkGroupRelationId(),
										RowExclusiveLock);
	ModifyState *modifyState = StartModifyRelation(chunkGroupRel);

	int chunkGroupIndex = 0;
	ListCell *rowCountCell = NULL;

	foreach(rowCountCell, chunkGroupRowCounts)
	{
		int64 chunkGroupRowCount = lfirst_int(rowCountCell);

		/* no column of columnar.chunk_group is written as NULL */
		bool nulls[Natts_columnar_chunkgroup] = { false };

		/* initializer order must match the column order of the table */
		Datum values[Natts_columnar_chunkgroup] = {
			UInt64GetDatum(metapage->storageId),
			Int64GetDatum(stripe),
			Int32GetDatum(chunkGroupIndex),
			Int64GetDatum(chunkGroupRowCount)
		};

		InsertTupleAndEnforceConstraints(modifyState, values, nulls);

		chunkGroupIndex++;
	}

	FinishModifyRelation(modifyState);
	table_close(chunkGroupRel, NoLock);

	CommandCounterIncrement();
}
/*
* ReadStripeSkipList fetches chunk metadata for a given stripe.
*/
@ -1113,6 +1159,17 @@ ColumnarChunkRelationId(void)
}
/*
 * ColumnarChunkGroupRelationId looks up the relation named "chunk_group" in
 * the columnar namespace and returns its relation id.
 * TODO: should we cache this similar to citus?
 */
static Oid
ColumnarChunkGroupRelationId(void)
{
	Oid columnarNamespaceOid = ColumnarNamespaceId();

	return get_relname_relid("chunk_group", columnarNamespaceOid);
}
/*
* ColumnarChunkIndexRelationId returns relation id of columnar.chunk_pkey.
* TODO: should we cache this similar to citus?

View File

@ -45,6 +45,8 @@ struct TableWriteState
ColumnarOptions options;
ChunkData *chunkData;
List *chunkGroupRowCounts;
/*
* compressionBuffer buffer is used as temporary storage during
* data value compression operation. It is kept here to minimize
@ -548,11 +550,15 @@ FlushStripe(TableWriteState *writeState)
}
}
/* save chunk group and skip list metadata of the stripe */
SaveChunkGroups(writeState->relfilenode,
stripeMetadata.id,
writeState->chunkGroupRowCounts);
SaveStripeSkipList(writeState->relfilenode,
stripeMetadata.id,
stripeSkipList, tupleDescriptor);
writeState->chunkGroupRowCounts = NIL;
relation_close(relation, NoLock);
}
@ -640,6 +646,9 @@ SerializeChunkData(TableWriteState *writeState, uint32 chunkIndex, uint32 rowCou
const uint32 columnCount = stripeBuffers->columnCount;
StringInfo compressionBuffer = writeState->compressionBuffer;
writeState->chunkGroupRowCounts =
lappend_int(writeState->chunkGroupRowCounts, rowCount);
/* serialize exist values, data values are already serialized */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{

View File

@ -29,6 +29,17 @@ CREATE TABLE stripe (
COMMENT ON TABLE stripe IS 'Columnar per stripe metadata';
-- Chunk group metadata: one row per chunk group of a stripe, holding the
-- number of rows stored in that chunk group.
CREATE TABLE chunk_group (
storageid bigint NOT NULL,
stripeid bigint NOT NULL,
-- position of the chunk group within its stripe; the writer numbers them from 0
chunkid int NOT NULL,
-- number of rows in this chunk group
row_count bigint NOT NULL,
PRIMARY KEY (storageid, stripeid, chunkid),
-- deleting a stripe row also deletes the rows of its chunk groups
FOREIGN KEY (storageid, stripeid) REFERENCES stripe(storageid, stripeid) ON DELETE CASCADE
);
COMMENT ON TABLE chunk_group IS 'Columnar chunk group metadata';
CREATE TABLE chunk (
storageid bigint NOT NULL,
stripeid bigint NOT NULL,
@ -45,7 +56,7 @@ CREATE TABLE chunk (
value_compression_level int NOT NULL,
value_decompressed_length bigint NOT NULL,
PRIMARY KEY (storageid, stripeid, attnum, chunkid),
FOREIGN KEY (storageid, stripeid) REFERENCES stripe(storageid, stripeid) ON DELETE CASCADE
FOREIGN KEY (storageid, stripeid, chunkid) REFERENCES chunk_group(storageid, stripeid, chunkid) ON DELETE CASCADE
) WITH (user_catalog_table = true);
COMMENT ON TABLE chunk IS 'Columnar per chunk metadata';

View File

@ -30,6 +30,7 @@ END IF;
END$proc$;
DROP TABLE chunk;
DROP TABLE chunk_group;
DROP TABLE stripe;
DROP TABLE options;
DROP SEQUENCE storageid_seq;

View File

@ -287,6 +287,8 @@ extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe,
StripeSkipList *stripeSkipList,
TupleDesc tupleDescriptor);
/*
 * Saves metadata for the chunk groups of the given stripe in
 * columnar.chunk_group. chunkGroupRowCounts is an integer List that holds
 * one row count per chunk group.
 */
extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
List *chunkGroupRowCounts);
extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe,
TupleDesc tupleDescriptor,
uint32 chunkCount);

View File

@ -42,3 +42,18 @@ EXCEPTION WHEN invalid_parameter_value THEN
return false;
END;
$$ LANGUAGE plpgsql;
-- are chunk groups and chunks consistent?
CREATE view chunk_group_consistency AS
WITH a as (
SELECT storageid, stripeid, chunkid, min(value_count) as row_count
FROM columnar.chunk
GROUP BY 1,2,3
), b as (
SELECT storageid, stripeid, chunkid, max(value_count) as row_count
FROM columnar.chunk
GROUP BY 1,2,3
), c as (
(TABLE a EXCEPT TABLE b) UNION (TABLE b EXCEPT TABLE a) UNION
(TABLE a EXCEPT TABLE columnar.chunk_group) UNION (TABLE columnar.chunk_group EXCEPT TABLE a)
)
SELECT count(*) = 0 AS consistent FROM c;

View File

@ -45,6 +45,12 @@ select count(*) from test_insert_command;
3
(1 row)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
drop table test_insert_command_data;
drop table test_insert_command;
-- test long attribute value insertion
@ -61,6 +67,12 @@ CREATE TABLE test_cstore_long_text(int_val int, text_val text)
USING columnar;
-- store long text in cstore table
INSERT INTO test_cstore_long_text SELECT * FROM test_long_text;
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
-- drop source table to remove original text from toast
DROP TABLE test_long_text;
-- check if text data is still available in cstore table
@ -129,5 +141,11 @@ FROM test_toast_columnar;
5004 | 5004 | 5004 | 5004
(1 row)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
DROP TABLE test_toast_row;
DROP TABLE test_toast_columnar;

View File

@ -72,6 +72,12 @@ SELECT * FROM t2;
5 | 6
(5 rows)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
TRUNCATE t1;
TRUNCATE t2;
--
@ -101,6 +107,12 @@ SELECT * FROM t2;
3 | 0
(3 rows)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
TRUNCATE t1;
TRUNCATE t2;
--
@ -125,6 +137,12 @@ SELECT * FROM t1 ORDER BY a, b;
5 | 10
(10 rows)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
TRUNCATE t1;
TRUNCATE t2;
--
@ -165,6 +183,12 @@ SELECT * FROM t2 ORDER BY a, b;
5 | 5
(5 rows)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
TRUNCATE t1, t2, t3, t4;
--
-- INSERT into the same relation that was INSERTed into in the UDF
@ -204,6 +228,12 @@ SELECT * FROM t3 ORDER BY a, b;
---------------------------------------------------------------------
(0 rows)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
DROP FUNCTION g(int), g2(int);
TRUNCATE t1, t2, t3, t4;
--
@ -248,5 +278,11 @@ SELECT * FROM t1 ORDER BY a, b;
23 | 46
(10 rows)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
DROP FUNCTION f(int);
DROP TABLE t1, t2, t3, t4;

View File

@ -37,7 +37,19 @@ SELECT * FROM columnar_truncate_test;
10 | 10
(10 rows)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
TRUNCATE TABLE columnar_truncate_test;
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM columnar_truncate_test;
a | b
---------------------------------------------------------------------
@ -118,12 +130,24 @@ SELECT * from columnar_truncate_test_regular;
20 | 20
(11 rows)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
-- make sure multi truncate works
-- notice that the same table might be repeated
TRUNCATE TABLE columnar_truncate_test,
columnar_truncate_test_regular,
columnar_truncate_test_second,
columnar_truncate_test;
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
SELECT * from columnar_truncate_test;
a | b
---------------------------------------------------------------------
@ -267,6 +291,12 @@ SELECT count(*) FROM truncate_schema.truncate_tbl;
(1 row)
\c - :current_user
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
-- cleanup
DROP SCHEMA truncate_schema CASCADE;
NOTICE: drop cascades to table truncate_schema.truncate_tbl

View File

@ -27,6 +27,12 @@ SELECT count(*) FROM t_stripes;
-- vacuum full should merge stripes together
VACUUM FULL t;
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
SELECT sum(a), sum(b) FROM t;
sum | sum
---------------------------------------------------------------------
@ -60,6 +66,12 @@ SELECT count(*) FROM t_stripes;
(1 row)
VACUUM FULL t;
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
SELECT sum(a), sum(b) FROM t;
sum | sum
---------------------------------------------------------------------
@ -203,6 +215,12 @@ compression rate: 1.25x
total row count: 5530, stripe count: 5, average rows per stripe: 1106
chunk count: 7, containing data for dropped columns: 0, none compressed: 5, pglz compressed: 2
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM t;
count
---------------------------------------------------------------------
@ -239,6 +257,12 @@ compression rate: 1.96x
total row count: 7030, stripe count: 4, average rows per stripe: 1757
chunk count: 8, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 6
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
DROP TABLE t;
DROP VIEW t_stripes;
-- Make sure we cleaned the metadata for t too
@ -262,4 +286,10 @@ compression rate: 33.71x
total row count: 1000000, stripe count: 1, average rows per stripe: 1000000
chunk count: 30, containing data for dropped columns: 0, pglz compressed: 30
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
DROP TABLE t;

View File

@ -507,12 +507,13 @@ SELECT * FROM print_extension_changes();
| schema columnar
| sequence columnar.storageid_seq
| table columnar.chunk
| table columnar.chunk_group
| table columnar.options
| table columnar.stripe
| view citus_shards
| view citus_tables
| view time_partitions
(66 rows)
(67 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -503,12 +503,13 @@ SELECT * FROM print_extension_changes();
| schema columnar
| sequence columnar.storageid_seq
| table columnar.chunk
| table columnar.chunk_group
| table columnar.options
| table columnar.stripe
| view citus_shards
| view citus_tables
| view time_partitions
(62 rows)
(63 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -214,6 +214,7 @@ ORDER BY 1;
sequence pg_dist_shardid_seq
table citus.pg_dist_object
table columnar.chunk
table columnar.chunk_group
table columnar.options
table columnar.stripe
table pg_dist_authinfo
@ -241,5 +242,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(225 rows)
(226 rows)

View File

@ -210,6 +210,7 @@ ORDER BY 1;
sequence pg_dist_shardid_seq
table citus.pg_dist_object
table columnar.chunk
table columnar.chunk_group
table columnar.options
table columnar.stripe
table pg_dist_authinfo
@ -237,5 +238,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(221 rows)
(222 rows)

View File

@ -43,4 +43,6 @@ speed of light,2.997e8
SELECT * FROM famous_constants ORDER BY id, name;
SELECT * FROM chunk_group_consistency;
DROP TABLE famous_constants;

View File

@ -39,4 +39,10 @@ SELECT * FROM famous_constants ORDER BY id, name;
| speed of light | 2.997e+08
(8 rows)
SELECT * FROM chunk_group_consistency;
consistent
---------------------------------------------------------------------
t
(1 row)
DROP TABLE famous_constants;

View File

@ -44,3 +44,20 @@ EXCEPTION WHEN invalid_parameter_value THEN
return false;
END;
$$ LANGUAGE plpgsql;
-- are chunk groups and chunks consistent?
CREATE view chunk_group_consistency AS
WITH a as (
SELECT storageid, stripeid, chunkid, min(value_count) as row_count
FROM columnar.chunk
GROUP BY 1,2,3
), b as (
SELECT storageid, stripeid, chunkid, max(value_count) as row_count
FROM columnar.chunk
GROUP BY 1,2,3
), c as (
(TABLE a EXCEPT TABLE b) UNION (TABLE b EXCEPT TABLE a) UNION
(TABLE a EXCEPT TABLE columnar.chunk_group) UNION (TABLE columnar.chunk_group EXCEPT TABLE a)
)
SELECT count(*) = 0 AS consistent FROM c;

View File

@ -22,6 +22,8 @@ select count(*) from test_insert_command_data;
insert into test_insert_command select * from test_insert_command_data;
select count(*) from test_insert_command;
SELECT * FROM chunk_group_consistency;
drop table test_insert_command_data;
drop table test_insert_command;
@ -43,6 +45,8 @@ USING columnar;
-- store long text in cstore table
INSERT INTO test_cstore_long_text SELECT * FROM test_long_text;
SELECT * FROM chunk_group_consistency;
-- drop source table to remove original text from toast
DROP TABLE test_long_text;
@ -95,5 +99,7 @@ SELECT
pg_column_size(external), pg_column_size(extended)
FROM test_toast_columnar;
SELECT * FROM chunk_group_consistency;
DROP TABLE test_toast_row;
DROP TABLE test_toast_columnar;

View File

@ -39,6 +39,8 @@ INSERT INTO t2 SELECT t.a, t.a+1 FROM t;
SELECT * FROM t1;
SELECT * FROM t2;
SELECT * FROM chunk_group_consistency;
TRUNCATE t1;
TRUNCATE t2;
@ -55,6 +57,8 @@ INSERT INTO t2 SELECT i, (select count(*) from t1) FROM generate_series(1, 3) i;
SELECT * FROM t1;
SELECT * FROM t2;
SELECT * FROM chunk_group_consistency;
TRUNCATE t1;
TRUNCATE t2;
@ -68,6 +72,8 @@ INSERT INTO t1 SELECT t.a, t.a+1 FROM t;
SELECT * FROM t1 ORDER BY a, b;
SELECT * FROM chunk_group_consistency;
TRUNCATE t1;
TRUNCATE t2;
@ -99,6 +105,8 @@ INSERT INTO t4 SELECT i, g2(i) FROM generate_series(1, 5) i;
SELECT * FROM t2 ORDER BY a, b;
SELECT * FROM chunk_group_consistency;
TRUNCATE t1, t2, t3, t4;
--
@ -113,6 +121,8 @@ SELECT * FROM t3 ORDER BY a, b;
((table t1) except (table t3)) union ((table t3) except (table t1));
((table t2) except (table t4)) union ((table t4) except (table t2));
SELECT * FROM chunk_group_consistency;
DROP FUNCTION g(int), g2(int);
TRUNCATE t1, t2, t3, t4;
@ -138,6 +148,8 @@ SELECT f(0), f(20);
SELECT * FROM t1 ORDER BY a, b;
SELECT * FROM chunk_group_consistency;
DROP FUNCTION f(int);
DROP TABLE t1, t2, t3, t4;

View File

@ -25,8 +25,12 @@ set columnar.compression to default;
-- query rows
SELECT * FROM columnar_truncate_test;
SELECT * FROM chunk_group_consistency;
TRUNCATE TABLE columnar_truncate_test;
SELECT * FROM chunk_group_consistency;
SELECT * FROM columnar_truncate_test;
SELECT COUNT(*) from columnar_truncate_test;
@ -47,6 +51,8 @@ SELECT * from columnar_truncate_test_second;
SELECT * from columnar_truncate_test_regular;
SELECT * FROM chunk_group_consistency;
-- make sure multi truncate works
-- notice that the same table might be repeated
TRUNCATE TABLE columnar_truncate_test,
@ -54,6 +60,8 @@ TRUNCATE TABLE columnar_truncate_test,
columnar_truncate_test_second,
columnar_truncate_test;
SELECT * FROM chunk_group_consistency;
SELECT * from columnar_truncate_test;
SELECT * from columnar_truncate_test_second;
SELECT * from columnar_truncate_test_regular;
@ -136,6 +144,8 @@ TRUNCATE TABLE truncate_schema.truncate_tbl;
SELECT count(*) FROM truncate_schema.truncate_tbl;
\c - :current_user
SELECT * FROM chunk_group_consistency;
-- cleanup
DROP SCHEMA truncate_schema CASCADE;
DROP USER truncate_user;

View File

@ -20,6 +20,8 @@ SELECT count(*) FROM t_stripes;
-- vacuum full should merge stripes together
VACUUM FULL t;
SELECT * FROM chunk_group_consistency;
SELECT sum(a), sum(b) FROM t;
SELECT count(*) FROM t_stripes;
@ -32,6 +34,8 @@ SELECT count(*) FROM t_stripes;
VACUUM FULL t;
SELECT * FROM chunk_group_consistency;
SELECT sum(a), sum(b) FROM t;
SELECT count(*) FROM t_stripes;
@ -92,6 +96,8 @@ COMMIT;
VACUUM VERBOSE t;
SELECT * FROM chunk_group_consistency;
SELECT count(*) FROM t;
-- check that we report chunks with data for dropped columns
@ -108,6 +114,8 @@ SELECT alter_columnar_table_set('t', compression => 'pglz');
VACUUM FULL t;
VACUUM VERBOSE t;
SELECT * FROM chunk_group_consistency;
DROP TABLE t;
DROP VIEW t_stripes;
@ -123,4 +131,6 @@ INSERT INTO t SELECT 1, 'a', 'xyz' FROM generate_series(1, 1000000) i;
VACUUM VERBOSE t;
SELECT * FROM chunk_group_consistency;
DROP TABLE t;