Merge pull request #1067 from citusdata/fix_mx_drop_sequence_deadlock

Prevent Deadlock on Dropping MX Tables with Sequences
pull/1070/head
Eren Başak 2016-12-28 15:52:53 +02:00 committed by GitHub
commit 2b90620f15
10 changed files with 237 additions and 39 deletions

View File

@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -115,6 +115,8 @@ $(EXTENSION)--6.1-9.sql: $(EXTENSION)--6.1-8.sql $(EXTENSION)--6.1-8--6.1-9.sql
cat $^ > $@
$(EXTENSION)--6.1-10.sql: $(EXTENSION)--6.1-9.sql $(EXTENSION)--6.1-9--6.1-10.sql
cat $^ > $@
$(EXTENSION)--6.1-11.sql: $(EXTENSION)--6.1-10.sql $(EXTENSION)--6.1-10--6.1-11.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,60 @@
/* citus--6.1-10--6.1-11.sql */
SET search_path = 'pg_catalog';
DROP FUNCTION master_drop_sequences(text[], text, bigint);
CREATE FUNCTION master_drop_sequences(sequence_names text[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_drop_sequences$$;
COMMENT ON FUNCTION master_drop_sequences(text[])
IS 'drop specified sequences from the cluster';
CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger()
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $cdbdt$
DECLARE
v_obj record;
sequence_names text[] := '{}';
table_colocation_id integer;
propagate_drop boolean := false;
BEGIN
-- collect set of dropped sequences to drop on workers later
SELECT array_agg(object_identity) INTO sequence_names
FROM pg_event_trigger_dropped_objects()
WHERE object_type = 'sequence';
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() JOIN
pg_dist_partition ON (logicalrelid = objid)
WHERE object_type IN ('table', 'foreign table')
LOOP
-- get colocation group
SELECT colocationid INTO table_colocation_id FROM pg_dist_partition WHERE logicalrelid = v_obj.objid;
-- ensure all shards are dropped
PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
PERFORM master_drop_distributed_table_metadata(v_obj.objid, v_obj.schema_name, v_obj.object_name);
-- drop colocation group if all referencing tables are dropped
IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE colocationId = table_colocation_id) THEN
DELETE FROM pg_dist_colocation WHERE colocationId = table_colocation_id;
END IF;
END LOOP;
IF cardinality(sequence_names) = 0 THEN
RETURN;
END IF;
PERFORM master_drop_sequences(sequence_names);
END;
$cdbdt$;
COMMENT ON FUNCTION citus_drop_trigger()
IS 'perform checks and actions at the end of DROP actions';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.1-10'
default_version = '6.1-11'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -25,15 +25,18 @@
#include "catalog/namespace.h"
#include "commands/dbcommands.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_utility.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/relay_utility.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
@ -216,25 +219,27 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
/*
* master_drop_sequences attempts to drop a list of sequences on a specified
* node. The "IF EXISTS" clause is used to permit dropping sequences even if
* they may not exist. Returns true on success, false on failure.
* master_drop_sequences attempts to drop a list of sequences on worker nodes.
* The "IF EXISTS" clause is used to permit dropping sequences even if they may not
* exist. If the commands fail on the workers, the operation is rolled back.
* If ddl propagation (citus.enable_ddl_propagation) is set to off, then the function
* returns without doing anything.
*/
Datum
master_drop_sequences(PG_FUNCTION_ARGS)
{
ArrayType *sequenceNamesArray = PG_GETARG_ARRAYTYPE_P(0);
text *nodeText = PG_GETARG_TEXT_P(1);
int64 nodePort = PG_GETARG_INT64(2);
bool dropSuccessful = false;
char *nodeName = TextDatumGetCString(nodeText);
ArrayIterator sequenceIterator = NULL;
Datum sequenceText = 0;
bool isNull = false;
StringInfo dropSeqCommand = makeStringInfo();
/* do nothing if DDL propagation is switched off */
if (!EnableDDLPropagation)
{
PG_RETURN_VOID();
}
/* iterate over sequence names to build single command to DROP them all */
sequenceIterator = array_create_iterator(sequenceNamesArray, 0, NULL);
while (array_iterate(sequenceIterator, &sequenceText, &isNull))
@ -259,14 +264,10 @@ master_drop_sequences(PG_FUNCTION_ARGS)
appendStringInfo(dropSeqCommand, " %s", TextDatumGetCString(sequenceText));
}
dropSuccessful = ExecuteRemoteCommand(nodeName, nodePort, dropSeqCommand);
if (!dropSuccessful)
{
ereport(WARNING, (errmsg("could not delete sequences from node \"%s:" INT64_FORMAT
"\"", nodeName, nodePort)));
}
SendCommandToWorkers(ALL_WORKERS, DISABLE_DDL_PROPAGATION);
SendCommandToWorkers(ALL_WORKERS, dropSeqCommand->data);
PG_RETURN_BOOL(dropSuccessful);
PG_RETURN_VOID();
}

View File

@ -344,3 +344,21 @@ WHERE
DROP TABLE temp;
\c - - - :worker_1_port
DELETE FROM pg_dist_partition;
DELETE FROM pg_dist_shard;
DELETE FROM pg_dist_shard_placement;
DELETE FROM pg_dist_node;
\c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
stop_metadata_sync_to_node
----------------------------
(1 row)

View File

@ -68,6 +68,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-7';
ALTER EXTENSION citus UPDATE TO '6.1-8';
ALTER EXTENSION citus UPDATE TO '6.1-9';
ALTER EXTENSION citus UPDATE TO '6.1-10';
ALTER EXTENSION citus UPDATE TO '6.1-11';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
FROM pg_depend AS pgd,

