Make sure to update shard states of partitions on failures

Fixes #3331

In #2389, we implemented support for partitioned tables with replication
factor > 1. The implementation limits the use of modification queries on the
partitions: we error out when any partition is modified, via
EnsurePartitionTableNotReplicated().

However, we seem to have forgotten an important case: when a placement of the
parent (partitioned) table is marked as INVALID, the corresponding placements
of its partitions effectively become INVALID as well. Yet we never mark the
partitions' placements as INVALID.

If the user queries a partition directly, Citus could happily send the query
to placements that are effectively INVALID but are not marked as such.

This PR fixes it by marking the placements of the partitions as INVALID
as well.

The shard placement repair logic already re-creates all the partitions,
so we should be fine on that front.
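
For illustration only (not part of this PR's diff): assuming the partitioned_table / partitioned_table_0 setup from the regression test added below, one way to verify the behavior is to inspect pg_dist_shard_placement after a failed write. With this change, the partition's placement on the failed group is expected to show shardstate = 3 (inactive) alongside the parent's:

-- sketch only: shard ids and node names depend on the cluster
SELECT placement.shardid, placement.shardstate, placement.nodename, placement.nodeport
FROM pg_dist_shard_placement placement
JOIN pg_dist_shard shard USING (shardid)
WHERE shard.logicalrelid IN ('partitioned_table'::regclass, 'partitioned_table_0'::regclass)
ORDER BY placement.shardid, placement.nodeport;
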
pull/3333/head
Onder Kalaci 2019-12-21 21:26:50 +01:00
parent 3c770516eb
commit c8f14c9f6c
7 changed files with 207 additions and 4 deletions


@@ -18,6 +18,7 @@
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/distributed_planner.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/placement_connection.h"
 #include "distributed/relation_access_tracking.h"
 #include "utils/hsearch.h"
@@ -1065,8 +1066,7 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
         {
             uint64 shardId = shardEntry->key.shardId;
             uint64 placementId = placementEntry->key.placementId;
-            GroupShardPlacement *shardPlacement =
-                LoadGroupShardPlacement(shardId, placementId);
+            ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);

             /*
              * We only set shard state if its current state is FILE_FINALIZED, which
@@ -1074,7 +1074,7 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
              */
             if (shardPlacement->shardState == FILE_FINALIZED)
             {
-                UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE);
+                MarkShardPlacementInactive(shardPlacement);
             }
         }
     }


@@ -137,6 +137,7 @@
 #include "distributed/local_executor.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_executor.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_resowner.h"
 #include "distributed/multi_server_executor.h"
@@ -3495,7 +3496,7 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
          */
         if (shardPlacement->shardState == FILE_FINALIZED)
         {
-            UpdateShardPlacementState(shardPlacement->placementId, FILE_INACTIVE);
+            MarkShardPlacementInactive(shardPlacement);
         }
     }


@@ -28,6 +28,7 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
+#include "distributed/colocation_utils.h"
 #include "distributed/connection_management.h"
 #include "distributed/citus_nodes.h"
 #include "distributed/listutils.h"
@@ -36,6 +37,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_logical_optimizer.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/pg_dist_colocation.h"
 #include "distributed/pg_dist_partition.h"
@@ -72,6 +74,7 @@ static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationI
                                            char *sizeQuery);
 static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
 static void ErrorIfNotSuitableToGetSize(Oid relationId);
+static ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);


 /* exports for SQL callable functions */
@@ -1178,6 +1181,89 @@ DeleteShardPlacementRow(uint64 placementId)
 }
+
+
+/*
+ * UpdatePartitionShardPlacementStates gets a shard placement that is asserted to
+ * belong to a partitioned table. The function goes over the corresponding placements
+ * of its partitions, and sets their state to the input shardState.
+ */
+void
+UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char shardState)
+{
+    ShardInterval *parentShardInterval =
+        LoadShardInterval(parentShardPlacement->shardId);
+    Oid partitionedTableOid = parentShardInterval->relationId;
+
+    /* this function should only be called for partitioned tables */
+    Assert(PartitionedTable(partitionedTableOid));
+
+    ListCell *partitionOidCell = NULL;
+    List *partitionList = PartitionList(partitionedTableOid);
+    foreach(partitionOidCell, partitionList)
+    {
+        Oid partitionOid = lfirst_oid(partitionOidCell);
+        uint64 partitionShardId =
+            ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex);
+
+        ShardPlacement *partitionPlacement =
+            ShardPlacementOnGroup(partitionShardId, parentShardPlacement->groupId);
+
+        /* the partition should have a placement on the same group */
+        Assert(partitionPlacement != NULL);
+
+        UpdateShardPlacementState(partitionPlacement->placementId, shardState);
+    }
+}
+
+
+/*
+ * ShardPlacementOnGroup gets a shardId and a groupId, and returns a placement of the
+ * shard on the given group. If no such placement exists, the function returns NULL.
+ */
+static ShardPlacement *
+ShardPlacementOnGroup(uint64 shardId, int groupId)
+{
+    List *placementList = ShardPlacementList(shardId);
+    ListCell *placementCell = NULL;
+
+    foreach(placementCell, placementList)
+    {
+        ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
+
+        if (placement->groupId == groupId)
+        {
+            return placement;
+        }
+    }
+
+    return NULL;
+}
+
+
+/*
+ * MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where the
+ * state is set to invalid (i.e., FILE_INACTIVE). If the shardPlacement belongs to a
+ * partitioned table, it also marks the placements of the partitions as inactive.
+ */
+void
+MarkShardPlacementInactive(ShardPlacement *shardPlacement)
+{
+    UpdateShardPlacementState(shardPlacement->placementId, FILE_INACTIVE);
+
+    /*
+     * In case the shard belongs to a partitioned table, we make sure to update the
+     * states of its partitions. Repairing the shard already ensures that all of its
+     * partitions are recreated.
+     */
+    ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
+    if (PartitionedTable(shardInterval->relationId))
+    {
+        UpdatePartitionShardPlacementStates(shardPlacement, FILE_INACTIVE);
+    }
+}
+
+
 /*
  * UpdateShardPlacementState sets the shardState for the placement identified
  * by placementId.


@@ -127,6 +127,9 @@ extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
                                       char replicationModel);
 extern void DeletePartitionRow(Oid distributedRelationId);
 extern void DeleteShardRow(uint64 shardId);
+extern void UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement,
+                                                char shardState);
+extern void MarkShardPlacementInactive(ShardPlacement *shardPlacement);
 extern void UpdateShardPlacementState(uint64 placementId, char shardState);
 extern void DeleteShardPlacementRow(uint64 placementId);
 extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,


@@ -0,0 +1,70 @@
SET citus.next_shard_id TO 1490000;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SET citus.shard_replication_factor TO 2;
SET "citus.replication_model" to "statement";
SET citus.shard_count TO 4;
CREATE TABLE partitioned_table (
dist_key bigint,
partition_id integer
) PARTITION BY LIST (partition_id );
SELECT create_distributed_table('partitioned_table', 'dist_key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE partitioned_table_0
PARTITION OF partitioned_table (dist_key, partition_id)
FOR VALUES IN ( 0 );
INSERT INTO partitioned_table VALUES (0, 0);
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
INSERT INTO partitioned_table VALUES (0, 0);
WARNING: connection error: localhost:xxxxx
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
-- use both placements
SET citus.task_assignment_policy TO "round-robin";
-- the results should be the same
SELECT count(*) FROM partitioned_table_0;
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM partitioned_table_0;
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM partitioned_table;
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM partitioned_table;
count
---------------------------------------------------------------------
2
(1 row)
-- ==== Clean up, we're done here ====
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
DROP TABLE partitioned_table;


@@ -4,6 +4,7 @@ test: failure_test_helpers
 # this should only be run by pg_regress_multi, you don't need it
 test: failure_setup
 test: multi_test_helpers
+test: failure_replicated_partitions
 test: multi_test_catalog_views
 test: failure_ddl
 test: failure_truncate


@@ -0,0 +1,42 @@
SET citus.next_shard_id TO 1490000;
SELECT citus.mitmproxy('conn.allow()');
SET citus.shard_replication_factor TO 2;
SET "citus.replication_model" to "statement";
SET citus.shard_count TO 4;
CREATE TABLE partitioned_table (
dist_key bigint,
partition_id integer
) PARTITION BY LIST (partition_id );
SELECT create_distributed_table('partitioned_table', 'dist_key');
CREATE TABLE partitioned_table_0
PARTITION OF partitioned_table (dist_key, partition_id)
FOR VALUES IN ( 0 );
INSERT INTO partitioned_table VALUES (0, 0);
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
INSERT INTO partitioned_table VALUES (0, 0);
-- use both placements
SET citus.task_assignment_policy TO "round-robin";
-- the results should be the same
SELECT count(*) FROM partitioned_table_0;
SELECT count(*) FROM partitioned_table_0;
SELECT count(*) FROM partitioned_table;
SELECT count(*) FROM partitioned_table;
-- ==== Clean up, we're done here ====
SELECT citus.mitmproxy('conn.allow()');
DROP TABLE partitioned_table;