diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c
index bfba19741..cc56f4749 100644
--- a/src/backend/distributed/connection/placement_connection.c
+++ b/src/backend/distributed/connection/placement_connection.c
@@ -18,6 +18,7 @@
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/distributed_planner.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/placement_connection.h"
 #include "distributed/relation_access_tracking.h"
 #include "utils/hsearch.h"
@@ -1065,8 +1066,7 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
 	{
 		uint64 shardId = shardEntry->key.shardId;
 		uint64 placementId = placementEntry->key.placementId;
-		GroupShardPlacement *shardPlacement =
-			LoadGroupShardPlacement(shardId, placementId);
+		ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
 
 		/*
 		 * We only set shard state if its current state is FILE_FINALIZED, which
@@ -1074,7 +1074,7 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
 		 */
 		if (shardPlacement->shardState == FILE_FINALIZED)
 		{
-			UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE);
+			MarkShardPlacementInactive(shardPlacement);
 		}
 	}
 }
diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index d266bae67..dfd4069ea 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -137,6 +137,7 @@
 #include "distributed/local_executor.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_executor.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_resowner.h"
 #include "distributed/multi_server_executor.h"
@@ -3495,7 +3496,7 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
 	 */
 	if (shardPlacement->shardState == FILE_FINALIZED)
 	{
-		UpdateShardPlacementState(shardPlacement->placementId, FILE_INACTIVE);
+		MarkShardPlacementInactive(shardPlacement);
 	}
 }
 
diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c
index 23156ffcb..9b3f409a0 100644
--- a/src/backend/distributed/master/master_metadata_utility.c
+++ b/src/backend/distributed/master/master_metadata_utility.c
@@ -28,6 +28,7 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
+#include "distributed/colocation_utils.h"
 #include "distributed/connection_management.h"
 #include "distributed/citus_nodes.h"
 #include "distributed/listutils.h"
@@ -36,6 +37,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_logical_optimizer.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/pg_dist_colocation.h"
 #include "distributed/pg_dist_partition.h"
@@ -72,6 +74,7 @@ static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationI
 											char *sizeQuery);
 static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
 static void ErrorIfNotSuitableToGetSize(Oid relationId);
+static ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
 
 
 /* exports for SQL callable functions */
@@ -1178,6 +1181,89 @@ DeleteShardPlacementRow(uint64 placementId)
 }
 
 
+/*
+ * UpdatePartitionShardPlacementStates gets a shard placement that is asserted to belong
+ * to a partitioned table. The function iterates over the colocated placements of the
+ * table's partitions and sets their state to the given shardState.
+ */
+void
+UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char shardState)
+{
+	ShardInterval *parentShardInterval =
+		LoadShardInterval(parentShardPlacement->shardId);
+	Oid partitionedTableOid = parentShardInterval->relationId;
+
+	/* this function should only be called for partitioned tables */
+	Assert(PartitionedTable(partitionedTableOid));
+
+	ListCell *partitionOidCell = NULL;
+	List *partitionList = PartitionList(partitionedTableOid);
+	foreach(partitionOidCell, partitionList)
+	{
+		Oid partitionOid = lfirst_oid(partitionOidCell);
+		uint64 partitionShardId =
+			ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex);
+
+		ShardPlacement *partitionPlacement =
+			ShardPlacementOnGroup(partitionShardId, parentShardPlacement->groupId);
+
+		/* the partition should have a placement on the same group */
+		Assert(partitionPlacement != NULL);
+
+		UpdateShardPlacementState(partitionPlacement->placementId, shardState);
+	}
+}
+
+
+/*
+ * ShardPlacementOnGroup gets a shardId and a groupId, and returns the placement
+ * of the shard on the given group. If no such placement exists, the function
+ * returns NULL.
+ */
+static ShardPlacement *
+ShardPlacementOnGroup(uint64 shardId, int groupId)
+{
+	List *placementList = ShardPlacementList(shardId);
+	ListCell *placementCell = NULL;
+
+	foreach(placementCell, placementList)
+	{
+		ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
+
+		if (placement->groupId == groupId)
+		{
+			return placement;
+		}
+	}
+
+	return NULL;
+}
+
+
+/*
+ * MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState that
+ * sets the state to FILE_INACTIVE. If the shard placement belongs to a partitioned
+ * table, it also marks the corresponding placements of the table's partitions as
+ * inactive.
+ */
+void
+MarkShardPlacementInactive(ShardPlacement *shardPlacement)
+{
+	UpdateShardPlacementState(shardPlacement->placementId, FILE_INACTIVE);
+
+	/*
+	 * If the shard belongs to a partitioned table, also update the states of
+	 * its partitions' placements. Repairing a shard already recreates all of
+	 * its partitions.
+	 */
+	ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
+	if (PartitionedTable(shardInterval->relationId))
+	{
+		UpdatePartitionShardPlacementStates(shardPlacement, FILE_INACTIVE);
+	}
+}
+
+
 /*
  * UpdateShardPlacementState sets the shardState for the placement identified
  * by placementId.
diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h
index 2cdbce21e..12e3811f9 100644
--- a/src/include/distributed/master_metadata_utility.h
+++ b/src/include/distributed/master_metadata_utility.h
@@ -127,6 +127,9 @@ extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
 										 char replicationModel);
 extern void DeletePartitionRow(Oid distributedRelationId);
 extern void DeleteShardRow(uint64 shardId);
+extern void UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement,
+												char shardState);
+extern void MarkShardPlacementInactive(ShardPlacement *shardPlacement);
 extern void UpdateShardPlacementState(uint64 placementId, char shardState);
 extern void DeleteShardPlacementRow(uint64 placementId);
 extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
diff --git a/src/test/regress/expected/failure_replicated_partitions.out b/src/test/regress/expected/failure_replicated_partitions.out
new file mode 100644
index 000000000..3d6d748a8
--- /dev/null
+++ b/src/test/regress/expected/failure_replicated_partitions.out
@@ -0,0 +1,70 @@
+SET citus.next_shard_id TO 1490000;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.shard_replication_factor TO 2;
+SET "citus.replication_model" to "statement";
+SET citus.shard_count TO 4;
+CREATE TABLE partitioned_table (
+	dist_key bigint,
+	partition_id integer
+) PARTITION BY LIST (partition_id );
+SELECT create_distributed_table('partitioned_table', 'dist_key');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE partitioned_table_0
+	PARTITION OF partitioned_table (dist_key, partition_id)
+	FOR VALUES IN ( 0 );
+INSERT INTO partitioned_table VALUES (0, 0);
+SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO partitioned_table VALUES (0, 0);
+WARNING: connection error: localhost:xxxxx
+DETAIL: server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+-- use both placements
+SET citus.task_assignment_policy TO "round-robin";
+-- the results should be the same
+SELECT count(*) FROM partitioned_table_0;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+SELECT count(*) FROM partitioned_table_0;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+SELECT count(*) FROM partitioned_table;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+SELECT count(*) FROM partitioned_table;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+-- ==== Clean up, we're done here ====
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+DROP TABLE partitioned_table;
diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule
index c410c678e..f63d28358 100644
--- a/src/test/regress/failure_schedule
+++ b/src/test/regress/failure_schedule
@@ -4,6 +4,7 @@ test: failure_test_helpers
 # this should only be run by pg_regress_multi, you don't need it
 test: failure_setup
 test: multi_test_helpers
+test: failure_replicated_partitions
 test: multi_test_catalog_views
 test: failure_ddl
 test: failure_truncate
diff --git a/src/test/regress/sql/failure_replicated_partitions.sql b/src/test/regress/sql/failure_replicated_partitions.sql
new file mode 100644
index 000000000..cb4b0ba6b
--- /dev/null
+++ b/src/test/regress/sql/failure_replicated_partitions.sql
@@ -0,0 +1,42 @@
+SET citus.next_shard_id TO 1490000;
+
+SELECT citus.mitmproxy('conn.allow()');
+
+
+SET citus.shard_replication_factor TO 2;
+SET "citus.replication_model" to "statement";
+SET citus.shard_count TO 4;
+
+CREATE TABLE partitioned_table (
+	dist_key bigint,
+	partition_id integer
+) PARTITION BY LIST (partition_id );
+
+SELECT create_distributed_table('partitioned_table', 'dist_key');
+CREATE TABLE partitioned_table_0
+	PARTITION OF partitioned_table (dist_key, partition_id)
+	FOR VALUES IN ( 0 );
+
+
+INSERT INTO partitioned_table VALUES (0, 0);
+
+SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+
+INSERT INTO partitioned_table VALUES (0, 0);
+
+-- use both placements
+SET citus.task_assignment_policy TO "round-robin";
+
+-- the results should be the same
+SELECT count(*) FROM partitioned_table_0;
+SELECT count(*) FROM partitioned_table_0;
+
+SELECT count(*) FROM partitioned_table;
+SELECT count(*) FROM partitioned_table;
+
+
+-- ==== Clean up, we're done here ====
+
+SELECT citus.mitmproxy('conn.allow()');
+DROP TABLE partitioned_table;
+
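The effect of the new MarkShardPlacementInactive path can also be spot-checked by hand. The query below is not part of the patch or of failure_replicated_partitions.sql; it is a minimal sketch that assumes the standard pg_dist_shard and pg_dist_shard_placement catalog views and the FILE_INACTIVE state being stored as shardstate 3. Run on the coordinator right after the killed INSERT, it lists the placements that were marked inactive for the parent table and its partition.

-- hypothetical verification query, not included in the regression test
SELECT s.logicalrelid::regclass AS relation,
       p.shardid,
       p.shardstate,
       p.nodename,
       p.nodeport
FROM pg_dist_shard s
JOIN pg_dist_shard_placement p USING (shardid)
WHERE s.logicalrelid IN ('partitioned_table'::regclass,
                         'partitioned_table_0'::regclass)
  AND p.shardstate = 3
ORDER BY relation, p.shardid;

Before this change, only the parent shard's placement on the failed node would appear here; the colocated partition placement stayed marked as healthy, which is the inconsistency the patch addresses.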