diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index f088dc8a4..5735e4422 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -45,12 +45,124 @@ * TRUNCATE and real-time SELECT queries. */ int MultiShardConnectionType = PARALLEL_CONNECTION; +bool WritableStandbyCoordinator = false; /* local function forward declarations */ +static bool IsCitusPlan(Plan *plan); +static bool IsCitusCustomScan(Plan *plan); static Relation StubRelation(TupleDesc tupleDescriptor); +/* + * CitusExecutorStart is the ExecutorStart_hook that gets called when + * Postgres prepares for execution or EXPLAIN. + */ +void +CitusExecutorStart(QueryDesc *queryDesc, int eflags) +{ + PlannedStmt *plannedStmt = queryDesc->plannedstmt; + + /* + * We cannot modify XactReadOnly on Windows because it is not + * declared with PGDLLIMPORT. + */ +#ifndef WIN32 + if (RecoveryInProgress() && WritableStandbyCoordinator && + IsCitusPlan(plannedStmt->planTree)) + { + PG_TRY(); + { + /* + * To enable writes from a hot standby we cheat our way through + * the checks in standard_ExecutorStart by temporarily setting + * XactReadOnly to false. + */ + XactReadOnly = false; + standard_ExecutorStart(queryDesc, eflags); + XactReadOnly = true; + } + PG_CATCH(); + { + XactReadOnly = true; + PG_RE_THROW(); + } + PG_END_TRY(); + } + else +#endif + { + standard_ExecutorStart(queryDesc, eflags); + } +} + + +/* + * IsCitusPlan returns whether a Plan contains a CustomScan generated by Citus + * by recursively walking through the plan tree. + */ +static bool +IsCitusPlan(Plan *plan) +{ + if (plan == NULL) + { + return false; + } + + if (IsCitusCustomScan(plan)) + { + return true; + } + + if (plan->lefttree != NULL && IsCitusPlan(plan->lefttree)) + { + return true; + } + + if (plan->righttree != NULL && IsCitusPlan(plan->righttree)) + { + return true; + } + + return false; +} + + +/* + * IsCitusCustomScan returns whether Plan node is a CustomScan generated by Citus. + */ +static bool +IsCitusCustomScan(Plan *plan) +{ + CustomScan *customScan = NULL; + Node *privateNode = NULL; + + if (plan == NULL) + { + return false; + } + + if (!IsA(plan, CustomScan)) + { + return false; + } + + customScan = (CustomScan *) plan; + if (list_length(customScan->custom_private) == 0) + { + return false; + } + + privateNode = (Node *) linitial(customScan->custom_private); + if (!CitusIsA(privateNode, DistributedPlan)) + { + return false; + } + + return true; +} + + /* * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the * given Citus scan node and returns it. It returns null if all tuples are read diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 6b30c3f86..adc328074 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -162,7 +162,7 @@ _PG_init(void) * (thus as the innermost/last running hook) to be able to do our * duties. For simplicity insist that all hooks are previously unused. */ - if (planner_hook != NULL || ProcessUtility_hook != NULL) + if (planner_hook != NULL || ProcessUtility_hook != NULL || ExecutorStart_hook != NULL) { ereport(ERROR, (errmsg("Citus has to be loaded first"), errhint("Place citus at the beginning of " @@ -208,6 +208,7 @@ _PG_init(void) /* register for planner hook */ set_rel_pathlist_hook = multi_relation_restriction_hook; set_join_pathlist_hook = multi_join_restriction_hook; + ExecutorStart_hook = CitusExecutorStart; /* register hook for error messages */ emit_log_hook = multi_log_hook; @@ -884,6 +885,16 @@ RegisterCitusConfigVariables(void) 0, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.writable_standby_coordinator", + gettext_noop("Enables simple DML via a streaming replica of the coordinator"), + NULL, + &WritableStandbyCoordinator, + false, + PGC_USERSET, + 0, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_version_checks", gettext_noop("Enables version checks during CREATE/ALTER EXTENSION commands"), diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 1e3227570..9f6176c63 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -32,6 +32,7 @@ #include "distributed/foreign_key_relationship.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/pg_dist_local_group.h" #include "distributed/pg_dist_node_metadata.h" @@ -233,7 +234,7 @@ PG_FUNCTION_INFO_V1(poolinfo_valid); void EnsureModificationsCanRun(void) { - if (RecoveryInProgress()) + if (RecoveryInProgress() && !WritableStandbyCoordinator) { ereport(ERROR, (errmsg("writing to worker nodes is not currently allowed"), errdetail("the database is in recovery mode"))); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 17a9586a7..6c798eb26 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -28,6 +28,10 @@ typedef enum extern int MultiShardConnectionType; +extern bool WritableStandbyCoordinator; + + +extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc diff --git a/src/test/regress/expected/multi_follower_dml.out b/src/test/regress/expected/multi_follower_dml.out new file mode 100644 index 000000000..bd3811864 --- /dev/null +++ b/src/test/regress/expected/multi_follower_dml.out @@ -0,0 +1,105 @@ +\c - - - :master_port +CREATE TABLE the_table (a int, b int, z bigserial); +SELECT create_distributed_table('the_table', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE local (a int, b int); +\c - - - :follower_master_port +-- inserts normally do not work on a standby coordinator +INSERT INTO the_table (a, b, z) VALUES (1, 2, 2); +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is in recovery mode +-- we can allow DML on a writable standby coordinator +SET citus.writable_standby_coordinator TO on; +INSERT INTO the_table (a, b, z) VALUES (1, 2, 2); +SELECT * FROM the_table; + a | b | z +---+---+--- + 1 | 2 | 2 +(1 row) + +UPDATE the_table SET z = 3 WHERE a = 1; +SELECT * FROM the_table; + a | b | z +---+---+--- + 1 | 2 | 3 +(1 row) + +DELETE FROM the_table WHERE a = 1; +SELECT * FROM the_table; + a | b | z +---+---+--- +(0 rows) + +-- drawing from a sequence is not possible +INSERT INTO the_table (a, b) VALUES (1, 2); +ERROR: cannot assign TransactionIds during recovery +-- 2PC is not possible +INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); +ERROR: cannot assign TransactionIds during recovery +-- COPY is not possible in 2PC mode +COPY the_table (a, b, z) FROM STDIN WITH CSV; +ERROR: cannot assign TransactionIds during recovery +-- 1PC is possible +SET citus.multi_shard_commit_protocol TO '1pc'; +INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); +SELECT * FROM the_table ORDER BY a; + a | b | z +---+---+--- + 2 | 3 | 4 + 5 | 6 | 7 +(2 rows) + +-- modifying CTEs are possible +WITH del AS (DELETE FROM the_table RETURNING *) +SELECT * FROM del ORDER BY a; + a | b | z +---+---+--- + 2 | 3 | 4 + 5 | 6 | 7 +(2 rows) + +-- COPY is possible in 1PC mode +COPY the_table (a, b, z) FROM STDIN WITH CSV; +SELECT * FROM the_table ORDER BY a; + a | b | z +----+----+---- + 10 | 10 | 10 + 11 | 11 | 11 +(2 rows) + +DELETE FROM the_table; +-- DDL is not possible +TRUNCATE the_table; +ERROR: cannot execute TRUNCATE TABLE in a read-only transaction +ALTER TABLE the_table ADD COLUMN c int; +ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress +HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery. +-- rollback is possible +BEGIN; +INSERT INTO the_table (a, b, z) VALUES (1, 2, 2); +ROLLBACK; +SELECT * FROM the_table ORDER BY a; + a | b | z +---+---+--- +(0 rows) + +-- we should still disallow writes to local tables +INSERT INTO local VALUES (1, 1); +ERROR: cannot execute INSERT in a read-only transaction +\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'" +-- separate follower formations currently cannot do writes +SET citus.writable_standby_coordinator TO on; +INSERT INTO the_table (a, b, z) VALUES (1, 2, 3); +ERROR: writing to worker nodes is not currently allowed +DETAIL: citus.use_secondary_nodes is set to 'always' +SELECT * FROM the_table ORDER BY a; + a | b | z +---+---+--- +(0 rows) + +\c - - - :master_port +DROP TABLE the_table; diff --git a/src/test/regress/multi_follower_schedule b/src/test/regress/multi_follower_schedule index 1d55732f6..9c0b63099 100644 --- a/src/test/regress/multi_follower_schedule +++ b/src/test/regress/multi_follower_schedule @@ -1,4 +1,5 @@ test: multi_follower_sanity_check test: multi_follower_select_statements +test: multi_follower_dml test: multi_follower_configure_followers test: multi_follower_task_tracker diff --git a/src/test/regress/sql/multi_follower_dml.sql b/src/test/regress/sql/multi_follower_dml.sql new file mode 100644 index 000000000..f002f10ab --- /dev/null +++ b/src/test/regress/sql/multi_follower_dml.sql @@ -0,0 +1,76 @@ +\c - - - :master_port + +CREATE TABLE the_table (a int, b int, z bigserial); +SELECT create_distributed_table('the_table', 'a'); + +CREATE TABLE local (a int, b int); + +\c - - - :follower_master_port + +-- inserts normally do not work on a standby coordinator +INSERT INTO the_table (a, b, z) VALUES (1, 2, 2); + +-- we can allow DML on a writable standby coordinator +SET citus.writable_standby_coordinator TO on; + +INSERT INTO the_table (a, b, z) VALUES (1, 2, 2); +SELECT * FROM the_table; + +UPDATE the_table SET z = 3 WHERE a = 1; +SELECT * FROM the_table; + +DELETE FROM the_table WHERE a = 1; +SELECT * FROM the_table; + +-- drawing from a sequence is not possible +INSERT INTO the_table (a, b) VALUES (1, 2); + +-- 2PC is not possible +INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); + +-- COPY is not possible in 2PC mode +COPY the_table (a, b, z) FROM STDIN WITH CSV; +10,10,10 +11,11,11 +\. + +-- 1PC is possible +SET citus.multi_shard_commit_protocol TO '1pc'; +INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); +SELECT * FROM the_table ORDER BY a; + +-- modifying CTEs are possible +WITH del AS (DELETE FROM the_table RETURNING *) +SELECT * FROM del ORDER BY a; + +-- COPY is possible in 1PC mode +COPY the_table (a, b, z) FROM STDIN WITH CSV; +10,10,10 +11,11,11 +\. +SELECT * FROM the_table ORDER BY a; +DELETE FROM the_table; + +-- DDL is not possible +TRUNCATE the_table; +ALTER TABLE the_table ADD COLUMN c int; + +-- rollback is possible +BEGIN; +INSERT INTO the_table (a, b, z) VALUES (1, 2, 2); +ROLLBACK; + +SELECT * FROM the_table ORDER BY a; + +-- we should still disallow writes to local tables +INSERT INTO local VALUES (1, 1); + +\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'" + +-- separate follower formations currently cannot do writes +SET citus.writable_standby_coordinator TO on; +INSERT INTO the_table (a, b, z) VALUES (1, 2, 3); +SELECT * FROM the_table ORDER BY a; + +\c - - - :master_port +DROP TABLE the_table;