Use 2PC for reference table modification

With this commit, we ensure that the router executor always uses
2PC for reference table modifications and never marks their
placements as INVALID.
pull/1058/head
Onder Kalaci 2016-12-23 09:39:59 +02:00
parent 43f5efecff
commit 6d050fd677
15 changed files with 808 additions and 2 deletions
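A minimal sketch of the execution path this commit adds, condensed from the hunks below (all names appear in this diff; the surrounding executor code is elided):

/* inside ExecuteSingleTask: reference table tasks carry the 2PC model */
bool taskRequiresTwoPhaseCommit =
	(task->replicationModel == REPLICATION_MODEL_2PC);

if (taskRequiresTwoPhaseCommit)
{
	/* make sure a coordinated transaction is open and force 2PC */
	BeginOrContinueCoordinatedTransaction();
	CoordinatedTransactionUse2PC();

	/*
	 * Connect to all placements and mark the connections critical:
	 * any failure now aborts the whole transaction instead of
	 * marking placements invalid.
	 */
	GetPlacementConnectionsReadyForTwoPhaseCommit(taskPlacementList);
}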

View File

@@ -408,6 +408,27 @@ CloseConnectionByPGconn(PGconn *pqConn)
}
/*
* FinishConnectionListEstablishment is a wrapper around FinishConnectionEstablishment.
* The function iterates over the multiConnectionList and finishes the connection
* establishment for each multi connection.
*/
void
FinishConnectionListEstablishment(List *multiConnectionList)
{
ListCell *multiConnectionCell = NULL;
foreach(multiConnectionCell, multiConnectionList)
{
MultiConnection *multiConnection = (MultiConnection *) lfirst(
multiConnectionCell);
/* TODO: consider making connection establishment fully in parallel */
FinishConnectionEstablishment(multiConnection);
}
}
/*
* Synchronously finish connection establishment of an individual connection.
*
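A hedged usage sketch for the new helper: start several connections without blocking, then finish them all in one call. StartNodeConnection, SESSION_LIFESPAN, and the list handling are taken from this diff; the host and port are placeholders.

List *connectionList = NIL;
MultiConnection *connection = StartNodeConnection(SESSION_LIFESPAN,
												  "localhost", 57637);
connectionList = lappend(connectionList, connection);

/* blocks until every connection in the list is established */
FinishConnectionListEstablishment(connectionList);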

View File

@@ -95,6 +95,7 @@ static HTAB * CreateXactParticipantHash(void);
static void ReacquireMetadataLocks(List *taskList);
static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
bool isModificationQuery, bool expectResults);
static void GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList);
static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
bool isModificationQuery, bool expectResults);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
@@ -717,6 +718,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
int64 affectedTupleCount = -1;
bool gotResults = false;
char *queryString = task->queryString;
bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
{
@@ -726,6 +728,22 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
"modifications")));
}
/*
* First, ensure that a distributed transaction has been started. Then force
* the transaction manager to use 2PC while running the task on the placements.
*/
if (taskRequiresTwoPhaseCommit)
{
BeginOrContinueCoordinatedTransaction();
CoordinatedTransactionUse2PC();
/*
* Mark connections for all placements as critical and establish connections
* to all placements at once.
*/
GetPlacementConnectionsReadyForTwoPhaseCommit(taskPlacementList);
}
/*
* We could naturally handle function-based transactions (i.e. those
* using PL/pgSQL or similar) by checking the type of queryDesc->dest,
@@ -767,6 +785,9 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
continue;
}
/* if we're running 2PC, the query should fail on error */
failOnError = taskRequiresTwoPhaseCommit;
/*
* If caller is interested, store query results the first time
* through. The output of the query's execution on other shards is
@@ -838,12 +859,18 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
ereport(ERROR, (errmsg("could not modify any active placements")));
}
/* otherwise, mark failed placements as inactive: they're stale */
/*
* Otherwise, mark failed placements as inactive: they're stale. Note that
* for tasks requiring 2PC, a connection failure has already aborted the whole
* transaction, so their placements can never be marked stale here.
*/
foreach(failedPlacementCell, failedPlacementList)
{
ShardPlacement *failedPlacement =
(ShardPlacement *) lfirst(failedPlacementCell);
Assert(!taskRequiresTwoPhaseCommit);
UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
}
@@ -854,6 +881,38 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
}
/*
* GetPlacementConnectionsReadyForTwoPhaseCommit iterates over the task placement
* list, starting a connection to each node and marking it critical. It then
* finishes establishing all connections at once and, finally, sends BEGIN
* commands where necessary.
*/
static void
GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList)
{
ListCell *taskPlacementCell = NULL;
List *multiConnectionList = NIL;
/* in the first iteration start the connections */
foreach(taskPlacementCell, taskPlacementList)
{
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
int connectionFlags = SESSION_LIFESPAN;
MultiConnection *multiConnection = StartNodeConnection(connectionFlags,
taskPlacement->nodeName,
taskPlacement->nodePort);
MarkRemoteTransactionCritical(multiConnection);
multiConnectionList = lappend(multiConnectionList, multiConnection);
}
FinishConnectionListEstablishment(multiConnectionList);
RemoteTransactionsBeginIfNecessary(multiConnectionList);
}
/*
* ExecuteMultipleTasks executes a list of tasks on remote nodes, retrieves
* the results and, if RETURNING is used, stores them in a tuple store.

View File

@@ -30,6 +30,7 @@
#include "distributed/multi_client_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/worker_protocol.h"
#include "storage/fd.h"
#include "utils/builtins.h"
@@ -2764,6 +2765,7 @@ JobCleanupTask(uint64 jobId)
jobCleanupTask = CitusMakeNode(Task);
jobCleanupTask->jobId = jobId;
jobCleanupTask->taskId = JOB_CLEANUP_TASK_ID;
jobCleanupTask->replicationModel = REPLICATION_MODEL_INVALID;
jobCleanupTask->queryString = jobCleanupQuery->data;
return jobCleanupTask;

View File

@@ -1129,6 +1129,7 @@ VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
task->taskType = SQL_TASK;
task->queryString = pstrdup(vacuumString->data);
task->dependedTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
@@ -2097,6 +2098,7 @@ DDLTaskList(Oid relationId, const char *commandString)
task->taskId = taskId++;
task->taskType = SQL_TASK;
task->queryString = applyCommand->data;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
@@ -2161,6 +2163,7 @@ ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
task->taskType = SQL_TASK;
task->queryString = applyCommand->data;
task->dependedTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = leftShardId;
task->taskPlacementList = FinalizedShardPlacementList(leftShardId);

View File

@@ -201,6 +201,7 @@ ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relation
task->taskType = SQL_TASK;
task->queryString = shardQueryString->data;
task->dependedTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);

View File

@@ -4024,6 +4024,7 @@ CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryStrin
task->jobId = jobId;
task->taskId = taskId;
task->taskType = taskType;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->queryString = queryString;
return task;

View File

@@ -326,6 +326,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
uint64 shardId = shardInterval->shardId;
Oid distributedTableId = shardInterval->relationId;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
RelationRestrictionContext *copiedRestrictionContext =
CopyRelationRestrictionContext(restrictionContext);
@@ -460,6 +461,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
modifyTask->taskPlacementList = insertShardPlacementList;
modifyTask->upsertQuery = upsertQuery;
modifyTask->relationShardList = relationShardList;
modifyTask->replicationModel = cacheEntry->replicationModel;
return modifyTask;
}
@@ -1640,9 +1642,11 @@ RouterModifyTask(Query *originalQuery, Query *query)
{
ShardInterval *shardInterval = TargetShardIntervalForModify(query);
uint64 shardId = shardInterval->shardId;
Oid distributedTableId = shardInterval->relationId;
StringInfo queryString = makeStringInfo();
Task *modifyTask = NULL;
bool upsertQuery = false;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
@@ -1674,6 +1678,7 @@ RouterModifyTask(Query *originalQuery, Query *query)
modifyTask->anchorShardId = shardId;
modifyTask->dependedTaskList = NIL;
modifyTask->upsertQuery = upsertQuery;
modifyTask->replicationModel = cacheEntry->replicationModel;
return modifyTask;
}
@@ -2017,6 +2022,7 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
task->taskType = ROUTER_TASK;
task->queryString = queryString->data;
task->anchorShardId = shardId;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NIL;
task->upsertQuery = upsertQuery;
task->relationShardList = relationShardList;

View File

@@ -506,6 +506,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery);
WRITE_CHAR_FIELD(replicationModel);
WRITE_BOOL_FIELD(insertSelectQuery);
WRITE_NODE_FIELD(relationShardList);
}

View File

@@ -303,6 +303,7 @@ ReadTask(READFUNC_ARGS)
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_CHAR_FIELD(replicationModel);
READ_BOOL_FIELD(insertSelectQuery);
READ_NODE_FIELD(relationShardList);

View File

@@ -129,6 +129,7 @@ extern void CloseConnection(MultiConnection *connection);
extern void CloseConnectionByPGconn(struct pg_conn *pqConn);
/* dealing with a connection */
extern void FinishConnectionListEstablishment(List *multiConnectionList);
extern void FinishConnectionEstablishment(MultiConnection *connection);
extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection);

View File

@@ -146,6 +146,11 @@ typedef struct MapMergeJob
* fetch tasks. We also forward declare the task execution struct here to avoid
* including the executor header files.
*
* We currently do not take the replication model into account for tasks other
* than modifications. When it is set to REPLICATION_MODEL_2PC, the modification
* task is executed with two-phase commit. Set it to REPLICATION_MODEL_INVALID
* if the replication model is not relevant for the task.
*
* NB: Changing this requires also changing _outTask in citus_outfuncs and _readTask
* in citus_readfuncs to correctly (de)serialize this struct.
*/
@@ -169,6 +174,7 @@ typedef struct Task
uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */
char replicationModel; /* only applies to modify tasks */
bool insertSelectQuery;
List *relationShardList; /* only applies INSERT/SELECT tasks */
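For illustration, the convention the task constructors in this diff follow when filling the new field, as a condensed sketch of the call sites (not new API): modification tasks copy the table's replication model, every other task opts out explicitly.

/* router modify tasks: propagate the model from the metadata cache,
 * so reference tables (REPLICATION_MODEL_2PC) get 2PC execution */
modifyTask->replicationModel = cacheEntry->replicationModel;

/* DDL, VACUUM, cleanup and similar tasks: the model is irrelevant */
task->replicationModel = REPLICATION_MODEL_INVALID;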

View File

@@ -55,10 +55,15 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition;
#define DISTRIBUTE_BY_NONE 'n'
#define REDISTRIBUTE_BY_HASH 'x'
/* valid values for repmodel are 'c' for coordinator and 's' for streaming */
/*
* Valid values for repmodel are 'c' for coordinator, 's' for streaming,
* and 't' for two-phase commit. We also use an invalid replication model
* ('i') to distinguish uninitialized variables where necessary.
*/
#define REPLICATION_MODEL_COORDINATOR 'c'
#define REPLICATION_MODEL_STREAMING 's'
#define REPLICATION_MODEL_2PC 't'
#define REPLICATION_MODEL_INVALID 'i'
#endif /* PG_DIST_PARTITION_H */
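A small sketch of how these constants are consumed; the helper name is hypothetical (the executor inlines the comparison, see ExecuteSingleTask above):

/* hypothetical convenience wrapper around the check this commit adds */
static inline bool
TaskRequiresTwoPhaseCommit(Task *task)
{
	return task->replicationModel == REPLICATION_MODEL_2PC;
}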

View File

@@ -611,3 +611,407 @@ SELECT * FROM append_researchers;
0 | 0 | John Backus
(1 row)
-- we use 2PC for reference tables by default
-- let's add some tests for them
CREATE TABLE reference_modifying_xacts (key int, value int);
SELECT create_reference_table('reference_modifying_xacts');
create_reference_table
------------------------
(1 row)
-- very basic test, ensure that INSERTs work
INSERT INTO reference_modifying_xacts VALUES (1, 1);
SELECT * FROM reference_modifying_xacts;
key | value
-----+-------
1 | 1
(1 row)
-- now ensure that it works in a transaction as well
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (2, 2);
SELECT * FROM reference_modifying_xacts;
key | value
-----+-------
1 | 1
2 | 2
(2 rows)
COMMIT;
-- we should be able to see the insert outside of the transaction as well
SELECT * FROM reference_modifying_xacts;
key | value
-----+-------
1 | 1
2 | 2
(2 rows)
-- rollback should also work
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (3, 3);
SELECT * FROM reference_modifying_xacts;
key | value
-----+-------
1 | 1
2 | 2
3 | 3
(3 rows)
ROLLBACK;
-- see that we've not inserted
SELECT * FROM reference_modifying_xacts;
key | value
-----+-------
1 | 1
2 | 2
(2 rows)
-- let's fail one of the workers before commit time
\c - - - :worker_1_port
CREATE FUNCTION reject_bad_reference() RETURNS trigger AS $rb$
BEGIN
IF (NEW.key = 999) THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
ERROR: illegal value
-- same test within a transaction
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (999, 3);
ERROR: illegal value
COMMIT;
-- let's fail one of the workers at COMMIT time
\c - - - :worker_1_port
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
WARNING: illegal value
ERROR: failure on connection marked as essential: localhost:57637
-- same test within a transaction
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (999, 3);
COMMIT;
WARNING: illegal value
ERROR: failure on connection marked as essential: localhost:57637
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'reference_modifying_xacts'::regclass
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
logicalrelid | shardstate | count
---------------------------+------------+-------
reference_modifying_xacts | 1 | 2
(1 row)
-- for the time being, drop the constraint
\c - - - :worker_1_port
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
\c - - - :master_port
-- now create a hash distributed table and run tests
-- including both the reference table and the hash
-- distributed table
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 1;
CREATE TABLE hash_modifying_xacts (key int, value int);
SELECT create_distributed_table('hash_modifying_xacts', 'key');
create_distributed_table
--------------------------
(1 row)
-- let's try to expand the xact participants
BEGIN;
INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO reference_modifying_xacts VALUES (10, 10);
ERROR: no transaction participant matches localhost:57638
COMMIT;
-- let's fail one of the workers before COMMIT time for the hash table
\c - - - :worker_1_port
CREATE FUNCTION reject_bad_hash() RETURNS trigger AS $rb$
BEGIN
IF (NEW.key = 997) THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad_hash
AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (55, 10);
INSERT INTO hash_modifying_xacts VALUES (997, 1);
WARNING: illegal value
ERROR: could not modify any active placements
COMMIT;
-- ensure that the value didn't go into the reference table
SELECT * FROM reference_modifying_xacts WHERE key = 55;
key | value
-----+-------
(0 rows)
-- now let's fail one of the workers for the hash distributed table
-- when there is a reference table involved
\c - - - :worker_1_port
DROP TRIGGER reject_bad_hash ON hash_modifying_xacts_1200007;
-- this time the trigger fires at COMMIT time
CREATE CONSTRAINT TRIGGER reject_bad_hash
AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (12, 12);
INSERT INTO hash_modifying_xacts VALUES (997, 1);
COMMIT;
WARNING: illegal value
ERROR: failure on connection marked as essential: localhost:57637
-- ensure that the values didn't go into the reference table
SELECT * FROM reference_modifying_xacts WHERE key = 12;
key | value
-----+-------
(0 rows)
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR
s.logicalrelid = 'hash_modifying_xacts'::regclass)
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
logicalrelid | shardstate | count
---------------------------+------------+-------
reference_modifying_xacts | 1 | 2
hash_modifying_xacts | 1 | 4
(2 rows)
-- now, fail the insert on the reference table
-- and ensure that the hash distributed table's
-- change is rolled back as well
\c - - - :worker_1_port
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
BEGIN;
-- to expand the participants to include all worker nodes
INSERT INTO reference_modifying_xacts VALUES (66, 3);
INSERT INTO hash_modifying_xacts VALUES (80, 1);
INSERT INTO reference_modifying_xacts VALUES (999, 3);
ERROR: illegal value
COMMIT;
SELECT * FROM hash_modifying_xacts WHERE key = 80;
key | value
-----+-------
(0 rows)
SELECT * FROM reference_modifying_xacts WHERE key = 66;
key | value
-----+-------
(0 rows)
SELECT * FROM reference_modifying_xacts WHERE key = 999;
key | value
-----+-------
(0 rows)
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR
s.logicalrelid = 'hash_modifying_xacts'::regclass)
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
logicalrelid | shardstate | count
---------------------------+------------+-------
reference_modifying_xacts | 1 | 2
hash_modifying_xacts | 1 | 4
(2 rows)
-- now show that all modifications to reference
-- tables are done in 2PC
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
INSERT INTO reference_modifying_xacts VALUES (70, 70);
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
-- reset the transactions table
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (71, 71);
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
-- create a hash distributed table which spans all nodes
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 2;
CREATE TABLE hash_modifying_xacts_second (key int, value int);
SELECT create_distributed_table('hash_modifying_xacts_second', 'key');
create_distributed_table
--------------------------
(1 row)
-- reset the transactions table
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
BEGIN;
INSERT INTO hash_modifying_xacts_second VALUES (72, 1);
INSERT INTO reference_modifying_xacts VALUES (72, 3);
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
-- reset the transactions table
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
DELETE FROM reference_modifying_xacts;
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
-- reset the transactions table
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
UPDATE reference_modifying_xacts SET key = 10;
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
-- now on to one more type of failure testing
-- in which we'll make the remote host unavailable
-- first create the new user on all nodes
CREATE USER test_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
\c - - - :worker_1_port
CREATE USER test_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
\c - - - :worker_2_port
CREATE USER test_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
-- now connect back to the master with the new user
\c - test_user - :master_port
CREATE TABLE reference_failure_test (key int, value int);
SELECT create_reference_table('reference_failure_test');
create_reference_table
------------------------
(1 row)
-- ensure that the shard is created for this user
\c - test_user - :worker_1_port
\dt reference_failure_test_1200015
List of relations
Schema | Name | Type | Owner
--------+--------------------------------+-------+-----------
public | reference_failure_test_1200015 | table | test_user
(1 row)
-- now connect with the default user,
-- and rename the existing user
\c - :default_user - :worker_1_port
ALTER USER test_user RENAME TO test_user_new;
-- connect back to master and query the reference table
\c - test_user - :master_port
-- should fail since the worker doesn't have test_user anymore
INSERT INTO reference_failure_test VALUES (1, '1');
WARNING: connection error: localhost:57637
ERROR: failure on connection marked as essential: localhost:57637
-- the same as the above, but wrapped within a transaction
BEGIN;
INSERT INTO reference_failure_test VALUES (1, '1');
WARNING: connection error: localhost:57637
ERROR: failure on connection marked as essential: localhost:57637
COMMIT;
-- show that no data went through the table and the shard states are good
SELECT * FROM reference_failure_test;
key | value
-----+-------
(0 rows)
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'reference_failure_test'::regclass
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
logicalrelid | shardstate | count
------------------------+------------+-------
reference_failure_test | 1 | 2
(1 row)
-- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;

View File

@@ -256,6 +256,7 @@ sysopen my $fh, "tmp_check/tmp-bin/psql", O_CREAT|O_TRUNC|O_RDWR, 0700
print $fh "#!/bin/bash\n";
print $fh "exec psql ";
print $fh "--variable=master_port=$masterPort ";
print $fh "--variable=default_user=$user ";
print $fh "--variable=SHOW_CONTEXT=always ";
for my $workeroff (0 .. $#workerPorts)
{

View File

@@ -468,3 +468,297 @@ INSERT INTO append_researchers VALUES (500000, 500000, 'Tony Hoare');
ROLLBACK;
SELECT * FROM append_researchers;
-- we use 2PC for reference tables by default
-- let's add some tests for them
CREATE TABLE reference_modifying_xacts (key int, value int);
SELECT create_reference_table('reference_modifying_xacts');
-- very basic test, ensure that INSERTs work
INSERT INTO reference_modifying_xacts VALUES (1, 1);
SELECT * FROM reference_modifying_xacts;
-- now ensure that it works in a transaction as well
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (2, 2);
SELECT * FROM reference_modifying_xacts;
COMMIT;
-- we should be able to see the insert outside of the transaction as well
SELECT * FROM reference_modifying_xacts;
-- rollback should also work
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (3, 3);
SELECT * FROM reference_modifying_xacts;
ROLLBACK;
-- see that we've not inserted
SELECT * FROM reference_modifying_xacts;
-- let's fail one of the workers before commit time
\c - - - :worker_1_port
CREATE FUNCTION reject_bad_reference() RETURNS trigger AS $rb$
BEGIN
IF (NEW.key = 999) THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
-- same test within a transaction
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (999, 3);
COMMIT;
-- let's fail one of the workers at COMMIT time
\c - - - :worker_1_port
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
-- same test within a transaction
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (999, 3);
COMMIT;
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'reference_modifying_xacts'::regclass
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- for the time being, drop the constraint
\c - - - :worker_1_port
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
\c - - - :master_port
-- now create a hash distributed table and run tests
-- including both the reference table and the hash
-- distributed table
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 1;
CREATE TABLE hash_modifying_xacts (key int, value int);
SELECT create_distributed_table('hash_modifying_xacts', 'key');
-- let's try to expand the xact participants
BEGIN;
INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO reference_modifying_xacts VALUES (10, 10);
COMMIT;
-- let's fail one of the workers before COMMIT time for the hash table
\c - - - :worker_1_port
CREATE FUNCTION reject_bad_hash() RETURNS trigger AS $rb$
BEGIN
IF (NEW.key = 997) THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad_hash
AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (55, 10);
INSERT INTO hash_modifying_xacts VALUES (997, 1);
COMMIT;
-- ensure that the value didn't go into the reference table
SELECT * FROM reference_modifying_xacts WHERE key = 55;
-- now let's fail one of the workers for the hash distributed table
-- when there is a reference table involved
\c - - - :worker_1_port
DROP TRIGGER reject_bad_hash ON hash_modifying_xacts_1200007;
-- this time the trigger fires at COMMIT time
CREATE CONSTRAINT TRIGGER reject_bad_hash
AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (12, 12);
INSERT INTO hash_modifying_xacts VALUES (997, 1);
COMMIT;
-- ensure that the values didn't go into the reference table
SELECT * FROM reference_modifying_xacts WHERE key = 12;
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR
s.logicalrelid = 'hash_modifying_xacts'::regclass)
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- now, fail the insert on the reference table
-- and ensure that the hash distributed table's
-- change is rolled back as well
\c - - - :worker_1_port
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
BEGIN;
-- to expand the participants to include all worker nodes
INSERT INTO reference_modifying_xacts VALUES (66, 3);
INSERT INTO hash_modifying_xacts VALUES (80, 1);
INSERT INTO reference_modifying_xacts VALUES (999, 3);
COMMIT;
SELECT * FROM hash_modifying_xacts WHERE key = 80;
SELECT * FROM reference_modifying_xacts WHERE key = 66;
SELECT * FROM reference_modifying_xacts WHERE key = 999;
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR
s.logicalrelid = 'hash_modifying_xacts'::regclass)
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- now show that all modifications to reference
-- tables are done in 2PC
SELECT recover_prepared_transactions();
INSERT INTO reference_modifying_xacts VALUES (70, 70);
SELECT count(*) FROM pg_dist_transaction;
-- reset the transactions table
SELECT recover_prepared_transactions();
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (71, 71);
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
-- create a hash distributed table which spans all nodes
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 2;
CREATE TABLE hash_modifying_xacts_second (key int, value int);
SELECT create_distributed_table('hash_modifying_xacts_second', 'key');
-- reset the transactions table
SELECT recover_prepared_transactions();
BEGIN;
INSERT INTO hash_modifying_xacts_second VALUES (72, 1);
INSERT INTO reference_modifying_xacts VALUES (72, 3);
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
-- reset the transactions table
SELECT recover_prepared_transactions();
DELETE FROM reference_modifying_xacts;
SELECT count(*) FROM pg_dist_transaction;
-- reset the transactions table
SELECT recover_prepared_transactions();
UPDATE reference_modifying_xacts SET key = 10;
SELECT count(*) FROM pg_dist_transaction;
-- now on to one more type of failure testing
-- in which we'll make the remote host unavailable
-- first create the new user on all nodes
CREATE USER test_user;
\c - - - :worker_1_port
CREATE USER test_user;
\c - - - :worker_2_port
CREATE USER test_user;
-- now connect back to the master with the new user
\c - test_user - :master_port
CREATE TABLE reference_failure_test (key int, value int);
SELECT create_reference_table('reference_failure_test');
-- ensure that the shard is created for this user
\c - test_user - :worker_1_port
\dt reference_failure_test_1200015
-- now connect with the default user,
-- and rename the existing user
\c - :default_user - :worker_1_port
ALTER USER test_user RENAME TO test_user_new;
-- connect back to master and query the reference table
\c - test_user - :master_port
-- should fail since the worker doesn't have test_user anymore
INSERT INTO reference_failure_test VALUES (1, '1');
-- the same as the above, but wrapped within a transaction
BEGIN;
INSERT INTO reference_failure_test VALUES (1, '1');
COMMIT;
-- show that no data went through the table and the shard states are good
SELECT * FROM reference_failure_test;
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'reference_failure_test'::regclass
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;