Refactor CheckShardPlacements

- Break CheckShardPlacements into multiple functions (The most important
  is MarkFailedShardPlacements), so that we can get rid of the global
  CoordinatedTransactionUses2PC.
- Call MarkFailedShardPlacements in the router executor, so we mark
  shards as invalid and stop using them while inside transaction blocks.
pull/1174/head
Brian Cloutier 2017-01-24 14:35:00 +03:00 committed by Brian Cloutier
parent c44ae463ae
commit 1173f3f225
10 changed files with 183 additions and 53 deletions

View File

@ -537,7 +537,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
heap_close(distributedRelation, NoLock); heap_close(distributedRelation, NoLock);
/* mark failed placements as inactive */ /* mark failed placements as inactive */
CheckForFailedPlacements(true, CoordinatedTransactionUses2PC); MarkFailedShardPlacements();
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();

View File

@ -187,8 +187,7 @@ static bool CanUseExistingConnection(uint32 flags, const char *userName,
ConnectionReference *connectionReference); ConnectionReference *connectionReference);
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
ShardPlacement *placement); ShardPlacement *placement);
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit, static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry);
bool using2PC);
static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize); static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize);
static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize); static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize);
@ -723,38 +722,83 @@ ResetPlacementConnectionManagement(void)
/* /*
* CheckForFailedPlacements checks which placements have to be marked as * MarkFailedShardPlacements looks through every connection in the connection shard hash
* invalid, and/or whether sufficiently many placements have failed to abort * and marks the placements associated with failed connections invalid.
* the entire coordinated transaction.
* *
* This will usually be called twice. Once before the remote commit is done, * Every shard must have at least one placement connection which did not fail. If all
* and once after. This is so we can abort before executing remote commits, * modifying connections for a shard failed then the transaction will be aborted.
* and so we can handle remote transactions that failed during commit.
* *
* When preCommit or using2PC is true, failures on transactions marked as * This will be called just before commit, so we can abort before executing remote
* critical will abort the entire coordinated transaction. If not we can't * commits. It should also be called after modification statements, to ensure that we
* roll back, because some remote transactions might have already committed. * don't run future statements against placements which are not up to date.
*/ */
void void
CheckForFailedPlacements(bool preCommit, bool using2PC) MarkFailedShardPlacements()
{
HASH_SEQ_STATUS status;
ConnectionShardHashEntry *shardEntry = NULL;
hash_seq_init(&status, ConnectionShardHash);
while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
{
if (!CheckShardPlacements(shardEntry))
{
ereport(ERROR,
(errmsg("could not make changes to shard " INT64_FORMAT
" on any node",
shardEntry->key.shardId)));
}
}
}
/*
* PostCommitMarkFailedShardPlacements marks placements invalid and checks whether
* sufficiently many placements have failed to abort the entire coordinated
* transaction.
*
* This will be called just after a coordinated commit so we can handle remote
* transactions which failed during commit.
*
* When using2PC is set as least one placement must succeed per shard. If all placements
* fail for a shard the entire transaction is aborted. If using2PC is not set then a only
* a warning will be emitted; we cannot abort because some remote transactions might have
* already been committed.
*/
void
PostCommitMarkFailedShardPlacements(bool using2PC)
{ {
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
ConnectionShardHashEntry *shardEntry = NULL; ConnectionShardHashEntry *shardEntry = NULL;
int successes = 0; int successes = 0;
int attempts = 0; int attempts = 0;
int elevel = using2PC ? ERROR : WARNING;
hash_seq_init(&status, ConnectionShardHash); hash_seq_init(&status, ConnectionShardHash);
while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0) while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
{ {
attempts++; attempts++;
if (CheckShardPlacements(shardEntry, preCommit, using2PC)) if (CheckShardPlacements(shardEntry))
{ {
successes++; successes++;
} }
else
{
/*
* Only error out if we're using 2PC. If we're not using 2PC we can't error
* out otherwise we can end up with a state where some shard modifications
* have already committed successfully.
*/
ereport(elevel,
(errmsg("could not commit transaction for shard " INT64_FORMAT
" on any active node",
shardEntry->key.shardId)));
}
} }
/* /*
* If no shards could be modified at all, error out. Doesn't matter if * If no shards could be modified at all, error out. Doesn't matter whether
* we're post-commit - there's nothing to invalidate. * we're post-commit - there's nothing to invalidate.
*/ */
if (attempts > 0 && successes == 0) if (attempts > 0 && successes == 0)
@ -769,8 +813,7 @@ CheckForFailedPlacements(bool preCommit, bool using2PC)
* performs the per-shard work. * performs the per-shard work.
*/ */
static bool static bool
CheckShardPlacements(ConnectionShardHashEntry *shardEntry, CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
bool preCommit, bool using2PC)
{ {
int failures = 0; int failures = 0;
int successes = 0; int successes = 0;
@ -804,28 +847,6 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry,
if (failures > 0 && successes == 0) if (failures > 0 && successes == 0)
{ {
int elevel = 0;
/*
* Only error out if we're pre-commit or using 2PC. Can't error
* otherwise as we can end up with a state where some shard
* modifications have already committed successfully. If no
* modifications at all succeed, CheckForFailedPlacements() will error
* out. This sucks.
*/
if (preCommit || using2PC)
{
elevel = ERROR;
}
else
{
elevel = WARNING;
}
ereport(elevel,
(errmsg("could not commit transaction for shard " INT64_FORMAT
" on any active node",
shardEntry->key.shardId)));
return false; return false;
} }

View File

@ -792,6 +792,9 @@ ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task,
ereport(ERROR, (errmsg("could not modify any active placements"))); ereport(ERROR, (errmsg("could not modify any active placements")));
} }
/* if some placements failed, ensure future statements don't access them */
MarkFailedShardPlacements();
executorState->es_processed = affectedTupleCount; executorState->es_processed = affectedTupleCount;
if (IsTransactionBlock()) if (IsTransactionBlock())

View File

@ -289,11 +289,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
} }
} }
/* MarkFailedShardPlacements();
* Abort if all placements failed, mark placements invalid if only some failed. By
* doing this UpdateShardStatistics never works on failed placements.
*/
CheckForFailedPlacements(true, CoordinatedTransactionUses2PC);
/* update shard statistics and get new shard size */ /* update shard statistics and get new shard size */
newShardSize = UpdateShardStatistics(shardId); newShardSize = UpdateShardStatistics(shardId);

