mirror of https://github.com/citusdata/citus.git
Prevent inserting into logically-replicated columnar table. (#4429)
parent
f2056e553f
commit
3e0f1aaaab
|
@ -23,6 +23,7 @@
|
||||||
#include "catalog/index.h"
|
#include "catalog/index.h"
|
||||||
#include "catalog/objectaccess.h"
|
#include "catalog/objectaccess.h"
|
||||||
#include "catalog/pg_am.h"
|
#include "catalog/pg_am.h"
|
||||||
|
#include "catalog/pg_publication.h"
|
||||||
#include "catalog/pg_trigger.h"
|
#include "catalog/pg_trigger.h"
|
||||||
#include "catalog/storage.h"
|
#include "catalog/storage.h"
|
||||||
#include "catalog/storage_xlog.h"
|
#include "catalog/storage_xlog.h"
|
||||||
|
@ -44,6 +45,7 @@
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
#include "utils/pg_rusage.h"
|
#include "utils/pg_rusage.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
#include "utils/relcache.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
@ -121,6 +123,7 @@ static bool ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode,
|
||||||
static void LogRelationStats(Relation rel, int elevel);
|
static void LogRelationStats(Relation rel, int elevel);
|
||||||
static void TruncateCStore(Relation rel, int elevel);
|
static void TruncateCStore(Relation rel, int elevel);
|
||||||
static HeapTuple ColumnarSlotCopyHeapTuple(TupleTableSlot *slot);
|
static HeapTuple ColumnarSlotCopyHeapTuple(TupleTableSlot *slot);
|
||||||
|
static void ColumnarCheckLogicalReplication(Relation rel);
|
||||||
|
|
||||||
/* Custom tuple slot ops used for columnar. Initialized in cstore_tableam_init(). */
|
/* Custom tuple slot ops used for columnar. Initialized in cstore_tableam_init(). */
|
||||||
TupleTableSlotOps TTSOpsColumnar;
|
TupleTableSlotOps TTSOpsColumnar;
|
||||||
|
@ -454,6 +457,8 @@ cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->perTupleContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(writeState->perTupleContext);
|
||||||
|
|
||||||
HeapTuple heapTuple = ExecCopySlotHeapTuple(slot);
|
HeapTuple heapTuple = ExecCopySlotHeapTuple(slot);
|
||||||
|
|
||||||
|
ColumnarCheckLogicalReplication(relation);
|
||||||
if (HeapTupleHasExternal(heapTuple))
|
if (HeapTupleHasExternal(heapTuple))
|
||||||
{
|
{
|
||||||
/* detoast any toasted attributes */
|
/* detoast any toasted attributes */
|
||||||
|
@ -497,6 +502,7 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
|
||||||
RelationGetDescr(relation),
|
RelationGetDescr(relation),
|
||||||
GetCurrentSubTransactionId());
|
GetCurrentSubTransactionId());
|
||||||
|
|
||||||
|
ColumnarCheckLogicalReplication(relation);
|
||||||
for (int i = 0; i < ntuples; i++)
|
for (int i = 0; i < ntuples; i++)
|
||||||
{
|
{
|
||||||
TupleTableSlot *tupleSlot = slots[i];
|
TupleTableSlot *tupleSlot = slots[i];
|
||||||
|
@ -1378,6 +1384,36 @@ columnar_handler(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColumnarCheckLogicalReplication throws an error if the relation is
|
||||||
|
* part of any publication. This should be called before any write to
|
||||||
|
* a columnar table, because columnar changes are not replicated with
|
||||||
|
* logical replication (similar to a row table without a replica
|
||||||
|
* identity).
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ColumnarCheckLogicalReplication(Relation rel)
|
||||||
|
{
|
||||||
|
if (!is_publishable_relation(rel))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rel->rd_pubactions == NULL)
|
||||||
|
{
|
||||||
|
GetRelationPublicationActions(rel);
|
||||||
|
Assert(rel->rd_pubactions != NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rel->rd_pubactions->pubinsert)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg(
|
||||||
|
"cannot insert into columnar table that is a part of a publication")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CitusCreateAlterColumnarTableSet generates a portable
|
* CitusCreateAlterColumnarTableSet generates a portable
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -84,3 +84,15 @@ WHERE a.int_val = c.int_val AND a.hash = md5(c.text_val);
|
||||||
|
|
||||||
DROP TABLE test_long_text_hash;
|
DROP TABLE test_long_text_hash;
|
||||||
DROP TABLE test_cstore_long_text;
|
DROP TABLE test_cstore_long_text;
|
||||||
|
CREATE TABLE test_logical_replication(i int) USING columnar;
|
||||||
|
-- should succeed
|
||||||
|
INSERT INTO test_logical_replication VALUES (1);
|
||||||
|
CREATE PUBLICATION test_columnar_publication
|
||||||
|
FOR TABLE test_logical_replication;
|
||||||
|
-- should fail; columnar does not support logical replication
|
||||||
|
INSERT INTO test_logical_replication VALUES (2);
|
||||||
|
ERROR: cannot insert into columnar table that is a part of a publication
|
||||||
|
DROP PUBLICATION test_columnar_publication;
|
||||||
|
-- should succeed
|
||||||
|
INSERT INTO test_logical_replication VALUES (3);
|
||||||
|
DROP TABLE test_logical_replication;
|
||||||
|
|
|
@ -54,3 +54,15 @@ WHERE a.int_val = c.int_val AND a.hash = md5(c.text_val);
|
||||||
|
|
||||||
DROP TABLE test_long_text_hash;
|
DROP TABLE test_long_text_hash;
|
||||||
DROP TABLE test_cstore_long_text;
|
DROP TABLE test_cstore_long_text;
|
||||||
|
|
||||||
|
CREATE TABLE test_logical_replication(i int) USING columnar;
|
||||||
|
-- should succeed
|
||||||
|
INSERT INTO test_logical_replication VALUES (1);
|
||||||
|
CREATE PUBLICATION test_columnar_publication
|
||||||
|
FOR TABLE test_logical_replication;
|
||||||
|
-- should fail; columnar does not support logical replication
|
||||||
|
INSERT INTO test_logical_replication VALUES (2);
|
||||||
|
DROP PUBLICATION test_columnar_publication;
|
||||||
|
-- should succeed
|
||||||
|
INSERT INTO test_logical_replication VALUES (3);
|
||||||
|
DROP TABLE test_logical_replication;
|
||||||
|
|
Loading…
Reference in New Issue