From 1173f3f225eb03f9cfceee79845d8e5930918734 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Tue, 24 Jan 2017 14:35:00 +0300 Subject: [PATCH] Refactor CheckShardPlacements - Break CheckShardPlacements into multiple functions (The most important is MarkFailedShardPlacements), so that we can get rid of the global CoordinatedTransactionUses2PC. - Call MarkFailedShardPlacements in the router executor, so we mark shards as invalid and stop using them while inside transaction blocks. --- src/backend/distributed/commands/multi_copy.c | 2 +- .../connection/placement_connection.c | 97 +++++++++++-------- .../executor/multi_router_executor.c | 3 + .../master/master_stage_protocol.c | 6 +- .../transaction/transaction_management.c | 7 +- .../distributed/placement_connection.h | 3 +- .../distributed/transaction_management.h | 3 - .../expected/multi_mx_modifying_xacts.out | 2 +- .../regress/expected/multi_router_planner.out | 75 ++++++++++++++ src/test/regress/sql/multi_router_planner.sql | 38 ++++++++ 10 files changed, 183 insertions(+), 53 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index b69a06799..bfc73c676 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -537,7 +537,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) heap_close(distributedRelation, NoLock); /* mark failed placements as inactive */ - CheckForFailedPlacements(true, CoordinatedTransactionUses2PC); + MarkFailedShardPlacements(); CHECK_FOR_INTERRUPTS(); diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 7df453945..2cf2d0f83 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -187,8 +187,7 @@ static bool CanUseExistingConnection(uint32 flags, const char *userName, 
ConnectionReference *connectionReference); static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, ShardPlacement *placement); -static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit, - bool using2PC); +static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry); static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize); static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize); @@ -723,38 +722,83 @@ ResetPlacementConnectionManagement(void) /* - * CheckForFailedPlacements checks which placements have to be marked as - * invalid, and/or whether sufficiently many placements have failed to abort - * the entire coordinated transaction. + * MarkFailedShardPlacements looks through every connection in the connection shard hash + * and marks the placements associated with failed connections invalid. * - * This will usually be called twice. Once before the remote commit is done, - * and once after. This is so we can abort before executing remote commits, - * and so we can handle remote transactions that failed during commit. + * Every shard must have at least one placement connection which did not fail. If all + * modifying connections for a shard failed then the transaction will be aborted. * - * When preCommit or using2PC is true, failures on transactions marked as - * critical will abort the entire coordinated transaction. If not we can't - * roll back, because some remote transactions might have already committed. + * This will be called just before commit, so we can abort before executing remote + * commits. It should also be called after modification statements, to ensure that we + * don't run future statements against placements which are not up to date. 
 */ void -CheckForFailedPlacements(bool preCommit, bool using2PC) +MarkFailedShardPlacements() +{ + HASH_SEQ_STATUS status; + ConnectionShardHashEntry *shardEntry = NULL; + + hash_seq_init(&status, ConnectionShardHash); + while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0) + { + if (!CheckShardPlacements(shardEntry)) + { + ereport(ERROR, + (errmsg("could not make changes to shard " INT64_FORMAT + " on any node", + shardEntry->key.shardId))); + } + } +} + + +/* + * PostCommitMarkFailedShardPlacements marks placements invalid and checks whether + * sufficiently many placements have failed to abort the entire coordinated + * transaction. + * + * This will be called just after a coordinated commit so we can handle remote + * transactions which failed during commit. + * + * When using2PC is set at least one placement must succeed per shard. If all placements + * fail for a shard the entire transaction is aborted. If using2PC is not set then only + * a warning will be emitted; we cannot abort because some remote transactions might have + * already been committed. + */ +void +PostCommitMarkFailedShardPlacements(bool using2PC) { HASH_SEQ_STATUS status; ConnectionShardHashEntry *shardEntry = NULL; int successes = 0; int attempts = 0; + int elevel = using2PC ? ERROR : WARNING; + hash_seq_init(&status, ConnectionShardHash); while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0) { attempts++; - if (CheckShardPlacements(shardEntry, preCommit, using2PC)) + if (CheckShardPlacements(shardEntry)) { successes++; } + else + { + /* + * Only error out if we're using 2PC. If we're not using 2PC we can't error + * out otherwise we can end up with a state where some shard modifications + * have already committed successfully. + */ + ereport(elevel, + (errmsg("could not commit transaction for shard " INT64_FORMAT + " on any active node", + shardEntry->key.shardId))); + } } /* - * If no shards could be modified at all, error out. 
Doesn't matter if + * If no shards could be modified at all, error out. Doesn't matter whether * we're post-commit - there's nothing to invalidate. */ if (attempts > 0 && successes == 0) @@ -769,8 +813,7 @@ CheckForFailedPlacements(bool preCommit, bool using2PC) * performs the per-shard work. */ static bool -CheckShardPlacements(ConnectionShardHashEntry *shardEntry, - bool preCommit, bool using2PC) +CheckShardPlacements(ConnectionShardHashEntry *shardEntry) { int failures = 0; int successes = 0; @@ -804,28 +847,6 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry, if (failures > 0 && successes == 0) { - int elevel = 0; - - /* - * Only error out if we're pre-commit or using 2PC. Can't error - * otherwise as we can end up with a state where some shard - * modifications have already committed successfully. If no - * modifications at all succeed, CheckForFailedPlacements() will error - * out. This sucks. - */ - if (preCommit || using2PC) - { - elevel = ERROR; - } - else - { - elevel = WARNING; - } - - ereport(elevel, - (errmsg("could not commit transaction for shard " INT64_FORMAT - " on any active node", - shardEntry->key.shardId))); return false; } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index ee5b579a2..9b5a93a23 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -792,6 +792,9 @@ ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task, ereport(ERROR, (errmsg("could not modify any active placements"))); } + /* if some placements failed, ensure future statements don't access them */ + MarkFailedShardPlacements(); + executorState->es_processed = affectedTupleCount; if (IsTransactionBlock()) diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index de8e845dd..f647ebec2 100644 --- 
a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -289,11 +289,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) } } - /* - * Abort if all placements failed, mark placements invalid if only some failed. By - * doing this UpdateShardStatistics never works on failed placements. - */ - CheckForFailedPlacements(true, CoordinatedTransactionUses2PC); + MarkFailedShardPlacements(); /* update shard statistics and get new shard size */ newShardSize = UpdateShardStatistics(shardId); diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 9c22858ac..a0d14d9b0 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -244,10 +244,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) /* * Check whether the coordinated transaction is in a state we want * to persist, or whether we want to error out. This handles the - * case that iteratively executed commands marked all placements + * case where iteratively executed commands marked all placements * as invalid. */ - CheckForFailedPlacements(true, CoordinatedTransactionUses2PC); + MarkFailedShardPlacements(); if (CoordinatedTransactionUses2PC) { @@ -266,11 +266,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) } /* - * * Check again whether shards/placement successfully * committed. This handles failure at COMMIT/PREPARE time. 
*/ - CheckForFailedPlacements(false, CoordinatedTransactionUses2PC); + PostCommitMarkFailedShardPlacements(CoordinatedTransactionUses2PC); } break; diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h index c76d838aa..493e74c52 100644 --- a/src/include/distributed/placement_connection.h +++ b/src/include/distributed/placement_connection.h @@ -23,7 +23,8 @@ extern MultiConnection * StartPlacementConnection(uint32 flags, const char *userName); extern void ResetPlacementConnectionManagement(void); -extern void CheckForFailedPlacements(bool preCommit, bool using2PC); +extern void MarkFailedShardPlacements(void); +extern void PostCommitMarkFailedShardPlacements(bool using2PC); extern void CloseShardPlacementAssociation(struct MultiConnection *connection); extern void ResetShardPlacementAssociation(struct MultiConnection *connection); diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 3f600f064..dd0c691ed 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -66,9 +66,6 @@ extern CoordinatedTransactionState CurrentCoordinatedTransactionState; /* list of connections that are part of the current coordinated transaction */ extern dlist_head InProgressTransactions; -/* whether we've been asked to use 2PC (by calling CoordinatedTransactionUse2PC()) */ -extern bool CoordinatedTransactionUses2PC; - /* * Coordinated transaction management. 
*/ diff --git a/src/test/regress/expected/multi_mx_modifying_xacts.out b/src/test/regress/expected/multi_mx_modifying_xacts.out index 62b5673ad..cf45bb9e0 100644 --- a/src/test/regress/expected/multi_mx_modifying_xacts.out +++ b/src/test/regress/expected/multi_mx_modifying_xacts.out @@ -124,7 +124,7 @@ EXCEPTION END $$; NOTICE: caught not_null_violation COMMIT; -ERROR: could not commit transaction for shard 1220100 on any active node +ERROR: could not make changes to shard 1220100 on any node \set VERBOSITY default -- should be valid to edit labs_mx after researchers_mx... BEGIN; diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 339bcb764..43fe205b3 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -2041,6 +2041,81 @@ DEBUG: Plan is router executable (6 rows) SET client_min_messages to 'NOTICE'; +-- test that a connection failure marks placements invalid +SET citus.shard_replication_factor TO 2; +CREATE TABLE failure_test (a int, b int); +SELECT master_create_distributed_table('failure_test', 'a', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('failure_test', 2); + master_create_worker_shards +----------------------------- + +(1 row) + +CREATE USER router_user; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user; +\c - - - :worker_1_port +CREATE USER router_user; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. 
+GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user; +\c - router_user - :master_port +-- first test that it is marked invalid inside a transaction block +-- we will fail to connect to worker 2, since the user does not exist +BEGIN; +INSERT INTO failure_test VALUES (1, 1); +WARNING: connection error: localhost:57638 +DETAIL: no connection to the server + +WARNING: connection error: localhost:57638 +DETAIL: no connection to the server + +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement + WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard + WHERE logicalrelid = 'failure_test'::regclass + ) + ORDER BY placementid; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 840008 | 1 | localhost | 57637 + 840008 | 3 | localhost | 57638 + 840009 | 1 | localhost | 57638 + 840009 | 1 | localhost | 57637 +(4 rows) + +ROLLBACK; +INSERT INTO failure_test VALUES (2, 1); +WARNING: connection error: localhost:57638 +DETAIL: no connection to the server + +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement + WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard + WHERE logicalrelid = 'failure_test'::regclass + ) + ORDER BY placementid; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 840008 | 1 | localhost | 57637 + 840008 | 1 | localhost | 57638 + 840009 | 3 | localhost | 57638 + 840009 | 1 | localhost | 57637 +(4 rows) + +\c - postgres - :worker_1_port +DROP OWNED BY router_user; +DROP USER router_user; +\c - - - :master_port +DROP OWNED BY router_user; +DROP USER router_user; +DROP TABLE failure_test; DROP FUNCTION author_articles_max_id(); DROP FUNCTION author_articles_id_word_count(); DROP MATERIALIZED VIEW mv_articles_hash; diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 1b395488a..892b05b0b 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ 
b/src/test/regress/sql/multi_router_planner.sql @@ -920,6 +920,44 @@ SELECT id SET client_min_messages to 'NOTICE'; +-- test that a connection failure marks placements invalid +SET citus.shard_replication_factor TO 2; +CREATE TABLE failure_test (a int, b int); +SELECT master_create_distributed_table('failure_test', 'a', 'hash'); +SELECT master_create_worker_shards('failure_test', 2); + +CREATE USER router_user; +GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user; +\c - - - :worker_1_port +CREATE USER router_user; +GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user; +\c - router_user - :master_port +-- first test that it is marked invalid inside a transaction block +-- we will fail to connect to worker 2, since the user does not exist +BEGIN; +INSERT INTO failure_test VALUES (1, 1); +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement + WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard + WHERE logicalrelid = 'failure_test'::regclass + ) + ORDER BY placementid; +ROLLBACK; +INSERT INTO failure_test VALUES (2, 1); +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement + WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard + WHERE logicalrelid = 'failure_test'::regclass + ) + ORDER BY placementid; +\c - postgres - :worker_1_port +DROP OWNED BY router_user; +DROP USER router_user; +\c - - - :master_port +DROP OWNED BY router_user; +DROP USER router_user; +DROP TABLE failure_test; + DROP FUNCTION author_articles_max_id(); DROP FUNCTION author_articles_id_word_count();