View File

@ -244,10 +244,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
/* /*
* Check whether the coordinated transaction is in a state we want * Check whether the coordinated transaction is in a state we want
* to persist, or whether we want to error out. This handles the * to persist, or whether we want to error out. This handles the
* case that iteratively executed commands marked all placements * case where iteratively executed commands marked all placements
* as invalid. * as invalid.
*/ */
CheckForFailedPlacements(true, CoordinatedTransactionUses2PC); MarkFailedShardPlacements();
if (CoordinatedTransactionUses2PC) if (CoordinatedTransactionUses2PC)
{ {
@ -266,11 +266,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
} }
/* /*
*
* Check again whether shards/placement successfully * Check again whether shards/placement successfully
* committed. This handles failure at COMMIT/PREPARE time. * committed. This handles failure at COMMIT/PREPARE time.
*/ */
CheckForFailedPlacements(false, CoordinatedTransactionUses2PC); PostCommitMarkFailedShardPlacements(CoordinatedTransactionUses2PC);
} }
break; break;

View File

@ -23,7 +23,8 @@ extern MultiConnection * StartPlacementConnection(uint32 flags,
const char *userName); const char *userName);
extern void ResetPlacementConnectionManagement(void); extern void ResetPlacementConnectionManagement(void);
extern void CheckForFailedPlacements(bool preCommit, bool using2PC); extern void MarkFailedShardPlacements(void);
extern void PostCommitMarkFailedShardPlacements(bool using2PC);
extern void CloseShardPlacementAssociation(struct MultiConnection *connection); extern void CloseShardPlacementAssociation(struct MultiConnection *connection);
extern void ResetShardPlacementAssociation(struct MultiConnection *connection); extern void ResetShardPlacementAssociation(struct MultiConnection *connection);

View File

