Merge pull request #1058 from citusdata/reference_tables_use_2pc_and_add_to_task

Allow reference tables to use 2PC for all modifications
Önder Kalacı 2017-01-04 12:57:01 +02:00 committed by GitHub
commit 2e3e801768
15 changed files with 808 additions and 2 deletions
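In practice, this means every modification to a reference table now goes through two-phase commit. A minimal sketch of the observable behavior, distilled from the regression tests in this diff (the final count assumes the two-worker regression cluster):

CREATE TABLE reference_modifying_xacts (key int, value int);
SELECT create_reference_table('reference_modifying_xacts');
-- clear any earlier commit records, then modify the reference table
SELECT recover_prepared_transactions();
INSERT INTO reference_modifying_xacts VALUES (70, 70);
-- 2PC leaves one commit record per worker in pg_dist_transaction
SELECT count(*) FROM pg_dist_transaction;  -- returns 2 on two workers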


@@ -408,6 +408,27 @@ CloseConnectionByPGconn(PGconn *pqConn)
}
/*
* FinishConnectionListEstablishment is a wrapper around FinishConnectionEstablishment.
* The function iterates over the multiConnectionList and finishes the connection
* establishment for each multi connection.
*/
void
FinishConnectionListEstablishment(List *multiConnectionList)
{
ListCell *multiConnectionCell = NULL;
foreach(multiConnectionCell, multiConnectionList)
{
MultiConnection *multiConnection = (MultiConnection *) lfirst(
multiConnectionCell);
/* TODO: consider making connection establishment fully in parallel */
FinishConnectionEstablishment(multiConnection);
}
}
/*
* Synchronously finish connection establishment of an individual connection.
*


@@ -95,6 +95,7 @@ static HTAB * CreateXactParticipantHash(void);
static void ReacquireMetadataLocks(List *taskList);
static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
bool isModificationQuery, bool expectResults);
static void GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList);
static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
bool isModificationQuery, bool expectResults);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
@@ -717,6 +718,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
int64 affectedTupleCount = -1;
bool gotResults = false;
char *queryString = task->queryString;
bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
{
@@ -726,6 +728,22 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
"modifications")));
}
/*
* First, ensure that a distributed transaction has been started. Then force
* the transaction manager to use 2PC while running the task on the placements.
*/
if (taskRequiresTwoPhaseCommit)
{
BeginOrContinueCoordinatedTransaction();
CoordinatedTransactionUse2PC();
/*
* Mark connections for all placements as critical and establish connections
* to all placements at once.
*/
GetPlacementConnectionsReadyForTwoPhaseCommit(taskPlacementList);
}
/*
* We could naturally handle function-based transactions (i.e. those
* using PL/pgSQL or similar) by checking the type of queryDesc->dest,
@@ -767,6 +785,9 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
continue;
}
/* if we're using 2PC, the query should fail on error */
failOnError = taskRequiresTwoPhaseCommit;
/*
* If caller is interested, store query results the first time
* through. The output of the query's execution on other shards is
@@ -838,12 +859,18 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
ereport(ERROR, (errmsg("could not modify any active placements")));
}
/* otherwise, mark failed placements as inactive: they're stale */
/*
* Otherwise, mark failed placements as inactive: they're stale. Note that
* a task that requires 2PC has already failed the whole transaction on any
* placement failure, so its placements can never be marked stale here.
*/
foreach(failedPlacementCell, failedPlacementList)
{
ShardPlacement *failedPlacement =
(ShardPlacement *) lfirst(failedPlacementCell);
Assert(!taskRequiresTwoPhaseCommit);
UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
}
@@ -854,6 +881,38 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
}
/*
* GetPlacementConnectionsReadyForTwoPhaseCommit iterates over the task placement
* list, starting a connection to each node and marking it critical. It then
* finishes all connection establishments at once and, finally, sends BEGIN
* commands where necessary.
*/
static void
GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList)
{
ListCell *taskPlacementCell = NULL;
List *multiConnectionList = NIL;
/* in the first iteration start the connections */
foreach(taskPlacementCell, taskPlacementList)
{
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
int connectionFlags = SESSION_LIFESPAN;
MultiConnection *multiConnection = StartNodeConnection(connectionFlags,
taskPlacement->nodeName,
taskPlacement->nodePort);
MarkRemoteTransactionCritical(multiConnection);
multiConnectionList = lappend(multiConnectionList, multiConnection);
}
FinishConnectionListEstablishment(multiConnectionList);
RemoteTransactionsBeginIfNecessary(multiConnectionList);
}
/*
* ExecuteMultipleTasks executes a list of tasks on remote nodes, retrieves
* the results and, if RETURNING is used, stores them in a tuple store.


@@ -30,6 +30,7 @@
#include "distributed/multi_client_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/worker_protocol.h"
#include "storage/fd.h"
#include "utils/builtins.h"
@@ -2764,6 +2765,7 @@ JobCleanupTask(uint64 jobId)
jobCleanupTask = CitusMakeNode(Task);
jobCleanupTask->jobId = jobId;
jobCleanupTask->taskId = JOB_CLEANUP_TASK_ID;
jobCleanupTask->replicationModel = REPLICATION_MODEL_INVALID;
jobCleanupTask->queryString = jobCleanupQuery->data;
return jobCleanupTask;


@@ -1129,6 +1129,7 @@ VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
task->taskType = SQL_TASK;
task->queryString = pstrdup(vacuumString->data);
task->dependedTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
@@ -2097,6 +2098,7 @@ DDLTaskList(Oid relationId, const char *commandString)
task->taskId = taskId++;
task->taskType = SQL_TASK;
task->queryString = applyCommand->data;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
@@ -2161,6 +2163,7 @@ ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
task->taskType = SQL_TASK;
task->queryString = applyCommand->data;
task->dependedTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = leftShardId;
task->taskPlacementList = FinalizedShardPlacementList(leftShardId);


@@ -201,6 +201,7 @@ ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relation
task->taskType = SQL_TASK;
task->queryString = shardQueryString->data;
task->dependedTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);


@@ -4024,6 +4024,7 @@ CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryStrin
task->jobId = jobId;
task->taskId = taskId;
task->taskType = taskType;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->queryString = queryString;
return task;


@@ -326,6 +326,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
uint64 shardId = shardInterval->shardId;
Oid distributedTableId = shardInterval->relationId;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
RelationRestrictionContext *copiedRestrictionContext =
CopyRelationRestrictionContext(restrictionContext);
@@ -460,6 +461,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
modifyTask->taskPlacementList = insertShardPlacementList;
modifyTask->upsertQuery = upsertQuery;
modifyTask->relationShardList = relationShardList;
modifyTask->replicationModel = cacheEntry->replicationModel;
return modifyTask;
}
@@ -1640,9 +1642,11 @@ RouterModifyTask(Query *originalQuery, Query *query)
{
ShardInterval *shardInterval = TargetShardIntervalForModify(query);
uint64 shardId = shardInterval->shardId;
Oid distributedTableId = shardInterval->relationId;
StringInfo queryString = makeStringInfo();
Task *modifyTask = NULL;
bool upsertQuery = false;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
@@ -1674,6 +1678,7 @@ RouterModifyTask(Query *originalQuery, Query *query)
modifyTask->anchorShardId = shardId;
modifyTask->dependedTaskList = NIL;
modifyTask->upsertQuery = upsertQuery;
modifyTask->replicationModel = cacheEntry->replicationModel;
return modifyTask;
}
@@ -2017,6 +2022,7 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
task->taskType = ROUTER_TASK;
task->queryString = queryString->data;
task->anchorShardId = shardId;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NIL;
task->upsertQuery = upsertQuery;
task->relationShardList = relationShardList;


@@ -506,6 +506,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery);
WRITE_CHAR_FIELD(replicationModel);
WRITE_BOOL_FIELD(insertSelectQuery);
WRITE_NODE_FIELD(relationShardList);
}


@@ -303,6 +303,7 @@ ReadTask(READFUNC_ARGS)
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_CHAR_FIELD(replicationModel);
READ_BOOL_FIELD(insertSelectQuery);
READ_NODE_FIELD(relationShardList);


@@ -129,6 +129,7 @@ extern void CloseConnection(MultiConnection *connection);
extern void CloseConnectionByPGconn(struct pg_conn *pqConn);
/* dealing with a connection */
extern void FinishConnectionListEstablishment(List *multiConnectionList);
extern void FinishConnectionEstablishment(MultiConnection *connection);
extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection);


@@ -146,6 +146,11 @@ typedef struct MapMergeJob
* fetch tasks. We also forward declare the task execution struct here to avoid
* including the executor header files.
*
* We currently take the replication model into account only for modification
* tasks. When it is set to REPLICATION_MODEL_2PC, the modification task is
* executed with two-phase commit. Set it to REPLICATION_MODEL_INVALID when
* it is not relevant for the task.
*
* NB: Changing this requires also changing _outTask in citus_outfuncs and _readTask
* in citus_readfuncs to correctly (de)serialize this struct.
*/
@@ -169,6 +174,7 @@ typedef struct Task
uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */
char replicationModel; /* only applies to modify tasks */
bool insertSelectQuery;
List *relationShardList; /* only applies INSERT/SELECT tasks */


@@ -55,10 +55,15 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition;
#define DISTRIBUTE_BY_NONE 'n'
#define REDISTRIBUTE_BY_HASH 'x'
/* valid values for repmodel are 'c' for coordinator and 's' for streaming */
/*
* Valid values for repmodel are 'c' for coordinator, 's' for streaming
* and 't' for two-phase commit. We also use an invalid replication model
* ('i') to distinguish uninitialized variables where necessary.
*/
#define REPLICATION_MODEL_COORDINATOR 'c'
#define REPLICATION_MODEL_STREAMING 's'
#define REPLICATION_MODEL_2PC 't'
#define REPLICATION_MODEL_INVALID 'i'
#endif /* PG_DIST_PARTITION_H */
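As a quick cross-check of these values, the replication model recorded for a table can be read straight from the catalog; a small sketch, assuming the reference table from the regression tests below already exists:

-- reference tables are expected to carry repmodel 't' (REPLICATION_MODEL_2PC)
SELECT logicalrelid::regclass, repmodel
FROM pg_dist_partition
WHERE logicalrelid = 'reference_modifying_xacts'::regclass;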


@@ -611,3 +611,407 @@ SELECT * FROM append_researchers;
0 | 0 | John Backus
(1 row)
-- we use 2PC for reference tables by default
-- let's add some tests for them
CREATE TABLE reference_modifying_xacts (key int, value int);
SELECT create_reference_table('reference_modifying_xacts');
create_reference_table
------------------------
(1 row)
-- very basic test, ensure that INSERTs work
INSERT INTO reference_modifying_xacts VALUES (1, 1);
SELECT * FROM reference_modifying_xacts;
key | value
-----+-------
1 | 1
(1 row)
-- now ensure that it works in a transaction as well
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (2, 2);
SELECT * FROM reference_modifying_xacts;
key | value
-----+-------
1 | 1
2 | 2
(2 rows)
COMMIT;
-- we should be able to see the insert outside of the transaction as well
SELECT * FROM reference_modifying_xacts;
key | value
-----+-------
1 | 1
2 | 2
(2 rows)
-- rollback should also work
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (3, 3);
SELECT * FROM reference_modifying_xacts;
key | value
-----+-------
1 | 1
2 | 2
3 | 3
(3 rows)
ROLLBACK;
-- see that nothing was inserted
SELECT * FROM reference_modifying_xacts;
key | value
-----+-------
1 | 1
2 | 2
(2 rows)
-- let's fail one of the workers before the commit time
\c - - - :worker_1_port
CREATE FUNCTION reject_bad_reference() RETURNS trigger AS $rb$
BEGIN
IF (NEW.key = 999) THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
ERROR: illegal value
-- same test within a transaction
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (999, 3);
ERROR: illegal value
COMMIT;
-- let's fail one of the workers at COMMIT time
\c - - - :worker_1_port
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
WARNING: illegal value
ERROR: failure on connection marked as essential: localhost:57637
-- same test within a transaction
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (999, 3);
COMMIT;
WARNING: illegal value
ERROR: failure on connection marked as essential: localhost:57637
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'reference_modifying_xacts'::regclass
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
logicalrelid | shardstate | count
---------------------------+------------+-------
reference_modifying_xacts | 1 | 2
(1 row)
-- for the time being, drop the constraint
\c - - - :worker_1_port
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
\c - - - :master_port
-- now create a hash distributed table and run tests
-- including both the reference table and the hash
-- distributed table
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 1;
CREATE TABLE hash_modifying_xacts (key int, value int);
SELECT create_distributed_table('hash_modifying_xacts', 'key');
create_distributed_table
--------------------------
(1 row)
-- let's try to expand the xact participants
BEGIN;
INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO reference_modifying_xacts VALUES (10, 10);
ERROR: no transaction participant matches localhost:57638
COMMIT;
-- let's fail one of the workers before COMMIT time for the hash table
\c - - - :worker_1_port
CREATE FUNCTION reject_bad_hash() RETURNS trigger AS $rb$
BEGIN
IF (NEW.key = 997) THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad_hash
AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (55, 10);
INSERT INTO hash_modifying_xacts VALUES (997, 1);
WARNING: illegal value
ERROR: could not modify any active placements
COMMIT;
-- ensure that the value didn't go into the reference table
SELECT * FROM reference_modifying_xacts WHERE key = 55;
key | value
-----+-------
(0 rows)
-- now let's fail one of the workers for the hash distributed table
-- when there is a reference table involved
\c - - - :worker_1_port
DROP TRIGGER reject_bad_hash ON hash_modifying_xacts_1200007;
-- this time the trigger fires at COMMIT time
CREATE CONSTRAINT TRIGGER reject_bad_hash
AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (12, 12);
INSERT INTO hash_modifying_xacts VALUES (997, 1);
COMMIT;
WARNING: illegal value
ERROR: failure on connection marked as essential: localhost:57637
-- ensure that the values didn't go into the reference table
SELECT * FROM reference_modifying_xacts WHERE key = 12;
key | value
-----+-------
(0 rows)
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR
s.logicalrelid = 'hash_modifying_xacts'::regclass)
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
logicalrelid | shardstate | count
---------------------------+------------+-------
reference_modifying_xacts | 1 | 2
hash_modifying_xacts | 1 | 4
(2 rows)
-- now, fail the insert on the reference table
-- and ensure that the hash distributed table's
-- change is rolled back as well
\c - - - :worker_1_port
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
BEGIN;
-- to expand the participants to include all worker nodes
INSERT INTO reference_modifying_xacts VALUES (66, 3);
INSERT INTO hash_modifying_xacts VALUES (80, 1);
INSERT INTO reference_modifying_xacts VALUES (999, 3);
ERROR: illegal value
COMMIT;
SELECT * FROM hash_modifying_xacts WHERE key = 80;
key | value
-----+-------
(0 rows)
SELECT * FROM reference_modifying_xacts WHERE key = 66;
key | value
-----+-------
(0 rows)
SELECT * FROM reference_modifying_xacts WHERE key = 999;
key | value
-----+-------
(0 rows)
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR
s.logicalrelid = 'hash_modifying_xacts'::regclass)
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
logicalrelid | shardstate | count
---------------------------+------------+-------
reference_modifying_xacts | 1 | 2
hash_modifying_xacts | 1 | 4
(2 rows)
-- now show that all modifications to reference
-- tables are done in 2PC
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
INSERT INTO reference_modifying_xacts VALUES (70, 70);
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
-- reset the transactions table
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (71, 71);
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
-- create a hash distributed table which spans all nodes
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 2;
CREATE TABLE hash_modifying_xacts_second (key int, value int);
SELECT create_distributed_table('hash_modifying_xacts_second', 'key');
create_distributed_table
--------------------------
(1 row)
-- reset the transactions table
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
BEGIN;
INSERT INTO hash_modifying_xacts_second VALUES (72, 1);
INSERT INTO reference_modifying_xacts VALUES (72, 3);
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
-- reset the transactions table
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
DELETE FROM reference_modifying_xacts;
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
-- reset the transactions table
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
UPDATE reference_modifying_xacts SET key = 10;
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
-- now on to one more type of failure testing,
-- in which we'll make the remote host unavailable
-- first create the new user on all nodes
CREATE USER test_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
\c - - - :worker_1_port
CREATE USER test_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
\c - - - :worker_2_port
CREATE USER test_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
-- now connect back to the master with the new user
\c - test_user - :master_port
CREATE TABLE reference_failure_test (key int, value int);
SELECT create_reference_table('reference_failure_test');
create_reference_table
------------------------
(1 row)
-- ensure that the shard is created for this user
\c - test_user - :worker_1_port
\dt reference_failure_test_1200015
List of relations
Schema | Name | Type | Owner
--------+--------------------------------+-------+-----------
public | reference_failure_test_1200015 | table | test_user
(1 row)
-- now connect with the default user,
-- and rename the existing user
\c - :default_user - :worker_1_port
ALTER USER test_user RENAME TO test_user_new;
-- connect back to master and query the reference table
\c - test_user - :master_port
-- should fail since the worker doesn't have test_user anymore
INSERT INTO reference_failure_test VALUES (1, '1');
WARNING: connection error: localhost:57637
ERROR: failure on connection marked as essential: localhost:57637
-- the same as the above, but wrapped within a transaction
BEGIN;
INSERT INTO reference_failure_test VALUES (1, '1');
WARNING: connection error: localhost:57637
ERROR: failure on connection marked as essential: localhost:57637
COMMIT;
-- show that no data went into the table and the shard states are good
SELECT * FROM reference_failure_test;
key | value
-----+-------
(0 rows)
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'reference_failure_test'::regclass
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
logicalrelid | shardstate | count
------------------------+------------+-------
reference_failure_test | 1 | 2
(1 row)
-- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;


@@ -256,6 +256,7 @@ sysopen my $fh, "tmp_check/tmp-bin/psql", O_CREAT|O_TRUNC|O_RDWR, 0700
print $fh "#!/bin/bash\n";
print $fh "exec psql ";
print $fh "--variable=master_port=$masterPort ";
print $fh "--variable=default_user=$user ";
print $fh "--variable=SHOW_CONTEXT=always ";
for my $workeroff (0 .. $#workerPorts)
{


@@ -468,3 +468,297 @@ INSERT INTO append_researchers VALUES (500000, 500000, 'Tony Hoare');
ROLLBACK;
SELECT * FROM append_researchers;
-- we use 2PC for reference tables by default
-- let's add some tests for them
CREATE TABLE reference_modifying_xacts (key int, value int);
SELECT create_reference_table('reference_modifying_xacts');
-- very basic test, ensure that INSERTs work
INSERT INTO reference_modifying_xacts VALUES (1, 1);
SELECT * FROM reference_modifying_xacts;
-- now ensure that it works in a transaction as well
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (2, 2);
SELECT * FROM reference_modifying_xacts;
COMMIT;
-- we should be able to see the insert outside of the transaction as well
SELECT * FROM reference_modifying_xacts;
-- rollback should also work
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (3, 3);
SELECT * FROM reference_modifying_xacts;
ROLLBACK;
-- see that nothing was inserted
SELECT * FROM reference_modifying_xacts;
-- let's fail one of the workers before the commit time
\c - - - :worker_1_port
CREATE FUNCTION reject_bad_reference() RETURNS trigger AS $rb$
BEGIN
IF (NEW.key = 999) THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
-- same test within a transaction
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (999, 3);
COMMIT;
-- let's fail one of the workers at COMMIT time
\c - - - :worker_1_port
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
-- same test within a transaction
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (999, 3);
COMMIT;
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'reference_modifying_xacts'::regclass
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- for the time being, drop the constraint
\c - - - :worker_1_port
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
\c - - - :master_port
-- now create a hash distributed table and run tests
-- including both the reference table and the hash
-- distributed table
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 1;
CREATE TABLE hash_modifying_xacts (key int, value int);
SELECT create_distributed_table('hash_modifying_xacts', 'key');
-- let's try to expand the xact participants
BEGIN;
INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO reference_modifying_xacts VALUES (10, 10);
COMMIT;
-- let's fail one of the workers before COMMIT time for the hash table
\c - - - :worker_1_port
CREATE FUNCTION reject_bad_hash() RETURNS trigger AS $rb$
BEGIN
IF (NEW.key = 997) THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad_hash
AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (55, 10);
INSERT INTO hash_modifying_xacts VALUES (997, 1);
COMMIT;
-- ensure that the value didn't go into the reference table
SELECT * FROM reference_modifying_xacts WHERE key = 55;
-- now let's fail one of the workers for the hash distributed table
-- when there is a reference table involved
\c - - - :worker_1_port
DROP TRIGGER reject_bad_hash ON hash_modifying_xacts_1200007;
-- this time the trigger fires at COMMIT time
CREATE CONSTRAINT TRIGGER reject_bad_hash
AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (12, 12);
INSERT INTO hash_modifying_xacts VALUES (997, 1);
COMMIT;
-- ensure that the values didn't go into the reference table
SELECT * FROM reference_modifying_xacts WHERE key = 12;
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR
s.logicalrelid = 'hash_modifying_xacts'::regclass)
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- now, fail the insert on the reference table
-- and ensure that the hash distributed table's
-- change is rolled back as well
\c - - - :worker_1_port
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
\set VERBOSITY terse
BEGIN;
-- to expand the participants to include all worker nodes
INSERT INTO reference_modifying_xacts VALUES (66, 3);
INSERT INTO hash_modifying_xacts VALUES (80, 1);
INSERT INTO reference_modifying_xacts VALUES (999, 3);
COMMIT;
SELECT * FROM hash_modifying_xacts WHERE key = 80;
SELECT * FROM reference_modifying_xacts WHERE key = 66;
SELECT * FROM reference_modifying_xacts WHERE key = 999;
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR
s.logicalrelid = 'hash_modifying_xacts'::regclass)
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- now show that all modifications to reference
-- tables are done in 2PC
SELECT recover_prepared_transactions();
INSERT INTO reference_modifying_xacts VALUES (70, 70);
SELECT count(*) FROM pg_dist_transaction;
-- reset the transactions table
SELECT recover_prepared_transactions();
BEGIN;
INSERT INTO reference_modifying_xacts VALUES (71, 71);
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
-- create a hash distributed table which spans all nodes
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 2;
CREATE TABLE hash_modifying_xacts_second (key int, value int);
SELECT create_distributed_table('hash_modifying_xacts_second', 'key');
-- reset the transactions table
SELECT recover_prepared_transactions();
BEGIN;
INSERT INTO hash_modifying_xacts_second VALUES (72, 1);
INSERT INTO reference_modifying_xacts VALUES (72, 3);
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
-- reset the transactions table
SELECT recover_prepared_transactions();
DELETE FROM reference_modifying_xacts;
SELECT count(*) FROM pg_dist_transaction;
-- reset the transactions table
SELECT recover_prepared_transactions();
UPDATE reference_modifying_xacts SET key = 10;
SELECT count(*) FROM pg_dist_transaction;
-- now on to one more type of failure testing,
-- in which we'll make the remote host unavailable
-- first create the new user on all nodes
CREATE USER test_user;
\c - - - :worker_1_port
CREATE USER test_user;
\c - - - :worker_2_port
CREATE USER test_user;
-- now connect back to the master with the new user
\c - test_user - :master_port
CREATE TABLE reference_failure_test (key int, value int);
SELECT create_reference_table('reference_failure_test');
-- ensure that the shard is created for this user
\c - test_user - :worker_1_port
\dt reference_failure_test_1200015
-- now connect with the default user,
-- and rename the existing user
\c - :default_user - :worker_1_port
ALTER USER test_user RENAME TO test_user_new;
-- connect back to master and query the reference table
\c - test_user - :master_port
-- should fail since the worker doesn't have test_user anymore
INSERT INTO reference_failure_test VALUES (1, '1');
-- the same as the above, but wrapped within a transaction
BEGIN;
INSERT INTO reference_failure_test VALUES (1, '1');
COMMIT;
-- show that no data went into the table and the shard states are good
SELECT * FROM reference_failure_test;
-- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'reference_failure_test'::regclass
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;