Merge pull request #3333 from citusdata/fix_wrong_data

Make sure to update shard states of partitions on failures
Önder Kalacı 2020-01-06 11:37:40 +00:00 committed by GitHub
commit 270571c106
7 changed files with 207 additions and 4 deletions
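
For context on why partition placements need their own handling: each partition of a distributed partitioned table has its own colocated shards and its own rows in pg_dist_shard_placement. Marking only the parent's placement inactive after a failed write therefore leaves the partition's placement on the failed node looking healthy, and queries routed directly to the partition can read stale data. The sketch below shows one way to inspect the rows involved; the table names follow the regression test added in this commit, and the query is illustrative rather than part of the patch.

-- illustrative only: placement states for the parent table and its partition,
-- as created by the failure_replicated_partitions test added in this commit
SELECT s.logicalrelid, p.shardid, p.shardstate, p.nodename, p.nodeport
FROM pg_dist_shard s
JOIN pg_dist_shard_placement p USING (shardid)
WHERE s.logicalrelid IN ('partitioned_table'::regclass,
                         'partitioned_table_0'::regclass)
ORDER BY p.shardid, p.nodeport;
-- shardstate 1 corresponds to FILE_FINALIZED (healthy) and shardstate 3 to
-- FILE_INACTIVE (not used for reads until repaired)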

View File

@@ -18,6 +18,7 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/distributed_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "utils/hsearch.h"
@@ -1065,8 +1066,7 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
{
uint64 shardId = shardEntry->key.shardId;
uint64 placementId = placementEntry->key.placementId;
GroupShardPlacement *shardPlacement =
LoadGroupShardPlacement(shardId, placementId);
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
/*
* We only set shard state if its current state is FILE_FINALIZED, which
@@ -1074,7 +1074,7 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
*/
if (shardPlacement->shardState == FILE_FINALIZED)
{
UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE);
MarkShardPlacementInactive(shardPlacement);
}
}
}

View File

@@ -137,6 +137,7 @@
#include "distributed/local_executor.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_resowner.h"
#include "distributed/multi_server_executor.h"
@@ -3495,7 +3496,7 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeeded
*/
if (shardPlacement->shardState == FILE_FINALIZED)
{
UpdateShardPlacementState(shardPlacement->placementId, FILE_INACTIVE);
MarkShardPlacementInactive(shardPlacement);
}
}

View File

@@ -28,6 +28,7 @@
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/citus_nodes.h"
#include "distributed/listutils.h"
@@ -36,6 +37,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
@@ -72,6 +74,7 @@ static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId
char *sizeQuery);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static void ErrorIfNotSuitableToGetSize(Oid relationId);
static ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
/* exports for SQL callable functions */
@@ -1178,6 +1181,89 @@ DeleteShardPlacementRow(uint64 placementId)
}
/*
* UpdatePartitionShardPlacementStates gets a shard placement which is asserted to
* belong to a partitioned table. The function iterates over the corresponding
* placements of its partitions and sets their states to the given shardState.
*/
void
UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char shardState)
{
ShardInterval *parentShardInterval =
LoadShardInterval(parentShardPlacement->shardId);
Oid partitionedTableOid = parentShardInterval->relationId;
/* this function should only be called for partitioned tables */
Assert(PartitionedTable(partitionedTableOid));
ListCell *partitionOidCell = NULL;
List *partitionList = PartitionList(partitionedTableOid);
foreach(partitionOidCell, partitionList)
{
Oid partitionOid = lfirst_oid(partitionOidCell);
uint64 partitionShardId =
ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex);
ShardPlacement *partitionPlacement =
ShardPlacementOnGroup(partitionShardId, parentShardPlacement->groupId);
/* the partition should have a placement on the same group */
Assert(partitionPlacement != NULL);
UpdateShardPlacementState(partitionPlacement->placementId, shardState);
}
}
/*
* ShardPlacementOnGroup gets a shardId and a groupId, and returns a placement
* of the shard on the given group. If no such placement exists, the function
* returns NULL.
*/
static ShardPlacement *
ShardPlacementOnGroup(uint64 shardId, int groupId)
{
List *placementList = ShardPlacementList(shardId);
ListCell *placementCell = NULL;
foreach(placementCell, placementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
if (placement->groupId == groupId)
{
return placement;
}
}
return NULL;
}
/*
* MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState that
* sets the state to inactive (FILE_INACTIVE). It also marks the placements of
* the shard's partitions as inactive if the shardPlacement belongs to a
* partitioned table.
*/
void
MarkShardPlacementInactive(ShardPlacement *shardPlacement)
{
UpdateShardPlacementState(shardPlacement->placementId, FILE_INACTIVE);
/*
* In case the shard belongs to a partitioned table, we make sure to update
* the states of its partitions' placements as well. Repairing the shard
* already recreates all of its partitions.
*/
ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
if (PartitionedTable(shardInterval->relationId))
{
UpdatePartitionShardPlacementStates(shardPlacement, FILE_INACTIVE);
}
}
/*
* UpdateShardPlacementState sets the shardState for the placement identified
* by placementId.

View File

@@ -127,6 +127,9 @@ extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
char replicationModel);
extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardRow(uint64 shardId);
extern void UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement,
char shardState);
extern void MarkShardPlacementInactive(ShardPlacement *shardPlacement);
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
extern void DeleteShardPlacementRow(uint64 placementId);
extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,

View File

@@ -0,0 +1,70 @@
SET citus.next_shard_id TO 1490000;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SET citus.shard_replication_factor TO 2;
SET "citus.replication_model" to "statement";
SET citus.shard_count TO 4;
CREATE TABLE partitioned_table (
dist_key bigint,
partition_id integer
) PARTITION BY LIST (partition_id );
SELECT create_distributed_table('partitioned_table', 'dist_key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE partitioned_table_0
PARTITION OF partitioned_table (dist_key, partition_id)
FOR VALUES IN ( 0 );
INSERT INTO partitioned_table VALUES (0, 0);
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
INSERT INTO partitioned_table VALUES (0, 0);
WARNING: connection error: localhost:xxxxx
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
-- use both placements
SET citus.task_assignment_policy TO "round-robin";
-- the results should be the same
SELECT count(*) FROM partitioned_table_0;
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM partitioned_table_0;
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM partitioned_table;
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM partitioned_table;
count
---------------------------------------------------------------------
2
(1 row)
-- ==== Clean up, we're done here ====
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
DROP TABLE partitioned_table;
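
The expected output above only checks the user-visible symptom: with round-robin task assignment, repeated counts on the partition and on the parent stay consistent because the failed placement is no longer used for reads. A further assertion such a test could make is that every parent shard placement and its colocated partition shard placement now agree on their state on the same node. The sketch below pairs colocated shards via shardminvalue in the standard Citus catalogs; it is illustrative and not part of the committed test.

-- illustrative check: a parent shard placement and the colocated partition
-- shard placement on the same node should report the same shardstate
SELECT parent_p.nodeport,
       parent_p.shardstate AS parent_state,
       part_p.shardstate AS partition_state
FROM pg_dist_shard parent_s
JOIN pg_dist_shard part_s ON parent_s.shardminvalue = part_s.shardminvalue
JOIN pg_dist_shard_placement parent_p ON parent_p.shardid = parent_s.shardid
JOIN pg_dist_shard_placement part_p ON part_p.shardid = part_s.shardid
  AND part_p.nodename = parent_p.nodename
  AND part_p.nodeport = parent_p.nodeport
WHERE parent_s.logicalrelid = 'partitioned_table'::regclass
  AND part_s.logicalrelid = 'partitioned_table_0'::regclass
ORDER BY parent_p.nodeport, parent_s.shardminvalue;
-- with this fix, parent_state and partition_state match in every row; before
-- it, the worker whose INSERT was killed reported 3 (inactive) for the parent
-- shard but still 1 (active) for the partition shard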

View File

@@ -4,6 +4,7 @@ test: failure_test_helpers
# this should only be run by pg_regress_multi, you don't need it
test: failure_setup
test: multi_test_helpers
test: failure_replicated_partitions
test: multi_test_catalog_views
test: failure_ddl
test: failure_truncate

View File

@@ -0,0 +1,42 @@
SET citus.next_shard_id TO 1490000;
SELECT citus.mitmproxy('conn.allow()');
SET citus.shard_replication_factor TO 2;
SET "citus.replication_model" to "statement";
SET citus.shard_count TO 4;
CREATE TABLE partitioned_table (
dist_key bigint,
partition_id integer
) PARTITION BY LIST (partition_id );
SELECT create_distributed_table('partitioned_table', 'dist_key');
CREATE TABLE partitioned_table_0
PARTITION OF partitioned_table (dist_key, partition_id)
FOR VALUES IN ( 0 );
INSERT INTO partitioned_table VALUES (0, 0);
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
INSERT INTO partitioned_table VALUES (0, 0);
-- use both placements
SET citus.task_assignment_policy TO "round-robin";
-- the results should be the same
SELECT count(*) FROM partitioned_table_0;
SELECT count(*) FROM partitioned_table_0;
SELECT count(*) FROM partitioned_table;
SELECT count(*) FROM partitioned_table;
-- ==== Clean up, we're done here ====
SELECT citus.mitmproxy('conn.allow()');
DROP TABLE partitioned_table;