View File

@ -861,18 +861,106 @@ SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_te
(1 row)
DROP TABLE mx_temp_drop_test;
-- Cleanup
-- Create an MX table with sequences
\c - - - :master_port
SET citus.shard_count TO 3;
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
CREATE TABLE mx_table_with_sequence(a int, b BIGSERIAL, c BIGSERIAL);
SELECT create_distributed_table('mx_table_with_sequence', 'a');
create_distributed_table
--------------------------
(1 row)
\d mx_table_with_sequence
Table "public.mx_table_with_sequence"
Column | Type | Modifiers
--------+---------+--------------------------------------------------------------------
a | integer |
b | bigint | not null default nextval('mx_table_with_sequence_b_seq'::regclass)
c | bigint | not null default nextval('mx_table_with_sequence_c_seq'::regclass)
\ds mx_table_with_sequence_b_seq
List of relations
Schema | Name | Type | Owner
--------+------------------------------+----------+----------
public | mx_table_with_sequence_b_seq | sequence | postgres
(1 row)
\ds mx_table_with_sequence_c_seq
List of relations
Schema | Name | Type | Owner
--------+------------------------------+----------+----------
public | mx_table_with_sequence_c_seq | sequence | postgres
(1 row)
-- Check that the sequences created on the worker as well
\c - - - :worker_1_port
\d mx_table_with_sequence
Table "public.mx_table_with_sequence"
Column | Type | Modifiers
--------+---------+--------------------------------------------------------------------
a | integer |
b | bigint | not null default nextval('mx_table_with_sequence_b_seq'::regclass)
c | bigint | not null default nextval('mx_table_with_sequence_c_seq'::regclass)
\ds mx_table_with_sequence_b_seq
List of relations
Schema | Name | Type | Owner
--------+------------------------------+----------+----------
public | mx_table_with_sequence_b_seq | sequence | postgres
(1 row)
\ds mx_table_with_sequence_c_seq
List of relations
Schema | Name | Type | Owner
--------+------------------------------+----------+----------
public | mx_table_with_sequence_c_seq | sequence | postgres
(1 row)
-- Check that dropping the mx table with sequences works as expected
\c - - - :master_port
DROP TABLE mx_table_with_sequence;
\d mx_table_with_sequence
\ds mx_table_with_sequence_b_seq
List of relations
Schema | Name | Type | Owner
--------+------+------+-------
(0 rows)
\ds mx_table_with_sequence_c_seq
List of relations
Schema | Name | Type | Owner
--------+------+------+-------
(0 rows)
-- Check that the sequences are dropped from the worker as well
\c - - - :worker_1_port
\d mx_table_with_sequence
\ds mx_table_with_sequence_b_seq
List of relations
Schema | Name | Type | Owner
--------+------+------+-------
(0 rows)
\ds mx_table_with_sequence_c_seq
List of relations
Schema | Name | Type | Owner
--------+------+------+-------
(0 rows)
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DELETE FROM pg_dist_node;
DELETE FROM pg_dist_partition;
DELETE FROM pg_dist_shard;
DELETE FROM pg_dist_shard_placement;
\d mx_testing_schema.mx_test_table
\c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------
@ -885,10 +973,6 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
(1 row)
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
RESET citus.multi_shard_commit_protocol;

View File

@ -133,3 +133,12 @@ WHERE
AND pg_dist_shard_placement.nodeport = :worker_2_port;
DROP TABLE temp;
\c - - - :worker_1_port
DELETE FROM pg_dist_partition;
DELETE FROM pg_dist_shard;
DELETE FROM pg_dist_shard_placement;
DELETE FROM pg_dist_node;
\c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);

View File

@ -68,6 +68,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-7';
ALTER EXTENSION citus UPDATE TO '6.1-8';
ALTER EXTENSION citus UPDATE TO '6.1-9';
ALTER EXTENSION citus UPDATE TO '6.1-10';
ALTER EXTENSION citus UPDATE TO '6.1-11';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)

View File

@ -380,22 +380,44 @@ SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_te
DROP TABLE mx_temp_drop_test;
-- Cleanup
-- Create an MX table with sequences
\c - - - :master_port
SET citus.shard_count TO 3;
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
CREATE TABLE mx_table_with_sequence(a int, b BIGSERIAL, c BIGSERIAL);
SELECT create_distributed_table('mx_table_with_sequence', 'a');
\d mx_table_with_sequence
\ds mx_table_with_sequence_b_seq
\ds mx_table_with_sequence_c_seq
-- Check that the sequences created on the worker as well
\c - - - :worker_1_port
\d mx_table_with_sequence
\ds mx_table_with_sequence_b_seq
\ds mx_table_with_sequence_c_seq
-- Check that dropping the mx table with sequences works as expected
\c - - - :master_port
DROP TABLE mx_table_with_sequence;
\d mx_table_with_sequence
\ds mx_table_with_sequence_b_seq
\ds mx_table_with_sequence_c_seq
-- Check that the sequences are dropped from the worker as well
\c - - - :worker_1_port
\d mx_table_with_sequence
\ds mx_table_with_sequence_b_seq
\ds mx_table_with_sequence_c_seq
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DELETE FROM pg_dist_node;
DELETE FROM pg_dist_partition;
DELETE FROM pg_dist_shard;
DELETE FROM pg_dist_shard_placement;
\d mx_testing_schema.mx_test_table
\c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
RESET citus.shard_count;
RESET citus.shard_replication_factor;