mirror of https://github.com/citusdata/citus.git
Merge pull request #4320 from citusdata/cstore_alter_table
Fix ALTER COLUMN ... TYPE for columnarpull/4325/head^2
commit
c35f38459b
|
@ -60,10 +60,22 @@
|
||||||
#define VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL 50 /* ms */
|
#define VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL 50 /* ms */
|
||||||
#define VACUUM_TRUNCATE_LOCK_TIMEOUT 4500 /* ms */
|
#define VACUUM_TRUNCATE_LOCK_TIMEOUT 4500 /* ms */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CStoreScanDescData is the scan state passed between beginscan(),
|
||||||
|
* getnextslot(), rescan(), and endscan() calls.
|
||||||
|
*/
|
||||||
typedef struct CStoreScanDescData
|
typedef struct CStoreScanDescData
|
||||||
{
|
{
|
||||||
TableScanDescData cs_base;
|
TableScanDescData cs_base;
|
||||||
TableReadState *cs_readState;
|
TableReadState *cs_readState;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We initialize cs_readState lazily in the first getnextslot() call. We
|
||||||
|
* need the following for initialization. We save them in beginscan().
|
||||||
|
*/
|
||||||
|
MemoryContext scanContext;
|
||||||
|
Bitmapset *attr_needed;
|
||||||
|
List *scanQual;
|
||||||
} CStoreScanDescData;
|
} CStoreScanDescData;
|
||||||
|
|
||||||
typedef struct CStoreScanDescData *CStoreScanDesc;
|
typedef struct CStoreScanDescData *CStoreScanDesc;
|
||||||
|
@ -199,13 +211,22 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
|
||||||
ParallelTableScanDesc parallel_scan,
|
ParallelTableScanDesc parallel_scan,
|
||||||
uint32 flags, Bitmapset *attr_needed, List *scanQual)
|
uint32 flags, Bitmapset *attr_needed, List *scanQual)
|
||||||
{
|
{
|
||||||
TupleDesc tupdesc = relation->rd_att;
|
|
||||||
Oid relfilenode = relation->rd_node.relNode;
|
Oid relfilenode = relation->rd_node.relNode;
|
||||||
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
|
|
||||||
List *neededColumnList = NIL;
|
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
|
|
||||||
ListCell *columnCell = NULL;
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* A memory context to use for scan-wide data, including the lazily
|
||||||
|
* initialized read state. We assume that beginscan is called in a
|
||||||
|
* context that will last until end of scan.
|
||||||
|
*/
|
||||||
|
MemoryContext scanContext =
|
||||||
|
AllocSetContextCreate(
|
||||||
|
CurrentMemoryContext,
|
||||||
|
"Column Store Scan Context",
|
||||||
|
ALLOCSET_DEFAULT_SIZES);
|
||||||
|
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
|
||||||
|
|
||||||
|
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
|
||||||
scan->cs_base.rs_rd = relation;
|
scan->cs_base.rs_rd = relation;
|
||||||
scan->cs_base.rs_snapshot = snapshot;
|
scan->cs_base.rs_snapshot = snapshot;
|
||||||
scan->cs_base.rs_nkeys = nkeys;
|
scan->cs_base.rs_nkeys = nkeys;
|
||||||
|
@ -213,6 +234,18 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
|
||||||
scan->cs_base.rs_flags = flags;
|
scan->cs_base.rs_flags = flags;
|
||||||
scan->cs_base.rs_parallel = parallel_scan;
|
scan->cs_base.rs_parallel = parallel_scan;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We will initialize this lazily in first tuple, where we have the actual
|
||||||
|
* tuple descriptor to use for reading. In some cases like ALTER TABLE ...
|
||||||
|
* ALTER COLUMN ... TYPE, the tuple descriptor of relation doesn't match
|
||||||
|
* the storage which we are reading, so we need to use the tuple descriptor
|
||||||
|
* of "slot" in first read.
|
||||||
|
*/
|
||||||
|
scan->cs_readState = NULL;
|
||||||
|
scan->attr_needed = bms_copy(attr_needed);
|
||||||
|
scan->scanQual = copyObject(scanQual);
|
||||||
|
scan->scanContext = scanContext;
|
||||||
|
|
||||||
if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId()))
|
if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId()))
|
||||||
{
|
{
|
||||||
elog(ERROR,
|
elog(ERROR,
|
||||||
|
@ -221,8 +254,24 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
|
||||||
|
|
||||||
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
|
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
|
return ((TableScanDesc) scan);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* init_cstore_read_state initializes a column store table read and returns the
|
||||||
|
* state.
|
||||||
|
*/
|
||||||
|
static TableReadState *
|
||||||
|
init_cstore_read_state(Relation relation, TupleDesc tupdesc, Bitmapset *attr_needed,
|
||||||
|
List *scanQual)
|
||||||
|
{
|
||||||
List *columnList = RelationColumnList(relation);
|
List *columnList = RelationColumnList(relation);
|
||||||
|
ListCell *columnCell = NULL;
|
||||||
|
|
||||||
|
List *neededColumnList = NIL;
|
||||||
|
|
||||||
/* only collect columns that we need for the scan */
|
/* only collect columns that we need for the scan */
|
||||||
foreach(columnCell, columnList)
|
foreach(columnCell, columnList)
|
||||||
|
@ -237,10 +286,7 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
|
||||||
TableReadState *readState = CStoreBeginRead(relation, tupdesc, neededColumnList,
|
TableReadState *readState = CStoreBeginRead(relation, tupdesc, neededColumnList,
|
||||||
scanQual);
|
scanQual);
|
||||||
|
|
||||||
scan->cs_readState = readState;
|
return readState;
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
return ((TableScanDesc) scan);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -248,8 +294,11 @@ static void
|
||||||
cstore_endscan(TableScanDesc sscan)
|
cstore_endscan(TableScanDesc sscan)
|
||||||
{
|
{
|
||||||
CStoreScanDesc scan = (CStoreScanDesc) sscan;
|
CStoreScanDesc scan = (CStoreScanDesc) sscan;
|
||||||
CStoreEndRead(scan->cs_readState);
|
if (scan->cs_readState != NULL)
|
||||||
scan->cs_readState = NULL;
|
{
|
||||||
|
CStoreEndRead(scan->cs_readState);
|
||||||
|
scan->cs_readState = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -258,7 +307,10 @@ cstore_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
|
||||||
bool allow_strat, bool allow_sync, bool allow_pagemode)
|
bool allow_strat, bool allow_sync, bool allow_pagemode)
|
||||||
{
|
{
|
||||||
CStoreScanDesc scan = (CStoreScanDesc) sscan;
|
CStoreScanDesc scan = (CStoreScanDesc) sscan;
|
||||||
CStoreRescan(scan->cs_readState);
|
if (scan->cs_readState != NULL)
|
||||||
|
{
|
||||||
|
CStoreRescan(scan->cs_readState);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -266,15 +318,24 @@ static bool
|
||||||
cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
|
cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
|
||||||
{
|
{
|
||||||
CStoreScanDesc scan = (CStoreScanDesc) sscan;
|
CStoreScanDesc scan = (CStoreScanDesc) sscan;
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
|
|
||||||
|
/*
|
||||||
|
* if this is the first row, initialize read state.
|
||||||
|
*/
|
||||||
|
if (scan->cs_readState == NULL)
|
||||||
|
{
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(scan->scanContext);
|
||||||
|
scan->cs_readState =
|
||||||
|
init_cstore_read_state(scan->cs_base.rs_rd, slot->tts_tupleDescriptor,
|
||||||
|
scan->attr_needed, scan->scanQual);
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
}
|
||||||
|
|
||||||
ExecClearTuple(slot);
|
ExecClearTuple(slot);
|
||||||
|
|
||||||
bool nextRowFound = CStoreReadNextRow(scan->cs_readState, slot->tts_values,
|
bool nextRowFound = CStoreReadNextRow(scan->cs_readState, slot->tts_values,
|
||||||
slot->tts_isnull);
|
slot->tts_isnull);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
|
|
||||||
if (!nextRowFound)
|
if (!nextRowFound)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -8,6 +8,7 @@ test: am_drop
|
||||||
test: am_insert
|
test: am_insert
|
||||||
test: am_copyto
|
test: am_copyto
|
||||||
test: am_alter
|
test: am_alter
|
||||||
|
test: am_alter_set_type
|
||||||
test: am_rollback
|
test: am_rollback
|
||||||
test: am_truncate
|
test: am_truncate
|
||||||
test: am_vacuum
|
test: am_vacuum
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
--
|
||||||
|
-- Testing ALTER TABLE on columnar tables.
|
||||||
|
--
|
||||||
|
CREATE TABLE test_alter_table (a int, b int, c text) USING columnar;
|
||||||
|
WITH sample_data AS (VALUES
|
||||||
|
(1, 2, '3'),
|
||||||
|
(4, 5, '6')
|
||||||
|
)
|
||||||
|
INSERT INTO test_alter_table SELECT * FROM sample_data;
|
||||||
|
WITH sample_data AS (VALUES
|
||||||
|
(5, 9, '11'),
|
||||||
|
(12, 83, '93')
|
||||||
|
)
|
||||||
|
INSERT INTO test_alter_table SELECT * FROM sample_data;
|
||||||
|
ALTER TABLE test_alter_table ALTER COLUMN a TYPE jsonb USING row_to_json(row(a));
|
||||||
|
SELECT * FROM test_alter_table ORDER BY a;
|
||||||
|
a | b | c
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
{"f1": 1} | 2 | 3
|
||||||
|
{"f1": 4} | 5 | 6
|
||||||
|
{"f1": 5} | 9 | 11
|
||||||
|
{"f1": 12} | 83 | 93
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
ALTER TABLE test_alter_table ALTER COLUMN c TYPE int USING c::integer;
|
||||||
|
SELECT sum(c) FROM test_alter_table;
|
||||||
|
sum
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
113
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER TABLE test_alter_table ALTER COLUMN b TYPE bigint;
|
||||||
|
SELECT * FROM test_alter_table ORDER BY a;
|
||||||
|
a | b | c
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
{"f1": 1} | 2 | 3
|
||||||
|
{"f1": 4} | 5 | 6
|
||||||
|
{"f1": 5} | 9 | 11
|
||||||
|
{"f1": 12} | 83 | 93
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
ALTER TABLE test_alter_table ALTER COLUMN b TYPE float USING (b::float + 0.5);
|
||||||
|
SELECT * FROM test_alter_table ORDER BY a;
|
||||||
|
a | b | c
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
{"f1": 1} | 2.5 | 3
|
||||||
|
{"f1": 4} | 5.5 | 6
|
||||||
|
{"f1": 5} | 9.5 | 11
|
||||||
|
{"f1": 12} | 83.5 | 93
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
DROP TABLE test_alter_table;
|
|
@ -0,0 +1,31 @@
|
||||||
|
--
|
||||||
|
-- Testing ALTER TABLE on columnar tables.
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE TABLE test_alter_table (a int, b int, c text) USING columnar;
|
||||||
|
|
||||||
|
WITH sample_data AS (VALUES
|
||||||
|
(1, 2, '3'),
|
||||||
|
(4, 5, '6')
|
||||||
|
)
|
||||||
|
INSERT INTO test_alter_table SELECT * FROM sample_data;
|
||||||
|
|
||||||
|
WITH sample_data AS (VALUES
|
||||||
|
(5, 9, '11'),
|
||||||
|
(12, 83, '93')
|
||||||
|
)
|
||||||
|
INSERT INTO test_alter_table SELECT * FROM sample_data;
|
||||||
|
|
||||||
|
ALTER TABLE test_alter_table ALTER COLUMN a TYPE jsonb USING row_to_json(row(a));
|
||||||
|
SELECT * FROM test_alter_table ORDER BY a;
|
||||||
|
|
||||||
|
ALTER TABLE test_alter_table ALTER COLUMN c TYPE int USING c::integer;
|
||||||
|
SELECT sum(c) FROM test_alter_table;
|
||||||
|
|
||||||
|
ALTER TABLE test_alter_table ALTER COLUMN b TYPE bigint;
|
||||||
|
SELECT * FROM test_alter_table ORDER BY a;
|
||||||
|
|
||||||
|
ALTER TABLE test_alter_table ALTER COLUMN b TYPE float USING (b::float + 0.5);
|
||||||
|
SELECT * FROM test_alter_table ORDER BY a;
|
||||||
|
|
||||||
|
DROP TABLE test_alter_table;
|
Loading…
Reference in New Issue