mirror of https://github.com/citusdata/citus.git
Merge pull request #2423 from citusdata/writable_standby_coordinator
Allow simple DML commands from hot standbypull/2424/head
commit
5886e69a3a
|
@ -45,12 +45,124 @@
|
|||
* TRUNCATE and real-time SELECT queries.
|
||||
*/
|
||||
int MultiShardConnectionType = PARALLEL_CONNECTION;
|
||||
bool WritableStandbyCoordinator = false;
|
||||
|
||||
|
||||
/* local function forward declarations */
|
||||
static bool IsCitusPlan(Plan *plan);
|
||||
static bool IsCitusCustomScan(Plan *plan);
|
||||
static Relation StubRelation(TupleDesc tupleDescriptor);
|
||||
|
||||
|
||||
/*
|
||||
* CitusExecutorStart is the ExecutorStart_hook that gets called when
|
||||
* Postgres prepares for execution or EXPLAIN.
|
||||
*/
|
||||
void
|
||||
CitusExecutorStart(QueryDesc *queryDesc, int eflags)
|
||||
{
|
||||
PlannedStmt *plannedStmt = queryDesc->plannedstmt;
|
||||
|
||||
/*
|
||||
* We cannot modify XactReadOnly on Windows because it is not
|
||||
* declared with PGDLLIMPORT.
|
||||
*/
|
||||
#ifndef WIN32
|
||||
if (RecoveryInProgress() && WritableStandbyCoordinator &&
|
||||
IsCitusPlan(plannedStmt->planTree))
|
||||
{
|
||||
PG_TRY();
|
||||
{
|
||||
/*
|
||||
* To enable writes from a hot standby we cheat our way through
|
||||
* the checks in standard_ExecutorStart by temporarily setting
|
||||
* XactReadOnly to false.
|
||||
*/
|
||||
XactReadOnly = false;
|
||||
standard_ExecutorStart(queryDesc, eflags);
|
||||
XactReadOnly = true;
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
XactReadOnly = true;
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
}
|
||||
else
|
||||
#endif
|
||||
{
|
||||
standard_ExecutorStart(queryDesc, eflags);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsCitusPlan returns whether a Plan contains a CustomScan generated by Citus
|
||||
* by recursively walking through the plan tree.
|
||||
*/
|
||||
static bool
|
||||
IsCitusPlan(Plan *plan)
|
||||
{
|
||||
if (plan == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (IsCitusCustomScan(plan))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (plan->lefttree != NULL && IsCitusPlan(plan->lefttree))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (plan->righttree != NULL && IsCitusPlan(plan->righttree))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsCitusCustomScan returns whether Plan node is a CustomScan generated by Citus.
|
||||
*/
|
||||
static bool
|
||||
IsCitusCustomScan(Plan *plan)
|
||||
{
|
||||
CustomScan *customScan = NULL;
|
||||
Node *privateNode = NULL;
|
||||
|
||||
if (plan == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!IsA(plan, CustomScan))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
customScan = (CustomScan *) plan;
|
||||
if (list_length(customScan->custom_private) == 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
privateNode = (Node *) linitial(customScan->custom_private);
|
||||
if (!CitusIsA(privateNode, DistributedPlan))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
|
||||
* given Citus scan node and returns it. It returns null if all tuples are read
|
||||
|
|
|
@ -162,7 +162,7 @@ _PG_init(void)
|
|||
* (thus as the innermost/last running hook) to be able to do our
|
||||
* duties. For simplicity insist that all hooks are previously unused.
|
||||
*/
|
||||
if (planner_hook != NULL || ProcessUtility_hook != NULL)
|
||||
if (planner_hook != NULL || ProcessUtility_hook != NULL || ExecutorStart_hook != NULL)
|
||||
{
|
||||
ereport(ERROR, (errmsg("Citus has to be loaded first"),
|
||||
errhint("Place citus at the beginning of "
|
||||
|
@ -208,6 +208,7 @@ _PG_init(void)
|
|||
/* register for planner hook */
|
||||
set_rel_pathlist_hook = multi_relation_restriction_hook;
|
||||
set_join_pathlist_hook = multi_join_restriction_hook;
|
||||
ExecutorStart_hook = CitusExecutorStart;
|
||||
|
||||
/* register hook for error messages */
|
||||
emit_log_hook = multi_log_hook;
|
||||
|
@ -884,6 +885,16 @@ RegisterCitusConfigVariables(void)
|
|||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.writable_standby_coordinator",
|
||||
gettext_noop("Enables simple DML via a streaming replica of the coordinator"),
|
||||
NULL,
|
||||
&WritableStandbyCoordinator,
|
||||
false,
|
||||
PGC_USERSET,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.enable_version_checks",
|
||||
gettext_noop("Enables version checks during CREATE/ALTER EXTENSION commands"),
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
#include "distributed/foreign_key_relationship.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_logical_optimizer.h"
|
||||
#include "distributed/pg_dist_local_group.h"
|
||||
#include "distributed/pg_dist_node_metadata.h"
|
||||
|
@ -233,7 +234,7 @@ PG_FUNCTION_INFO_V1(poolinfo_valid);
|
|||
void
|
||||
EnsureModificationsCanRun(void)
|
||||
{
|
||||
if (RecoveryInProgress())
|
||||
if (RecoveryInProgress() && !WritableStandbyCoordinator)
|
||||
{
|
||||
ereport(ERROR, (errmsg("writing to worker nodes is not currently allowed"),
|
||||
errdetail("the database is in recovery mode")));
|
||||
|
|
|
@ -28,6 +28,10 @@ typedef enum
|
|||
extern int MultiShardConnectionType;
|
||||
|
||||
|
||||
extern bool WritableStandbyCoordinator;
|
||||
|
||||
|
||||
extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);
|
||||
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
|
||||
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
|
||||
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
\c - - - :master_port
|
||||
CREATE TABLE the_table (a int, b int, z bigserial);
|
||||
SELECT create_distributed_table('the_table', 'a');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE local (a int, b int);
|
||||
\c - - - :follower_master_port
|
||||
-- inserts normally do not work on a standby coordinator
|
||||
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: the database is in recovery mode
|
||||
-- we can allow DML on a writable standby coordinator
|
||||
SET citus.writable_standby_coordinator TO on;
|
||||
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
|
||||
SELECT * FROM the_table;
|
||||
a | b | z
|
||||
---+---+---
|
||||
1 | 2 | 2
|
||||
(1 row)
|
||||
|
||||
UPDATE the_table SET z = 3 WHERE a = 1;
|
||||
SELECT * FROM the_table;
|
||||
a | b | z
|
||||
---+---+---
|
||||
1 | 2 | 3
|
||||
(1 row)
|
||||
|
||||
DELETE FROM the_table WHERE a = 1;
|
||||
SELECT * FROM the_table;
|
||||
a | b | z
|
||||
---+---+---
|
||||
(0 rows)
|
||||
|
||||
-- drawing from a sequence is not possible
|
||||
INSERT INTO the_table (a, b) VALUES (1, 2);
|
||||
ERROR: cannot assign TransactionIds during recovery
|
||||
-- 2PC is not possible
|
||||
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
|
||||
ERROR: cannot assign TransactionIds during recovery
|
||||
-- COPY is not possible in 2PC mode
|
||||
COPY the_table (a, b, z) FROM STDIN WITH CSV;
|
||||
ERROR: cannot assign TransactionIds during recovery
|
||||
-- 1PC is possible
|
||||
SET citus.multi_shard_commit_protocol TO '1pc';
|
||||
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
|
||||
SELECT * FROM the_table ORDER BY a;
|
||||
a | b | z
|
||||
---+---+---
|
||||
2 | 3 | 4
|
||||
5 | 6 | 7
|
||||
(2 rows)
|
||||
|
||||
-- modifying CTEs are possible
|
||||
WITH del AS (DELETE FROM the_table RETURNING *)
|
||||
SELECT * FROM del ORDER BY a;
|
||||
a | b | z
|
||||
---+---+---
|
||||
2 | 3 | 4
|
||||
5 | 6 | 7
|
||||
(2 rows)
|
||||
|
||||
-- COPY is possible in 1PC mode
|
||||
COPY the_table (a, b, z) FROM STDIN WITH CSV;
|
||||
SELECT * FROM the_table ORDER BY a;
|
||||
a | b | z
|
||||
----+----+----
|
||||
10 | 10 | 10
|
||||
11 | 11 | 11
|
||||
(2 rows)
|
||||
|
||||
DELETE FROM the_table;
|
||||
-- DDL is not possible
|
||||
TRUNCATE the_table;
|
||||
ERROR: cannot execute TRUNCATE TABLE in a read-only transaction
|
||||
ALTER TABLE the_table ADD COLUMN c int;
|
||||
ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
|
||||
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
|
||||
-- rollback is possible
|
||||
BEGIN;
|
||||
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
|
||||
ROLLBACK;
|
||||
SELECT * FROM the_table ORDER BY a;
|
||||
a | b | z
|
||||
---+---+---
|
||||
(0 rows)
|
||||
|
||||
-- we should still disallow writes to local tables
|
||||
INSERT INTO local VALUES (1, 1);
|
||||
ERROR: cannot execute INSERT in a read-only transaction
|
||||
INSERT INTO local SELECT a, b FROM the_table;
|
||||
ERROR: cannot INSERT rows from a distributed query into a local table
|
||||
HINT: Consider using CREATE TEMPORARY TABLE tmp AS SELECT ... and inserting from the temporary table.
|
||||
-- we shouldn't be able to create local tables
|
||||
CREATE TEMP TABLE local_copy_of_the_table AS SELECT * FROM the_table;
|
||||
ERROR: cannot execute CREATE TABLE AS in a read-only transaction
|
||||
\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
|
||||
-- separate follower formations currently cannot do writes
|
||||
SET citus.writable_standby_coordinator TO on;
|
||||
INSERT INTO the_table (a, b, z) VALUES (1, 2, 3);
|
||||
ERROR: writing to worker nodes is not currently allowed
|
||||
DETAIL: citus.use_secondary_nodes is set to 'always'
|
||||
SELECT * FROM the_table ORDER BY a;
|
||||
a | b | z
|
||||
---+---+---
|
||||
(0 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
DROP TABLE the_table;
|
|
@ -1,4 +1,5 @@
|
|||
test: multi_follower_sanity_check
|
||||
test: multi_follower_select_statements
|
||||
test: multi_follower_dml
|
||||
test: multi_follower_configure_followers
|
||||
test: multi_follower_task_tracker
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
\c - - - :master_port
|
||||
|
||||
CREATE TABLE the_table (a int, b int, z bigserial);
|
||||
SELECT create_distributed_table('the_table', 'a');
|
||||
|
||||
CREATE TABLE local (a int, b int);
|
||||
|
||||
\c - - - :follower_master_port
|
||||
|
||||
-- inserts normally do not work on a standby coordinator
|
||||
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
|
||||
|
||||
-- we can allow DML on a writable standby coordinator
|
||||
SET citus.writable_standby_coordinator TO on;
|
||||
|
||||
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
|
||||
SELECT * FROM the_table;
|
||||
|
||||
UPDATE the_table SET z = 3 WHERE a = 1;
|
||||
SELECT * FROM the_table;
|
||||
|
||||
DELETE FROM the_table WHERE a = 1;
|
||||
SELECT * FROM the_table;
|
||||
|
||||
-- drawing from a sequence is not possible
|
||||
INSERT INTO the_table (a, b) VALUES (1, 2);
|
||||
|
||||
-- 2PC is not possible
|
||||
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
|
||||
|
||||
-- COPY is not possible in 2PC mode
|
||||
COPY the_table (a, b, z) FROM STDIN WITH CSV;
|
||||
10,10,10
|
||||
11,11,11
|
||||
\.
|
||||
|
||||
-- 1PC is possible
|
||||
SET citus.multi_shard_commit_protocol TO '1pc';
|
||||
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
|
||||
SELECT * FROM the_table ORDER BY a;
|
||||
|
||||
-- modifying CTEs are possible
|
||||
WITH del AS (DELETE FROM the_table RETURNING *)
|
||||
SELECT * FROM del ORDER BY a;
|
||||
|
||||
-- COPY is possible in 1PC mode
|
||||
COPY the_table (a, b, z) FROM STDIN WITH CSV;
|
||||
10,10,10
|
||||
11,11,11
|
||||
\.
|
||||
SELECT * FROM the_table ORDER BY a;
|
||||
DELETE FROM the_table;
|
||||
|
||||
-- DDL is not possible
|
||||
TRUNCATE the_table;
|
||||
ALTER TABLE the_table ADD COLUMN c int;
|
||||
|
||||
-- rollback is possible
|
||||
BEGIN;
|
||||
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
|
||||
ROLLBACK;
|
||||
|
||||
SELECT * FROM the_table ORDER BY a;
|
||||
|
||||
-- we should still disallow writes to local tables
|
||||
INSERT INTO local VALUES (1, 1);
|
||||
INSERT INTO local SELECT a, b FROM the_table;
|
||||
|
||||
-- we shouldn't be able to create local tables
|
||||
CREATE TEMP TABLE local_copy_of_the_table AS SELECT * FROM the_table;
|
||||
|
||||
\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
|
||||
|
||||
-- separate follower formations currently cannot do writes
|
||||
SET citus.writable_standby_coordinator TO on;
|
||||
INSERT INTO the_table (a, b, z) VALUES (1, 2, 3);
|
||||
SELECT * FROM the_table ORDER BY a;
|
||||
|
||||
\c - - - :master_port
|
||||
DROP TABLE the_table;
|
Loading…
Reference in New Issue