diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 14e270aa6..fc1b2c9c5 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -408,6 +408,27 @@ CloseConnectionByPGconn(PGconn *pqConn) } +/* + * FinishConnectionListEstablishment is a wrapper around FinishConnectionEstablishment. + * The function iterates over the multiConnectionList and finishes the connection + * establishment for each multi connection. + */ +void +FinishConnectionListEstablishment(List *multiConnectionList) +{ + ListCell *multiConnectionCell = NULL; + + foreach(multiConnectionCell, multiConnectionList) + { + MultiConnection *multiConnection = (MultiConnection *) lfirst( + multiConnectionCell); + + /* TODO: consider making connection establishment fully in parallel */ + FinishConnectionEstablishment(multiConnection); + } +} + + /* * Synchronously finish connection establishment of an individual connection. * diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index d993b660e..2af22d251 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -95,6 +95,7 @@ static HTAB * CreateXactParticipantHash(void); static void ReacquireMetadataLocks(List *taskList); static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task, bool isModificationQuery, bool expectResults); +static void GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList); static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList, bool isModificationQuery, bool expectResults); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, @@ -717,6 +718,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, int64 affectedTupleCount = -1; bool gotResults = false; char *queryString = task->queryString; + bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC); if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) { @@ -726,6 +728,22 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, "modifications"))); } + /* + * Firstly ensure that distributed transaction is started. Then, force + * the transaction manager to use 2PC while running the task on the placements. + */ + if (taskRequiresTwoPhaseCommit) + { + BeginOrContinueCoordinatedTransaction(); + CoordinatedTransactionUse2PC(); + + /* + * Mark connections for all placements as critical and establish connections + * to all placements at once. + */ + GetPlacementConnectionsReadyForTwoPhaseCommit(taskPlacementList); + } + /* * We could naturally handle function-based transactions (i.e. those * using PL/pgSQL or similar) by checking the type of queryDesc->dest, @@ -767,6 +785,9 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, continue; } + /* if we're running a 2PC, the query should fail on error */ + failOnError = taskRequiresTwoPhaseCommit; + /* * If caller is interested, store query results the first time * through. The output of the query's execution on other shards is @@ -838,12 +859,18 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, ereport(ERROR, (errmsg("could not modify any active placements"))); } - /* otherwise, mark failed placements as inactive: they're stale */ + /* + * Otherwise, mark failed placements as inactive: they're stale. 
Note that
+	 * a connection failure for a task requiring 2PC has already aborted the whole
+	 * transaction, so those placements are never marked stale here.
+	 */
 	foreach(failedPlacementCell, failedPlacementList)
 	{
 		ShardPlacement *failedPlacement = (ShardPlacement *) lfirst(failedPlacementCell);
 
+		Assert(!taskRequiresTwoPhaseCommit);
+
 		UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
 	}
 
@@ -854,6 +881,38 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
 }
 
 
+/*
+ * GetPlacementConnectionsReadyForTwoPhaseCommit iterates over the task placement
+ * list, starting a connection to each placement's node and marking its remote
+ * transaction as critical. It then finishes establishing all of those connections
+ * and finally sends BEGIN commands where necessary.
+ */
+static void
+GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList)
+{
+	ListCell *taskPlacementCell = NULL;
+	List *multiConnectionList = NIL;
+
+	/* in the first iteration start the connections */
+	foreach(taskPlacementCell, taskPlacementList)
+	{
+		ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
+		int connectionFlags = SESSION_LIFESPAN;
+		MultiConnection *multiConnection = StartNodeConnection(connectionFlags,
+															   taskPlacement->nodeName,
+															   taskPlacement->nodePort);
+
+		MarkRemoteTransactionCritical(multiConnection);
+
+		multiConnectionList = lappend(multiConnectionList, multiConnection);
+	}
+
+	FinishConnectionListEstablishment(multiConnectionList);
+
+	RemoteTransactionsBeginIfNecessary(multiConnectionList);
+}
+
+
 /*
  * ExecuteMultipleTasks executes a list of tasks on remote nodes, retrieves
  * the results and, if RETURNING is used, stores them in a tuple store.
diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c
index 9e32a9446..994bc9a5a 100644
--- a/src/backend/distributed/executor/multi_task_tracker_executor.c
+++ b/src/backend/distributed/executor/multi_task_tracker_executor.c
@@ -30,6 +30,7 @@
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_server_executor.h"
+#include "distributed/pg_dist_partition.h"
 #include "distributed/worker_protocol.h"
 #include "storage/fd.h"
 #include "utils/builtins.h"
@@ -2764,6 +2765,7 @@ JobCleanupTask(uint64 jobId)
 	jobCleanupTask = CitusMakeNode(Task);
 	jobCleanupTask->jobId = jobId;
 	jobCleanupTask->taskId = JOB_CLEANUP_TASK_ID;
+	jobCleanupTask->replicationModel = REPLICATION_MODEL_INVALID;
 	jobCleanupTask->queryString = jobCleanupQuery->data;
 
 	return jobCleanupTask;
diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c
index 68e2c64ef..613df3adc 100644
--- a/src/backend/distributed/executor/multi_utility.c
+++ b/src/backend/distributed/executor/multi_utility.c
@@ -1129,6 +1129,7 @@ VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
 		task->taskType = SQL_TASK;
 		task->queryString = pstrdup(vacuumString->data);
 		task->dependedTaskList = NULL;
+		task->replicationModel = REPLICATION_MODEL_INVALID;
 		task->anchorShardId = shardId;
 		task->taskPlacementList = FinalizedShardPlacementList(shardId);
 
@@ -2097,6 +2098,7 @@ DDLTaskList(Oid relationId, const char *commandString)
 		task->taskId = taskId++;
 		task->taskType = SQL_TASK;
 		task->queryString = applyCommand->data;
+		task->replicationModel = REPLICATION_MODEL_INVALID;
 		task->dependedTaskList = NULL;
 		task->anchorShardId = shardId;
 		task->taskPlacementList = FinalizedShardPlacementList(shardId);
@@ 
-2161,6 +2163,7 @@ ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, task->taskType = SQL_TASK; task->queryString = applyCommand->data; task->dependedTaskList = NULL; + task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = leftShardId; task->taskPlacementList = FinalizedShardPlacementList(leftShardId); diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 60f84c738..2c7378466 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -201,6 +201,7 @@ ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relation task->taskType = SQL_TASK; task->queryString = shardQueryString->data; task->dependedTaskList = NULL; + task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; task->taskPlacementList = FinalizedShardPlacementList(shardId); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 1ec62653a..16f888c82 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4024,6 +4024,7 @@ CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryStrin task->jobId = jobId; task->taskId = taskId; task->taskType = taskType; + task->replicationModel = REPLICATION_MODEL_INVALID; task->queryString = queryString; return task; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index f478484d4..e3956fff8 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -326,6 +326,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter uint64 shardId = shardInterval->shardId; Oid distributedTableId = shardInterval->relationId; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); RelationRestrictionContext *copiedRestrictionContext = CopyRelationRestrictionContext(restrictionContext); @@ -460,6 +461,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter modifyTask->taskPlacementList = insertShardPlacementList; modifyTask->upsertQuery = upsertQuery; modifyTask->relationShardList = relationShardList; + modifyTask->replicationModel = cacheEntry->replicationModel; return modifyTask; } @@ -1640,9 +1642,11 @@ RouterModifyTask(Query *originalQuery, Query *query) { ShardInterval *shardInterval = TargetShardIntervalForModify(query); uint64 shardId = shardInterval->shardId; + Oid distributedTableId = shardInterval->relationId; StringInfo queryString = makeStringInfo(); Task *modifyTask = NULL; bool upsertQuery = false; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); /* grab shared metadata lock to stop concurrent placement additions */ LockShardDistributionMetadata(shardId, ShareLock); @@ -1674,6 +1678,7 @@ RouterModifyTask(Query *originalQuery, Query *query) modifyTask->anchorShardId = shardId; modifyTask->dependedTaskList = NIL; modifyTask->upsertQuery = upsertQuery; + modifyTask->replicationModel = cacheEntry->replicationModel; return modifyTask; } @@ -2017,6 +2022,7 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo task->taskType = ROUTER_TASK; task->queryString = queryString->data; 
task->anchorShardId = shardId; + task->replicationModel = REPLICATION_MODEL_INVALID; task->dependedTaskList = NIL; task->upsertQuery = upsertQuery; task->relationShardList = relationShardList; diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index bf16c55f6..06fb2ef72 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -506,6 +506,7 @@ OutTask(OUTFUNC_ARGS) WRITE_BOOL_FIELD(assignmentConstrained); WRITE_NODE_FIELD(taskExecution); WRITE_BOOL_FIELD(upsertQuery); + WRITE_CHAR_FIELD(replicationModel); WRITE_BOOL_FIELD(insertSelectQuery); WRITE_NODE_FIELD(relationShardList); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index ec9e9955c..e7dc31e60 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -303,6 +303,7 @@ ReadTask(READFUNC_ARGS) READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); + READ_CHAR_FIELD(replicationModel); READ_BOOL_FIELD(insertSelectQuery); READ_NODE_FIELD(relationShardList); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index aa103be41..981ca8cb9 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -129,6 +129,7 @@ extern void CloseConnection(MultiConnection *connection); extern void CloseConnectionByPGconn(struct pg_conn *pqConn); /* dealing with a connection */ +extern void FinishConnectionListEstablishment(List *multiConnectionList); extern void FinishConnectionEstablishment(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 8ac674b0d..0905a3b15 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -146,6 +146,11 @@ typedef struct MapMergeJob * fetch tasks. We also forward declare the task execution struct here to avoid * including the executor header files. * + * We currently do not take replication model into account for tasks other + * than modifications. When it is set to REPLICATION_MODEL_2PC, the execution + * of the modification task is done with two-phase commit. Set it to + * REPLICATION_MODEL_INVALID if it is not relevant for the task. + * * NB: Changing this requires also changing _outTask in citus_outfuncs and _readTask * in citus_readfuncs to correctly (de)serialize this struct. 
*/ @@ -169,6 +174,7 @@ typedef struct Task uint64 shardId; /* only applies to shard fetch tasks */ TaskExecution *taskExecution; /* used by task tracker executor */ bool upsertQuery; /* only applies to modify tasks */ + char replicationModel; /* only applies to modify tasks */ bool insertSelectQuery; List *relationShardList; /* only applies INSERT/SELECT tasks */ diff --git a/src/include/distributed/pg_dist_partition.h b/src/include/distributed/pg_dist_partition.h index db56d77ab..ab88db311 100644 --- a/src/include/distributed/pg_dist_partition.h +++ b/src/include/distributed/pg_dist_partition.h @@ -55,10 +55,15 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition; #define DISTRIBUTE_BY_NONE 'n' #define REDISTRIBUTE_BY_HASH 'x' -/* valid values for repmodel are 'c' for coordinator and 's' for streaming */ +/* + * Valid values for repmodel are 'c' for coordinator, 's' for streaming + * and 't' for two-phase-commit. We also use an invalid replication model + * ('i') for distinguishing uninitialized variables where necessary. + */ #define REPLICATION_MODEL_COORDINATOR 'c' #define REPLICATION_MODEL_STREAMING 's' #define REPLICATION_MODEL_2PC 't' +#define REPLICATION_MODEL_INVALID 'i' #endif /* PG_DIST_PARTITION_H */ diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 477551d68..6dffd0583 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -611,3 +611,407 @@ SELECT * FROM append_researchers; 0 | 0 | John Backus (1 row) +-- we use 2PC for reference tables by default +-- let's add some tests for them +CREATE TABLE reference_modifying_xacts (key int, value int); +SELECT create_reference_table('reference_modifying_xacts'); + create_reference_table +------------------------ + +(1 row) + +-- very basic test, ensure that INSERTs work +INSERT INTO reference_modifying_xacts VALUES (1, 1); +SELECT * FROM reference_modifying_xacts; + key | value +-----+------- + 1 | 1 +(1 row) + +-- now ensure that it works in a transaction as well +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (2, 2); +SELECT * FROM reference_modifying_xacts; + key | value +-----+------- + 1 | 1 + 2 | 2 +(2 rows) + +COMMIT; +-- we should be able to see the insert outside of the transaction as well +SELECT * FROM reference_modifying_xacts; + key | value +-----+------- + 1 | 1 + 2 | 2 +(2 rows) + +-- rollback should also work +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (3, 3); +SELECT * FROM reference_modifying_xacts; + key | value +-----+------- + 1 | 1 + 2 | 2 + 3 | 3 +(3 rows) + +ROLLBACK; +-- see that we've not inserted +SELECT * FROM reference_modifying_xacts; + key | value +-----+------- + 1 | 1 + 2 | 2 +(2 rows) + +-- lets fail on of the workers at before the commit time +\c - - - :worker_1_port +CREATE FUNCTION reject_bad_reference() RETURNS trigger AS $rb$ + BEGIN + IF (NEW.key = 999) THEN + RAISE 'illegal value'; + END IF; + + RETURN NEW; + END; +$rb$ LANGUAGE plpgsql; +CREATE CONSTRAINT TRIGGER reject_bad_reference +AFTER INSERT ON reference_modifying_xacts_1200006 +DEFERRABLE INITIALLY IMMEDIATE +FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference(); +\c - - - :master_port +\set VERBOSITY terse +-- try without wrapping inside a transaction +INSERT INTO reference_modifying_xacts VALUES (999, 3); +ERROR: illegal value +-- same test within a transaction +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (999, 3); +ERROR: illegal value +COMMIT; +-- lets fail 
one of the workers at COMMIT time +\c - - - :worker_1_port +DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006; +CREATE CONSTRAINT TRIGGER reject_bad_reference +AFTER INSERT ON reference_modifying_xacts_1200006 +DEFERRABLE INITIALLY DEFERRED +FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference(); +\c - - - :master_port +\set VERBOSITY terse +-- try without wrapping inside a transaction +INSERT INTO reference_modifying_xacts VALUES (999, 3); +WARNING: illegal value +ERROR: failure on connection marked as essential: localhost:57637 +-- same test within a transaction +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (999, 3); +COMMIT; +WARNING: illegal value +ERROR: failure on connection marked as essential: localhost:57637 +-- all placements should be healthy +SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND s.logicalrelid = 'reference_modifying_xacts'::regclass +GROUP BY s.logicalrelid, sp.shardstate +ORDER BY s.logicalrelid, sp.shardstate; + logicalrelid | shardstate | count +---------------------------+------------+------- + reference_modifying_xacts | 1 | 2 +(1 row) + +-- for the time-being drop the constraint +\c - - - :worker_1_port +DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006; +\c - - - :master_port +-- now create a hash distributed table and run tests +-- including both the reference table and the hash +-- distributed table +SET citus.shard_count = 4; +SET citus.shard_replication_factor = 1; +CREATE TABLE hash_modifying_xacts (key int, value int); +SELECT create_distributed_table('hash_modifying_xacts', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- let's try to expand the xact participants +BEGIN; +INSERT INTO hash_modifying_xacts VALUES (1, 1); +INSERT INTO reference_modifying_xacts VALUES (10, 10); +ERROR: no transaction participant matches localhost:57638 +COMMIT; +-- lets fail one of the workers before COMMIT time for the hash table +\c - - - :worker_1_port +CREATE FUNCTION reject_bad_hash() RETURNS trigger AS $rb$ + BEGIN + IF (NEW.key = 997) THEN + RAISE 'illegal value'; + END IF; + + RETURN NEW; + END; +$rb$ LANGUAGE plpgsql; +CREATE CONSTRAINT TRIGGER reject_bad_hash +AFTER INSERT ON hash_modifying_xacts_1200007 +DEFERRABLE INITIALLY IMMEDIATE +FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash(); +\c - - - :master_port +\set VERBOSITY terse +-- the transaction as a whole should fail +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (55, 10); +INSERT INTO hash_modifying_xacts VALUES (997, 1); +WARNING: illegal value +ERROR: could not modify any active placements +COMMIT; +-- ensure that the value didn't go into the reference table +SELECT * FROM reference_modifying_xacts WHERE key = 55; + key | value +-----+------- +(0 rows) + +-- now lets fail on of the workers for the hash distributed table table +-- when there is a reference table involved +\c - - - :worker_1_port +DROP TRIGGER reject_bad_hash ON hash_modifying_xacts_1200007; +-- the trigger is on execution time +CREATE CONSTRAINT TRIGGER reject_bad_hash +AFTER INSERT ON hash_modifying_xacts_1200007 +DEFERRABLE INITIALLY DEFERRED +FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash(); +\c - - - :master_port +\set VERBOSITY terse +-- the transaction as a whole should fail +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (12, 12); +INSERT INTO hash_modifying_xacts VALUES (997, 1); +COMMIT; +WARNING: illegal value +ERROR: failure on 
connection marked as essential: localhost:57637 +-- ensure that the values didn't go into the reference table +SELECT * FROM reference_modifying_xacts WHERE key = 12; + key | value +-----+------- +(0 rows) + +-- all placements should be healthy +SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR + s.logicalrelid = 'hash_modifying_xacts'::regclass) +GROUP BY s.logicalrelid, sp.shardstate +ORDER BY s.logicalrelid, sp.shardstate; + logicalrelid | shardstate | count +---------------------------+------------+------- + reference_modifying_xacts | 1 | 2 + hash_modifying_xacts | 1 | 4 +(2 rows) + +-- now, fail the insert on reference table +-- and ensure that hash distributed table's +-- change is rollbacked as well +\c - - - :worker_1_port +CREATE CONSTRAINT TRIGGER reject_bad_reference +AFTER INSERT ON reference_modifying_xacts_1200006 +DEFERRABLE INITIALLY IMMEDIATE +FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference(); +\c - - - :master_port +\set VERBOSITY terse +BEGIN; +-- to expand participant to include all worker nodes +INSERT INTO reference_modifying_xacts VALUES (66, 3); +INSERT INTO hash_modifying_xacts VALUES (80, 1); +INSERT INTO reference_modifying_xacts VALUES (999, 3); +ERROR: illegal value +COMMIT; +SELECT * FROM hash_modifying_xacts WHERE key = 80; + key | value +-----+------- +(0 rows) + +SELECT * FROM reference_modifying_xacts WHERE key = 66; + key | value +-----+------- +(0 rows) + +SELECT * FROM reference_modifying_xacts WHERE key = 999; + key | value +-----+------- +(0 rows) + +-- all placements should be healthy +SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR + s.logicalrelid = 'hash_modifying_xacts'::regclass) +GROUP BY s.logicalrelid, sp.shardstate +ORDER BY s.logicalrelid, sp.shardstate; + logicalrelid | shardstate | count +---------------------------+------------+------- + reference_modifying_xacts | 1 | 2 + hash_modifying_xacts | 1 | 4 +(2 rows) + +-- now show that all modifications to reference +-- tables are done in 2PC +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +INSERT INTO reference_modifying_xacts VALUES (70, 70); +SELECT count(*) FROM pg_dist_transaction; + count +------- + 2 +(1 row) + +-- reset the transactions table +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (71, 71); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 2 +(1 row) + +-- create a hash distributed tablw which spans all nodes +SET citus.shard_count = 4; +SET citus.shard_replication_factor = 2; +CREATE TABLE hash_modifying_xacts_second (key int, value int); +SELECT create_distributed_table('hash_modifying_xacts_second', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- reset the transactions table +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +BEGIN; +INSERT INTO hash_modifying_xacts_second VALUES (72, 1); +INSERT INTO reference_modifying_xacts VALUES (72, 3); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 2 +(1 row) + 
+-- reset the transactions table +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +DELETE FROM reference_modifying_xacts; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 2 +(1 row) + +-- reset the transactions table +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +UPDATE reference_modifying_xacts SET key = 10; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 2 +(1 row) + +-- now to one more type of failure testing +-- in which we'll make the remote host unavailable +-- first create the new user on all nodes +CREATE USER test_user; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +\c - - - :worker_1_port +CREATE USER test_user; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +\c - - - :worker_2_port +CREATE USER test_user; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +-- now connect back to the master with the new user +\c - test_user - :master_port +CREATE TABLE reference_failure_test (key int, value int); +SELECT create_reference_table('reference_failure_test'); + create_reference_table +------------------------ + +(1 row) + +-- ensure that the shard is created for this user +\c - test_user - :worker_1_port +\dt reference_failure_test_1200015 + List of relations + Schema | Name | Type | Owner +--------+--------------------------------+-------+----------- + public | reference_failure_test_1200015 | table | test_user +(1 row) + +-- now connect with the default user, +-- and rename the existing user +\c - :default_user - :worker_1_port +ALTER USER test_user RENAME TO test_user_new; +-- connect back to master and query the reference table + \c - test_user - :master_port +-- should fail since the worker doesn't have test_user anymore +INSERT INTO reference_failure_test VALUES (1, '1'); +WARNING: connection error: localhost:57637 +ERROR: failure on connection marked as essential: localhost:57637 +-- the same as the above, but wrapped within a transaction +BEGIN; +INSERT INTO reference_failure_test VALUES (1, '1'); +WARNING: connection error: localhost:57637 +ERROR: failure on connection marked as essential: localhost:57637 +COMMIT; +-- show that no data go through the table and shard states are good +SELECT * FROM reference_failure_test; + key | value +-----+------- +(0 rows) + +-- all placements should be healthy +SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND s.logicalrelid = 'reference_failure_test'::regclass +GROUP BY s.logicalrelid, sp.shardstate +ORDER BY s.logicalrelid, sp.shardstate; + logicalrelid | shardstate | count +------------------------+------------+------- + reference_failure_test | 1 | 2 +(1 row) + +-- connect back to the master with the proper user to continue the tests +\c - :default_user - :master_port +DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second; diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 201a0da33..525ef77d7 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -256,6 +256,7 @@ sysopen my $fh, "tmp_check/tmp-bin/psql", O_CREAT|O_TRUNC|O_RDWR, 0700 print $fh "#!/bin/bash\n"; print $fh "exec psql "; print $fh "--variable=master_port=$masterPort "; +print $fh "--variable=default_user=$user "; print $fh 
"--variable=SHOW_CONTEXT=always "; for my $workeroff (0 .. $#workerPorts) { diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 95f7992f2..cc10b385c 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -468,3 +468,297 @@ INSERT INTO append_researchers VALUES (500000, 500000, 'Tony Hoare'); ROLLBACK; SELECT * FROM append_researchers; + +-- we use 2PC for reference tables by default +-- let's add some tests for them +CREATE TABLE reference_modifying_xacts (key int, value int); +SELECT create_reference_table('reference_modifying_xacts'); + +-- very basic test, ensure that INSERTs work +INSERT INTO reference_modifying_xacts VALUES (1, 1); +SELECT * FROM reference_modifying_xacts; + +-- now ensure that it works in a transaction as well +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (2, 2); +SELECT * FROM reference_modifying_xacts; +COMMIT; + +-- we should be able to see the insert outside of the transaction as well +SELECT * FROM reference_modifying_xacts; + +-- rollback should also work +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (3, 3); +SELECT * FROM reference_modifying_xacts; +ROLLBACK; + +-- see that we've not inserted +SELECT * FROM reference_modifying_xacts; + +-- lets fail on of the workers at before the commit time +\c - - - :worker_1_port + +CREATE FUNCTION reject_bad_reference() RETURNS trigger AS $rb$ + BEGIN + IF (NEW.key = 999) THEN + RAISE 'illegal value'; + END IF; + + RETURN NEW; + END; +$rb$ LANGUAGE plpgsql; + +CREATE CONSTRAINT TRIGGER reject_bad_reference +AFTER INSERT ON reference_modifying_xacts_1200006 +DEFERRABLE INITIALLY IMMEDIATE +FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference(); + +\c - - - :master_port +\set VERBOSITY terse +-- try without wrapping inside a transaction +INSERT INTO reference_modifying_xacts VALUES (999, 3); + +-- same test within a transaction +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (999, 3); +COMMIT; + +-- lets fail one of the workers at COMMIT time +\c - - - :worker_1_port +DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006; + +CREATE CONSTRAINT TRIGGER reject_bad_reference +AFTER INSERT ON reference_modifying_xacts_1200006 +DEFERRABLE INITIALLY DEFERRED +FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference(); + +\c - - - :master_port +\set VERBOSITY terse + +-- try without wrapping inside a transaction +INSERT INTO reference_modifying_xacts VALUES (999, 3); + +-- same test within a transaction +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (999, 3); +COMMIT; + +-- all placements should be healthy +SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND s.logicalrelid = 'reference_modifying_xacts'::regclass +GROUP BY s.logicalrelid, sp.shardstate +ORDER BY s.logicalrelid, sp.shardstate; + +-- for the time-being drop the constraint +\c - - - :worker_1_port +DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006; + + +\c - - - :master_port + +-- now create a hash distributed table and run tests +-- including both the reference table and the hash +-- distributed table +SET citus.shard_count = 4; +SET citus.shard_replication_factor = 1; +CREATE TABLE hash_modifying_xacts (key int, value int); +SELECT create_distributed_table('hash_modifying_xacts', 'key'); + +-- let's try to expand the xact participants +BEGIN; +INSERT INTO hash_modifying_xacts VALUES 
(1, 1); +INSERT INTO reference_modifying_xacts VALUES (10, 10); +COMMIT; + +-- lets fail one of the workers before COMMIT time for the hash table +\c - - - :worker_1_port + +CREATE FUNCTION reject_bad_hash() RETURNS trigger AS $rb$ + BEGIN + IF (NEW.key = 997) THEN + RAISE 'illegal value'; + END IF; + + RETURN NEW; + END; +$rb$ LANGUAGE plpgsql; + +CREATE CONSTRAINT TRIGGER reject_bad_hash +AFTER INSERT ON hash_modifying_xacts_1200007 +DEFERRABLE INITIALLY IMMEDIATE +FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash(); + +\c - - - :master_port +\set VERBOSITY terse + +-- the transaction as a whole should fail +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (55, 10); +INSERT INTO hash_modifying_xacts VALUES (997, 1); +COMMIT; + +-- ensure that the value didn't go into the reference table +SELECT * FROM reference_modifying_xacts WHERE key = 55; + +-- now lets fail on of the workers for the hash distributed table table +-- when there is a reference table involved +\c - - - :worker_1_port +DROP TRIGGER reject_bad_hash ON hash_modifying_xacts_1200007; + +-- the trigger is on execution time +CREATE CONSTRAINT TRIGGER reject_bad_hash +AFTER INSERT ON hash_modifying_xacts_1200007 +DEFERRABLE INITIALLY DEFERRED +FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash(); + +\c - - - :master_port +\set VERBOSITY terse + +-- the transaction as a whole should fail +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (12, 12); +INSERT INTO hash_modifying_xacts VALUES (997, 1); +COMMIT; + +-- ensure that the values didn't go into the reference table +SELECT * FROM reference_modifying_xacts WHERE key = 12; + +-- all placements should be healthy +SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR + s.logicalrelid = 'hash_modifying_xacts'::regclass) +GROUP BY s.logicalrelid, sp.shardstate +ORDER BY s.logicalrelid, sp.shardstate; + +-- now, fail the insert on reference table +-- and ensure that hash distributed table's +-- change is rollbacked as well + +\c - - - :worker_1_port + +CREATE CONSTRAINT TRIGGER reject_bad_reference +AFTER INSERT ON reference_modifying_xacts_1200006 +DEFERRABLE INITIALLY IMMEDIATE +FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference(); + +\c - - - :master_port +\set VERBOSITY terse + +BEGIN; + +-- to expand participant to include all worker nodes +INSERT INTO reference_modifying_xacts VALUES (66, 3); +INSERT INTO hash_modifying_xacts VALUES (80, 1); +INSERT INTO reference_modifying_xacts VALUES (999, 3); +COMMIT; + +SELECT * FROM hash_modifying_xacts WHERE key = 80; +SELECT * FROM reference_modifying_xacts WHERE key = 66; +SELECT * FROM reference_modifying_xacts WHERE key = 999; + +-- all placements should be healthy +SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR + s.logicalrelid = 'hash_modifying_xacts'::regclass) +GROUP BY s.logicalrelid, sp.shardstate +ORDER BY s.logicalrelid, sp.shardstate; + +-- now show that all modifications to reference +-- tables are done in 2PC +SELECT recover_prepared_transactions(); + +INSERT INTO reference_modifying_xacts VALUES (70, 70); +SELECT count(*) FROM pg_dist_transaction; + +-- reset the transactions table +SELECT recover_prepared_transactions(); +BEGIN; +INSERT INTO reference_modifying_xacts VALUES (71, 71); +COMMIT; 
+SELECT count(*) FROM pg_dist_transaction; + + +-- create a hash distributed tablw which spans all nodes +SET citus.shard_count = 4; +SET citus.shard_replication_factor = 2; +CREATE TABLE hash_modifying_xacts_second (key int, value int); +SELECT create_distributed_table('hash_modifying_xacts_second', 'key'); + +-- reset the transactions table +SELECT recover_prepared_transactions(); + +BEGIN; +INSERT INTO hash_modifying_xacts_second VALUES (72, 1); +INSERT INTO reference_modifying_xacts VALUES (72, 3); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + +-- reset the transactions table +SELECT recover_prepared_transactions(); +DELETE FROM reference_modifying_xacts; +SELECT count(*) FROM pg_dist_transaction; + +-- reset the transactions table +SELECT recover_prepared_transactions(); +UPDATE reference_modifying_xacts SET key = 10; +SELECT count(*) FROM pg_dist_transaction; + +-- now to one more type of failure testing +-- in which we'll make the remote host unavailable + +-- first create the new user on all nodes +CREATE USER test_user; +\c - - - :worker_1_port +CREATE USER test_user; +\c - - - :worker_2_port +CREATE USER test_user; + +-- now connect back to the master with the new user +\c - test_user - :master_port +CREATE TABLE reference_failure_test (key int, value int); +SELECT create_reference_table('reference_failure_test'); + +-- ensure that the shard is created for this user +\c - test_user - :worker_1_port +\dt reference_failure_test_1200015 + +-- now connect with the default user, +-- and rename the existing user +\c - :default_user - :worker_1_port +ALTER USER test_user RENAME TO test_user_new; + +-- connect back to master and query the reference table + \c - test_user - :master_port +-- should fail since the worker doesn't have test_user anymore +INSERT INTO reference_failure_test VALUES (1, '1'); + +-- the same as the above, but wrapped within a transaction +BEGIN; +INSERT INTO reference_failure_test VALUES (1, '1'); +COMMIT; + +-- show that no data go through the table and shard states are good +SELECT * FROM reference_failure_test; + +-- all placements should be healthy +SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND s.logicalrelid = 'reference_failure_test'::regclass +GROUP BY s.logicalrelid, sp.shardstate +ORDER BY s.logicalrelid, sp.shardstate; + +-- connect back to the master with the proper user to continue the tests +\c - :default_user - :master_port +DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;
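
For context, a minimal sketch (not part of the patch) of how the new 2PC path for reference
tables can be observed, distilled from the regression test above. The table name two_pc_demo
is illustrative; recover_prepared_transactions() and pg_dist_transaction are the existing
Citus recovery function and metadata table that the test already queries, and the expected
count of 2 assumes the two-worker regression setup used above.

-- create a reference table; its single shard is placed on every worker
CREATE TABLE two_pc_demo (key int, value int);
SELECT create_reference_table('two_pc_demo');

-- clear any leftover prepared-transaction records
SELECT recover_prepared_transactions();

-- a single-statement modification is now executed with two-phase commit
INSERT INTO two_pc_demo VALUES (1, 1);

-- one pg_dist_transaction row per participating worker (2 in the test cluster)
SELECT count(*) FROM pg_dist_transaction;

-- clean up
SELECT recover_prepared_transactions();
DROP TABLE two_pc_demo;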