From c8f14c9f6c6ad3bd9bb3c20dc481e419ebf80867 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Sat, 21 Dec 2019 21:26:50 +0100
Subject: [PATCH] Make sure to update shard states of partitions on failures

Fixes #3331

In #2389, we implemented support for partitioned tables with
replication factor > 1. The implementation limits the use of
modification queries on the partitions; in fact, we error out when any
partition is modified, via EnsurePartitionTableNotReplicated().

However, we seem to have forgotten an important case: when a placement
of the parent table is marked as INVALID, the corresponding placements
of its partitions effectively become INVALID as well, yet we never mark
the partitions' placements as INVALID.

If the user queries a partition table directly, Citus could happily
send the query to such stale placements, because they are not marked as
INVALID.

This PR fixes that by marking the placements of the partitions as
INVALID as well.

The shard placement repair logic already re-creates all the partitions,
so we should be fine on that front.
---
 .../connection/placement_connection.c         |  6 +-
 .../distributed/executor/adaptive_executor.c  |  3 +-
 .../master/master_metadata_utility.c          | 86 +++++++++++++++++++
 .../distributed/master_metadata_utility.h     |  3 +
 .../failure_replicated_partitions.out         | 70 +++++++++++++++
 src/test/regress/failure_schedule             |  1 +
 .../sql/failure_replicated_partitions.sql     | 42 +++++++++
 7 files changed, 207 insertions(+), 4 deletions(-)
 create mode 100644 src/test/regress/expected/failure_replicated_partitions.out
 create mode 100644 src/test/regress/sql/failure_replicated_partitions.sql

diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c
index bfba19741..cc56f4749 100644
--- a/src/backend/distributed/connection/placement_connection.c
+++ b/src/backend/distributed/connection/placement_connection.c
@@ -18,6 +18,7 @@
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/distributed_planner.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/placement_connection.h"
 #include "distributed/relation_access_tracking.h"
 #include "utils/hsearch.h"
@@ -1065,8 +1066,7 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
 	{
 		uint64 shardId = shardEntry->key.shardId;
 		uint64 placementId = placementEntry->key.placementId;
-		GroupShardPlacement *shardPlacement =
-			LoadGroupShardPlacement(shardId, placementId);
+		ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
 
 		/*
 		 * We only set shard state if its current state is FILE_FINALIZED, which
@@ -1074,7 +1074,7 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
 		 */
 		if (shardPlacement->shardState == FILE_FINALIZED)
 		{
-			UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE);
+			MarkShardPlacementInactive(shardPlacement);
 		}
 	}
 }
diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index d266bae67..dfd4069ea 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -137,6 +137,7 @@
 #include "distributed/local_executor.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_executor.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_resowner.h"
 #include "distributed/multi_server_executor.h"
@@ -3495,7 +3496,7 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
 	 */
 	if (shardPlacement->shardState == FILE_FINALIZED)
 	{
-		UpdateShardPlacementState(shardPlacement->placementId, FILE_INACTIVE);
+		MarkShardPlacementInactive(shardPlacement);
 	}
 }
 
diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c
index 23156ffcb..9b3f409a0 100644
--- a/src/backend/distributed/master/master_metadata_utility.c
+++ b/src/backend/distributed/master/master_metadata_utility.c
@@ -28,6 +28,7 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
+#include "distributed/colocation_utils.h"
 #include "distributed/connection_management.h"
 #include "distributed/citus_nodes.h"
 #include "distributed/listutils.h"
@@ -36,6 +37,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_logical_optimizer.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/pg_dist_colocation.h"
 #include "distributed/pg_dist_partition.h"
@@ -72,6 +74,7 @@ static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationI
 										   char *sizeQuery);
 static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
 static void ErrorIfNotSuitableToGetSize(Oid relationId);
+static ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
 
 
 /* exports for SQL callable functions */
@@ -1178,6 +1181,89 @@ DeleteShardPlacementRow(uint64 placementId)
 }
 
 
+/*
+ * UpdatePartitionShardPlacementStates gets a shard placement that is asserted to
+ * belong to a partitioned table. The function goes over the corresponding placements
+ * of its partitions and sets their state to the input shardState.
+ */
+void
+UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char shardState)
+{
+	ShardInterval *parentShardInterval =
+		LoadShardInterval(parentShardPlacement->shardId);
+	Oid partitionedTableOid = parentShardInterval->relationId;
+
+	/* this function should only be called for partitioned tables */
+	Assert(PartitionedTable(partitionedTableOid));
+
+	ListCell *partitionOidCell = NULL;
+	List *partitionList = PartitionList(partitionedTableOid);
+	foreach(partitionOidCell, partitionList)
+	{
+		Oid partitionOid = lfirst_oid(partitionOidCell);
+		uint64 partitionShardId =
+			ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex);
+
+		ShardPlacement *partitionPlacement =
+			ShardPlacementOnGroup(partitionShardId, parentShardPlacement->groupId);
+
+		/* the partition should have a placement on the same group */
+		Assert(partitionPlacement != NULL);
+
+		UpdateShardPlacementState(partitionPlacement->placementId, shardState);
+	}
+}
+
+
+/*
+ * ShardPlacementOnGroup gets a shardId and a groupId, and returns a placement
+ * of the shard on the given group. If no such placement exists, the function
+ * returns NULL.
+ */
+static ShardPlacement *
+ShardPlacementOnGroup(uint64 shardId, int groupId)
+{
+	List *placementList = ShardPlacementList(shardId);
+	ListCell *placementCell = NULL;
+
+	foreach(placementCell, placementList)
+	{
+		ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
+
+		if (placement->groupId == groupId)
+		{
+			return placement;
+		}
+	}
+
+	return NULL;
+}
+
+
+/*
+ * MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where
+ * the state is set to invalid (i.e., FILE_INACTIVE). It also marks the corresponding
+ * placements of the partitions as inactive if the shard placement belongs to a
+ * partitioned table.
+ */
+void
+MarkShardPlacementInactive(ShardPlacement *shardPlacement)
+{
+	UpdateShardPlacementState(shardPlacement->placementId, FILE_INACTIVE);
+
+	/*
+	 * In case the shard belongs to a partitioned table, make sure to update
+	 * the states of its partitions' placements as well. The shard placement
+	 * repair logic already re-creates all the partitions.
+	 */
+	ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
+	if (PartitionedTable(shardInterval->relationId))
+	{
+		UpdatePartitionShardPlacementStates(shardPlacement, FILE_INACTIVE);
+	}
+}
+
+
 /*
  * UpdateShardPlacementState sets the shardState for the placement identified
  * by placementId.
diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h
index 2cdbce21e..12e3811f9 100644
--- a/src/include/distributed/master_metadata_utility.h
+++ b/src/include/distributed/master_metadata_utility.h
@@ -127,6 +127,9 @@ extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
 									  char replicationModel);
 extern void DeletePartitionRow(Oid distributedRelationId);
 extern void DeleteShardRow(uint64 shardId);
+extern void UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement,
+												char shardState);
+extern void MarkShardPlacementInactive(ShardPlacement *shardPlacement);
 extern void UpdateShardPlacementState(uint64 placementId, char shardState);
 extern void DeleteShardPlacementRow(uint64 placementId);
 extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
diff --git a/src/test/regress/expected/failure_replicated_partitions.out b/src/test/regress/expected/failure_replicated_partitions.out
new file mode 100644
index 000000000..3d6d748a8
--- /dev/null
+++ b/src/test/regress/expected/failure_replicated_partitions.out
@@ -0,0 +1,70 @@
+SET citus.next_shard_id TO 1490000;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.shard_replication_factor TO 2;
+SET "citus.replication_model" to "statement";
+SET citus.shard_count TO 4;
+CREATE TABLE partitioned_table (
+	dist_key bigint,
+	partition_id integer
+) PARTITION BY LIST (partition_id );
+SELECT create_distributed_table('partitioned_table', 'dist_key');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE partitioned_table_0
+	PARTITION OF partitioned_table (dist_key, partition_id)
+	FOR VALUES IN ( 0 );
+INSERT INTO partitioned_table VALUES (0, 0);
+SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO partitioned_table VALUES (0, 0);
+WARNING:  connection error: localhost:xxxxx
+DETAIL:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+-- use both placements
+SET citus.task_assignment_policy TO "round-robin";
+-- the results should be the same
+SELECT count(*) FROM partitioned_table_0;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+SELECT count(*) FROM partitioned_table_0;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+SELECT count(*) FROM partitioned_table;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+SELECT count(*) FROM partitioned_table;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+-- ==== Clean up, we're done here ====
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+DROP TABLE partitioned_table;
diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule
index c410c678e..f63d28358 100644
--- a/src/test/regress/failure_schedule
+++ b/src/test/regress/failure_schedule
@@ -4,6 +4,7 @@ test: failure_test_helpers
 # this should only be run by pg_regress_multi, you don't need it
 test: failure_setup
 test: multi_test_helpers
+test: failure_replicated_partitions
 test: multi_test_catalog_views
 test: failure_ddl
 test: failure_truncate
diff --git a/src/test/regress/sql/failure_replicated_partitions.sql b/src/test/regress/sql/failure_replicated_partitions.sql
new file mode 100644
index 000000000..cb4b0ba6b
--- /dev/null
+++ b/src/test/regress/sql/failure_replicated_partitions.sql
@@ -0,0 +1,42 @@
+SET citus.next_shard_id TO 1490000;
+
+SELECT citus.mitmproxy('conn.allow()');
+
+
+SET citus.shard_replication_factor TO 2;
+SET "citus.replication_model" to "statement";
+SET citus.shard_count TO 4;
+
+CREATE TABLE partitioned_table (
+	dist_key bigint,
+	partition_id integer
+) PARTITION BY LIST (partition_id );
+
+SELECT create_distributed_table('partitioned_table', 'dist_key');
+CREATE TABLE partitioned_table_0
+	PARTITION OF partitioned_table (dist_key, partition_id)
+	FOR VALUES IN ( 0 );
+
+
+INSERT INTO partitioned_table VALUES (0, 0);
+
+SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+
+INSERT INTO partitioned_table VALUES (0, 0);
+
+-- use both placements
+SET citus.task_assignment_policy TO "round-robin";
+
+-- the results should be the same
+SELECT count(*) FROM partitioned_table_0;
+SELECT count(*) FROM partitioned_table_0;
+
+SELECT count(*) FROM partitioned_table;
+SELECT count(*) FROM partitioned_table;
+
+
+-- ==== Clean up, we're done here ====
+
+SELECT citus.mitmproxy('conn.allow()');
+DROP TABLE partitioned_table;
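
Verification sketch (not part of the patch; a minimal illustration under stated
assumptions): after the killed INSERT in failure_replicated_partitions.sql, placement
health can be inspected through the standard Citus metadata view
pg_dist_shard_placement, where shardstate 1 means finalized (healthy) and 3 means
inactive (invalid). With this change, the placements of partitioned_table_0 on the
node whose connection was killed are expected to show shardstate 3 alongside the
corresponding placements of the parent table.

-- assumes the partitioned_table / partitioned_table_0 objects created by the
-- test above still exist on the coordinator
SELECT shard.logicalrelid,
       placement.shardid,
       placement.shardstate,
       placement.nodename,
       placement.nodeport
FROM pg_dist_shard_placement placement
JOIN pg_dist_shard shard ON placement.shardid = shard.shardid
WHERE shard.logicalrelid IN ('partitioned_table'::regclass,
                             'partitioned_table_0'::regclass)
ORDER BY placement.shardid, placement.nodeport;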