Allow simple DML commands from hot standby

pull/2423/head
Marco Slot 2018-09-27 06:04:54 +02:00
parent 1cb48416eb
commit d56baefe3d
7 changed files with 312 additions and 2 deletions

View File

@ -45,12 +45,124 @@
* TRUNCATE and real-time SELECT queries. * TRUNCATE and real-time SELECT queries.
*/ */
int MultiShardConnectionType = PARALLEL_CONNECTION; int MultiShardConnectionType = PARALLEL_CONNECTION;
bool WritableStandbyCoordinator = false;
/* local function forward declarations */ /* local function forward declarations */
static bool IsCitusPlan(Plan *plan);
static bool IsCitusCustomScan(Plan *plan);
static Relation StubRelation(TupleDesc tupleDescriptor); static Relation StubRelation(TupleDesc tupleDescriptor);
/*
* CitusExecutorStart is the ExecutorStart_hook that gets called when
* Postgres prepares for execution or EXPLAIN.
*/
void
CitusExecutorStart(QueryDesc *queryDesc, int eflags)
{
PlannedStmt *plannedStmt = queryDesc->plannedstmt;
/*
* We cannot modify XactReadOnly on Windows because it is not
* declared with PGDLLIMPORT.
*/
#ifndef WIN32
if (RecoveryInProgress() && WritableStandbyCoordinator &&
IsCitusPlan(plannedStmt->planTree))
{
PG_TRY();
{
/*
* To enable writes from a hot standby we cheat our way through
* the checks in standard_ExecutorStart by temporarily setting
* XactReadOnly to false.
*/
XactReadOnly = false;
standard_ExecutorStart(queryDesc, eflags);
XactReadOnly = true;
}
PG_CATCH();
{
XactReadOnly = true;
PG_RE_THROW();
}
PG_END_TRY();
}
else
#endif
{
standard_ExecutorStart(queryDesc, eflags);
}
}
/*
* IsCitusPlan returns whether a Plan contains a CustomScan generated by Citus
* by recursively walking through the plan tree.
*/
static bool
IsCitusPlan(Plan *plan)
{
if (plan == NULL)
{
return false;
}
if (IsCitusCustomScan(plan))
{
return true;
}
if (plan->lefttree != NULL && IsCitusPlan(plan->lefttree))
{
return true;
}
if (plan->righttree != NULL && IsCitusPlan(plan->righttree))
{
return true;
}
return false;
}
/*
* IsCitusCustomScan returns whether Plan node is a CustomScan generated by Citus.
*/
static bool
IsCitusCustomScan(Plan *plan)
{
CustomScan *customScan = NULL;
Node *privateNode = NULL;
if (plan == NULL)
{
return false;
}
if (!IsA(plan, CustomScan))
{
return false;
}
customScan = (CustomScan *) plan;
if (list_length(customScan->custom_private) == 0)
{
return false;
}
privateNode = (Node *) linitial(customScan->custom_private);
if (!CitusIsA(privateNode, DistributedPlan))
{
return false;
}
return true;
}
/* /*
* ReturnTupleFromTuplestore reads the next tuple from the tuple store of the * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
* given Citus scan node and returns it. It returns null if all tuples are read * given Citus scan node and returns it. It returns null if all tuples are read

View File

@ -162,7 +162,7 @@ _PG_init(void)
* (thus as the innermost/last running hook) to be able to do our * (thus as the innermost/last running hook) to be able to do our
* duties. For simplicity insist that all hooks are previously unused. * duties. For simplicity insist that all hooks are previously unused.
*/ */
if (planner_hook != NULL || ProcessUtility_hook != NULL) if (planner_hook != NULL || ProcessUtility_hook != NULL || ExecutorStart_hook != NULL)
{ {
ereport(ERROR, (errmsg("Citus has to be loaded first"), ereport(ERROR, (errmsg("Citus has to be loaded first"),
errhint("Place citus at the beginning of " errhint("Place citus at the beginning of "
@ -208,6 +208,7 @@ _PG_init(void)
/* register for planner hook */ /* register for planner hook */
set_rel_pathlist_hook = multi_relation_restriction_hook; set_rel_pathlist_hook = multi_relation_restriction_hook;
set_join_pathlist_hook = multi_join_restriction_hook; set_join_pathlist_hook = multi_join_restriction_hook;
ExecutorStart_hook = CitusExecutorStart;
/* register hook for error messages */ /* register hook for error messages */
emit_log_hook = multi_log_hook; emit_log_hook = multi_log_hook;
@ -884,6 +885,16 @@ RegisterCitusConfigVariables(void)
0, 0,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.writable_standby_coordinator",
gettext_noop("Enables simple DML via a streaming replica of the coordinator"),
NULL,
&WritableStandbyCoordinator,
false,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.enable_version_checks", "citus.enable_version_checks",
gettext_noop("Enables version checks during CREATE/ALTER EXTENSION commands"), gettext_noop("Enables version checks during CREATE/ALTER EXTENSION commands"),

View File

@ -32,6 +32,7 @@
#include "distributed/foreign_key_relationship.h" #include "distributed/foreign_key_relationship.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/pg_dist_local_group.h" #include "distributed/pg_dist_local_group.h"
#include "distributed/pg_dist_node_metadata.h" #include "distributed/pg_dist_node_metadata.h"
@ -233,7 +234,7 @@ PG_FUNCTION_INFO_V1(poolinfo_valid);
void void
EnsureModificationsCanRun(void) EnsureModificationsCanRun(void)
{ {
if (RecoveryInProgress()) if (RecoveryInProgress() && !WritableStandbyCoordinator)
{ {
ereport(ERROR, (errmsg("writing to worker nodes is not currently allowed"), ereport(ERROR, (errmsg("writing to worker nodes is not currently allowed"),
errdetail("the database is in recovery mode"))); errdetail("the database is in recovery mode")));

View File

@ -28,6 +28,10 @@ typedef enum
extern int MultiShardConnectionType; extern int MultiShardConnectionType;
extern bool WritableStandbyCoordinator;
extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc

View File

@ -0,0 +1,105 @@
\c - - - :master_port
CREATE TABLE the_table (a int, b int, z bigserial);
SELECT create_distributed_table('the_table', 'a');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE local (a int, b int);
\c - - - :follower_master_port
-- inserts normally do not work on a standby coordinator
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is in recovery mode
-- we can allow DML on a writable standby coordinator
SET citus.writable_standby_coordinator TO on;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
SELECT * FROM the_table;
a | b | z
---+---+---
1 | 2 | 2
(1 row)
UPDATE the_table SET z = 3 WHERE a = 1;
SELECT * FROM the_table;
a | b | z
---+---+---
1 | 2 | 3
(1 row)
DELETE FROM the_table WHERE a = 1;
SELECT * FROM the_table;
a | b | z
---+---+---
(0 rows)
-- drawing from a sequence is not possible
INSERT INTO the_table (a, b) VALUES (1, 2);
ERROR: cannot assign TransactionIds during recovery
-- 2PC is not possible
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
ERROR: cannot assign TransactionIds during recovery
-- COPY is not possible in 2PC mode
COPY the_table (a, b, z) FROM STDIN WITH CSV;
ERROR: cannot assign TransactionIds during recovery
-- 1PC is possible
SET citus.multi_shard_commit_protocol TO '1pc';
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
SELECT * FROM the_table ORDER BY a;
a | b | z
---+---+---
2 | 3 | 4
5 | 6 | 7
(2 rows)
-- modifying CTEs are possible
WITH del AS (DELETE FROM the_table RETURNING *)
SELECT * FROM del ORDER BY a;
a | b | z
---+---+---
2 | 3 | 4
5 | 6 | 7
(2 rows)
-- COPY is possible in 1PC mode
COPY the_table (a, b, z) FROM STDIN WITH CSV;
SELECT * FROM the_table ORDER BY a;
a | b | z
----+----+----
10 | 10 | 10
11 | 11 | 11
(2 rows)
DELETE FROM the_table;
-- DDL is not possible
TRUNCATE the_table;
ERROR: cannot execute TRUNCATE TABLE in a read-only transaction
ALTER TABLE the_table ADD COLUMN c int;
ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
-- rollback is possible
BEGIN;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
ROLLBACK;
SELECT * FROM the_table ORDER BY a;
a | b | z
---+---+---
(0 rows)
-- we should still disallow writes to local tables
INSERT INTO local VALUES (1, 1);
ERROR: cannot execute INSERT in a read-only transaction
\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
-- separate follower formations currently cannot do writes
SET citus.writable_standby_coordinator TO on;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 3);
ERROR: writing to worker nodes is not currently allowed
DETAIL: citus.use_secondary_nodes is set to 'always'
SELECT * FROM the_table ORDER BY a;
a | b | z
---+---+---
(0 rows)
\c - - - :master_port
DROP TABLE the_table;

View File

@ -1,4 +1,5 @@
test: multi_follower_sanity_check test: multi_follower_sanity_check
test: multi_follower_select_statements test: multi_follower_select_statements
test: multi_follower_dml
test: multi_follower_configure_followers test: multi_follower_configure_followers
test: multi_follower_task_tracker test: multi_follower_task_tracker

View File

@ -0,0 +1,76 @@
\c - - - :master_port
CREATE TABLE the_table (a int, b int, z bigserial);
SELECT create_distributed_table('the_table', 'a');
CREATE TABLE local (a int, b int);
\c - - - :follower_master_port
-- inserts normally do not work on a standby coordinator
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
-- we can allow DML on a writable standby coordinator
SET citus.writable_standby_coordinator TO on;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
SELECT * FROM the_table;
UPDATE the_table SET z = 3 WHERE a = 1;
SELECT * FROM the_table;
DELETE FROM the_table WHERE a = 1;
SELECT * FROM the_table;
-- drawing from a sequence is not possible
INSERT INTO the_table (a, b) VALUES (1, 2);
-- 2PC is not possible
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
-- COPY is not possible in 2PC mode
COPY the_table (a, b, z) FROM STDIN WITH CSV;
10,10,10
11,11,11
\.
-- 1PC is possible
SET citus.multi_shard_commit_protocol TO '1pc';
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
SELECT * FROM the_table ORDER BY a;
-- modifying CTEs are possible
WITH del AS (DELETE FROM the_table RETURNING *)
SELECT * FROM del ORDER BY a;
-- COPY is possible in 1PC mode
COPY the_table (a, b, z) FROM STDIN WITH CSV;
10,10,10
11,11,11
\.
SELECT * FROM the_table ORDER BY a;
DELETE FROM the_table;
-- DDL is not possible
TRUNCATE the_table;
ALTER TABLE the_table ADD COLUMN c int;
-- rollback is possible
BEGIN;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
ROLLBACK;
SELECT * FROM the_table ORDER BY a;
-- we should still disallow writes to local tables
INSERT INTO local VALUES (1, 1);
\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
-- separate follower formations currently cannot do writes
SET citus.writable_standby_coordinator TO on;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 3);
SELECT * FROM the_table ORDER BY a;
\c - - - :master_port
DROP TABLE the_table;