@ -66,9 +66,6 @@ extern CoordinatedTransactionState CurrentCoordinatedTransactionState;
/* list of connections that are part of the current coordinated transaction */ /* list of connections that are part of the current coordinated transaction */
extern dlist_head InProgressTransactions; extern dlist_head InProgressTransactions;
/* whether we've been asked to use 2PC (by calling CoordinatedTransactionUse2PC()) */
extern bool CoordinatedTransactionUses2PC;
/* /*
* Coordinated transaction management. * Coordinated transaction management.
*/ */

View File

@ -124,7 +124,7 @@ EXCEPTION
END $$; END $$;
NOTICE: caught not_null_violation NOTICE: caught not_null_violation
COMMIT; COMMIT;
ERROR: could not commit transaction for shard 1220100 on any active node ERROR: could not make changes to shard 1220100 on any node
\set VERBOSITY default \set VERBOSITY default
-- should be valid to edit labs_mx after researchers_mx... -- should be valid to edit labs_mx after researchers_mx...
BEGIN; BEGIN;

View File

@ -2041,6 +2041,81 @@ DEBUG: Plan is router executable
(6 rows) (6 rows)
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';
-- test that a connection failure marks placements invalid
SET citus.shard_replication_factor TO 2;
CREATE TABLE failure_test (a int, b int);
SELECT master_create_distributed_table('failure_test', 'a', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('failure_test', 2);
master_create_worker_shards
-----------------------------
(1 row)
CREATE USER router_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user;
\c - - - :worker_1_port
CREATE USER router_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user;
\c - router_user - :master_port
-- first test that it is marked invalid inside a transaction block
-- we will fail to connect to worker 2, since the user does not exist
BEGIN;
INSERT INTO failure_test VALUES (1, 1);
WARNING: connection error: localhost:57638
DETAIL: no connection to the server
WARNING: connection error: localhost:57638
DETAIL: no connection to the server
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'failure_test'::regclass
)
ORDER BY placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
840008 | 1 | localhost | 57637
840008 | 3 | localhost | 57638
840009 | 1 | localhost | 57638
840009 | 1 | localhost | 57637
(4 rows)
ROLLBACK;
INSERT INTO failure_test VALUES (2, 1);
WARNING: connection error: localhost:57638
DETAIL: no connection to the server
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'failure_test'::regclass
)
ORDER BY placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
840008 | 1 | localhost | 57637
840008 | 1 | localhost | 57638
840009 | 3 | localhost | 57638
840009 | 1 | localhost | 57637
(4 rows)
\c - postgres - :worker_1_port
DROP OWNED BY router_user;
DROP USER router_user;
\c - - - :master_port
DROP OWNED BY router_user;
DROP USER router_user;
DROP TABLE failure_test;
DROP FUNCTION author_articles_max_id(); DROP FUNCTION author_articles_max_id();
DROP FUNCTION author_articles_id_word_count(); DROP FUNCTION author_articles_id_word_count();
DROP MATERIALIZED VIEW mv_articles_hash; DROP MATERIALIZED VIEW mv_articles_hash;

View File

@ -920,6 +920,44 @@ SELECT id
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';
-- test that a connection failure marks placements invalid
SET citus.shard_replication_factor TO 2;
CREATE TABLE failure_test (a int, b int);
SELECT master_create_distributed_table('failure_test', 'a', 'hash');
SELECT master_create_worker_shards('failure_test', 2);
CREATE USER router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user;
\c - - - :worker_1_port
CREATE USER router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user;
\c - router_user - :master_port
-- first test that it is marked invalid inside a transaction block
-- we will fail to connect to worker 2, since the user does not exist
BEGIN;
INSERT INTO failure_test VALUES (1, 1);
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'failure_test'::regclass
)
ORDER BY placementid;
ROLLBACK;
INSERT INTO failure_test VALUES (2, 1);
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'failure_test'::regclass
)
ORDER BY placementid;
\c - postgres - :worker_1_port
DROP OWNED BY router_user;
DROP USER router_user;
\c - - - :master_port
DROP OWNED BY router_user;
DROP USER router_user;
DROP TABLE failure_test;
DROP FUNCTION author_articles_max_id(); DROP FUNCTION author_articles_max_id();
DROP FUNCTION author_articles_id_word_count(); DROP FUNCTION author_articles_id_word_count();