mirror of https://github.com/citusdata/citus.git
commit
c4eb36dfd2
2
Makefile
2
Makefile
|
@ -53,7 +53,7 @@ ifeq ($(USE_TABLEAM),yes)
|
|||
PG_CFLAGS += -DUSE_TABLEAM
|
||||
OBJS += cstore_tableam.o
|
||||
REGRESS += am_create am_load am_query am_analyze am_data_types am_functions \
|
||||
am_drop am_insert am_copyto am_alter am_rollback am_truncate am_clean
|
||||
am_drop am_insert am_copyto am_alter am_rollback am_truncate am_vacuum am_clean
|
||||
endif
|
||||
|
||||
ifeq ($(enable_coverage),yes)
|
||||
|
|
104
cstore_tableam.c
104
cstore_tableam.c
|
@ -19,6 +19,7 @@
|
|||
#include "catalog/storage.h"
|
||||
#include "catalog/storage_xlog.h"
|
||||
#include "commands/progress.h"
|
||||
#include "commands/vacuum.h"
|
||||
#include "executor/executor.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "optimizer/plancat.h"
|
||||
|
@ -131,6 +132,36 @@ cstore_free_write_state()
|
|||
}
|
||||
|
||||
|
||||
static List *
|
||||
RelationColumnList(Relation rel)
|
||||
{
|
||||
List *columnList = NIL;
|
||||
TupleDesc tupdesc = RelationGetDescr(rel);
|
||||
|
||||
for (int i = 0; i < tupdesc->natts; i++)
|
||||
{
|
||||
Index varno = 0;
|
||||
AttrNumber varattno = i + 1;
|
||||
Oid vartype = tupdesc->attrs[i].atttypid;
|
||||
int32 vartypmod = tupdesc->attrs[i].atttypmod;
|
||||
Oid varcollid = tupdesc->attrs[i].attcollation;
|
||||
Index varlevelsup = 0;
|
||||
Var *var;
|
||||
|
||||
if (tupdesc->attrs[i].attisdropped)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var = makeVar(varno, varattno, vartype, vartypmod,
|
||||
varcollid, varlevelsup);
|
||||
columnList = lappend(columnList, var);
|
||||
}
|
||||
|
||||
return columnList;
|
||||
}
|
||||
|
||||
|
||||
static const TupleTableSlotOps *
|
||||
cstore_slot_callbacks(Relation relation)
|
||||
{
|
||||
|
@ -157,25 +188,7 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
|
|||
scan->cs_base.rs_flags = flags;
|
||||
scan->cs_base.rs_parallel = parallel_scan;
|
||||
|
||||
for (int i = 0; i < tupdesc->natts; i++)
|
||||
{
|
||||
Index varno = 0;
|
||||
AttrNumber varattno = i + 1;
|
||||
Oid vartype = tupdesc->attrs[i].atttypid;
|
||||
int32 vartypmod = 0;
|
||||
Oid varcollid = 0;
|
||||
Index varlevelsup = 0;
|
||||
Var *var;
|
||||
|
||||
if (tupdesc->attrs[i].attisdropped)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var = makeVar(varno, varattno, vartype, vartypmod,
|
||||
varcollid, varlevelsup);
|
||||
columnList = lappend(columnList, var);
|
||||
}
|
||||
columnList = RelationColumnList(relation);
|
||||
|
||||
readState = CStoreBeginRead(relation, tupdesc, columnList, NULL);
|
||||
|
||||
|
@ -497,6 +510,13 @@ cstore_relation_copy_data(Relation rel, const RelFileNode *newrnode)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* cstore_relation_copy_for_cluster is called on VACUUM FULL, at which
|
||||
* we should copy data from OldHeap to NewHeap.
|
||||
*
|
||||
* In general TableAM case this can also be called for the CLUSTER command
|
||||
* which is not applicable for cstore since it doesn't support indexes.
|
||||
*/
|
||||
static void
|
||||
cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
||||
Relation OldIndex, bool use_sort,
|
||||
|
@ -507,7 +527,51 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
double *tups_vacuumed,
|
||||
double *tups_recently_dead)
|
||||
{
|
||||
elog(ERROR, "cstore_relation_copy_for_cluster not implemented");
|
||||
TableWriteState *writeState = NULL;
|
||||
TableReadState *readState = NULL;
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
Datum *values = NULL;
|
||||
bool *nulls = NULL;
|
||||
TupleDesc sourceDesc = RelationGetDescr(OldHeap);
|
||||
TupleDesc targetDesc = RelationGetDescr(NewHeap);
|
||||
|
||||
if (OldIndex != NULL || use_sort)
|
||||
{
|
||||
ereport(ERROR, (errmsg(CSTORE_TABLEAM_NAME " doesn't support indexes")));
|
||||
}
|
||||
|
||||
/*
|
||||
* copy_table_data in cluster.c assumes tuple descriptors are exactly
|
||||
* the same. Even dropped columns exist and are marked as attisdropped
|
||||
* in the target relation.
|
||||
*/
|
||||
Assert(sourceDesc->natts == targetDesc->natts);
|
||||
|
||||
cstoreOptions = CStoreTableAMGetOptions();
|
||||
|
||||
writeState = CStoreBeginWrite(NewHeap,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount,
|
||||
targetDesc);
|
||||
|
||||
readState = CStoreBeginRead(OldHeap, sourceDesc, RelationColumnList(OldHeap), NULL);
|
||||
|
||||
values = palloc0(sourceDesc->natts * sizeof(Datum));
|
||||
nulls = palloc0(sourceDesc->natts * sizeof(bool));
|
||||
|
||||
*num_tuples = 0;
|
||||
|
||||
while (CStoreReadNextRow(readState, values, nulls))
|
||||
{
|
||||
CStoreWriteRow(writeState, values, nulls);
|
||||
(*num_tuples)++;
|
||||
}
|
||||
|
||||
*tups_vacuumed = 0;
|
||||
|
||||
CStoreEndWrite(writeState);
|
||||
CStoreEndRead(readState);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
SELECT count(*) AS columnar_table_count FROM cstore.cstore_data_files \gset
|
||||
CREATE TABLE t(a int, b int) USING cstore_tableam;
|
||||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
INSERT INTO t SELECT i, i * i FROM generate_series(1, 10) i;
|
||||
INSERT INTO t SELECT i, i * i FROM generate_series(11, 20) i;
|
||||
INSERT INTO t SELECT i, i * i FROM generate_series(21, 30) i;
|
||||
SELECT sum(a), sum(b) FROM t;
|
||||
sum | sum
|
||||
-----+------
|
||||
465 | 9455
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
count
|
||||
-------
|
||||
3
|
||||
(1 row)
|
||||
|
||||
-- vacuum full should merge stripes together
|
||||
VACUUM FULL t;
|
||||
SELECT sum(a), sum(b) FROM t;
|
||||
sum | sum
|
||||
-----+------
|
||||
465 | 9455
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
count
|
||||
-------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- test the case when all data cannot fit into a single stripe
|
||||
SET cstore.stripe_row_count TO 1000;
|
||||
INSERT INTO t SELECT i, 2 * i FROM generate_series(1,2500) i;
|
||||
SELECT sum(a), sum(b) FROM t;
|
||||
sum | sum
|
||||
---------+---------
|
||||
3126715 | 6261955
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
count
|
||||
-------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
VACUUM FULL t;
|
||||
SELECT sum(a), sum(b) FROM t;
|
||||
sum | sum
|
||||
---------+---------
|
||||
3126715 | 6261955
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
count
|
||||
-------
|
||||
3
|
||||
(1 row)
|
||||
|
||||
-- VACUUM FULL doesn't reclaim dropped columns, but converts them to NULLs
|
||||
ALTER TABLE t DROP COLUMN a;
|
||||
SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3;
|
||||
stripe | attr | block | ?column? | ?column?
|
||||
--------+------+-------+----------+----------
|
||||
0 | 1 | 0 | f | f
|
||||
0 | 2 | 0 | f | f
|
||||
1 | 1 | 0 | f | f
|
||||
1 | 2 | 0 | f | f
|
||||
2 | 1 | 0 | f | f
|
||||
2 | 2 | 0 | f | f
|
||||
(6 rows)
|
||||
|
||||
VACUUM FULL t;
|
||||
SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3;
|
||||
stripe | attr | block | ?column? | ?column?
|
||||
--------+------+-------+----------+----------
|
||||
0 | 1 | 0 | t | t
|
||||
0 | 2 | 0 | f | f
|
||||
1 | 1 | 0 | t | t
|
||||
1 | 2 | 0 | f | f
|
||||
2 | 1 | 0 | t | t
|
||||
2 | 2 | 0 | f | f
|
||||
(6 rows)
|
||||
|
||||
-- Make sure we cleaned-up the transient table metadata after VACUUM FULL commands
|
||||
SELECT count(*) - :columnar_table_count FROM cstore.cstore_data_files;
|
||||
?column?
|
||||
----------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
DROP TABLE t;
|
||||
-- Make sure we cleaned the metadata for t too
|
||||
SELECT count(*) - :columnar_table_count FROM cstore.cstore_data_files;
|
||||
?column?
|
||||
----------
|
||||
0
|
||||
(1 row)
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
SELECT count(*) AS columnar_table_count FROM cstore.cstore_data_files \gset
|
||||
|
||||
CREATE TABLE t(a int, b int) USING cstore_tableam;
|
||||
|
||||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
|
||||
INSERT INTO t SELECT i, i * i FROM generate_series(1, 10) i;
|
||||
INSERT INTO t SELECT i, i * i FROM generate_series(11, 20) i;
|
||||
INSERT INTO t SELECT i, i * i FROM generate_series(21, 30) i;
|
||||
|
||||
SELECT sum(a), sum(b) FROM t;
|
||||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
|
||||
-- vacuum full should merge stripes together
|
||||
VACUUM FULL t;
|
||||
|
||||
SELECT sum(a), sum(b) FROM t;
|
||||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
|
||||
-- test the case when all data cannot fit into a single stripe
|
||||
SET cstore.stripe_row_count TO 1000;
|
||||
INSERT INTO t SELECT i, 2 * i FROM generate_series(1,2500) i;
|
||||
|
||||
SELECT sum(a), sum(b) FROM t;
|
||||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
|
||||
VACUUM FULL t;
|
||||
|
||||
SELECT sum(a), sum(b) FROM t;
|
||||
SELECT count(*) FROM cstore.cstore_stripes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t';
|
||||
|
||||
-- VACUUM FULL doesn't reclaim dropped columns, but converts them to NULLs
|
||||
ALTER TABLE t DROP COLUMN a;
|
||||
|
||||
SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3;
|
||||
|
||||
VACUUM FULL t;
|
||||
|
||||
SELECT stripe, attr, block, minimum_value IS NULL, maximum_value IS NULL FROM cstore.cstore_skipnodes a, pg_class b WHERE a.relfilenode=b.relfilenode AND b.relname='t' ORDER BY 1, 2, 3;
|
||||
|
||||
-- Make sure we cleaned-up the transient table metadata after VACUUM FULL commands
|
||||
SELECT count(*) - :columnar_table_count FROM cstore.cstore_data_files;
|
||||
|
||||
DROP TABLE t;
|
||||
|
||||
-- Make sure we cleaned the metadata for t too
|
||||
SELECT count(*) - :columnar_table_count FROM cstore.cstore_data_files;
|
Loading…
Reference in New Issue