mirror of https://github.com/citusdata/citus.git
Prevent deadlock for long named partitioned index creation on single node (#4461)
* Prevent deadlock for long named partitioned index creation on single node * Create IsSingleNodeCluster function * Use both local and sequential executionpull/4456/head^2
parent
f27649754b
commit
1f36ff7c17
|
@ -27,6 +27,7 @@
|
||||||
#include "distributed/deparse_shard_query.h"
|
#include "distributed/deparse_shard_query.h"
|
||||||
#include "distributed/distributed_planner.h"
|
#include "distributed/distributed_planner.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
|
#include "distributed/local_executor.h"
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
|
@ -37,6 +38,7 @@
|
||||||
#include "distributed/relation_access_tracking.h"
|
#include "distributed/relation_access_tracking.h"
|
||||||
#include "distributed/relation_utils.h"
|
#include "distributed/relation_utils.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
|
#include "distributed/worker_manager.h"
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "nodes/parsenodes.h"
|
#include "nodes/parsenodes.h"
|
||||||
|
@ -54,7 +56,7 @@ static int GetNumberOfIndexParameters(IndexStmt *createIndexStatement);
|
||||||
static bool IndexAlreadyExists(IndexStmt *createIndexStatement);
|
static bool IndexAlreadyExists(IndexStmt *createIndexStatement);
|
||||||
static Oid CreateIndexStmtGetIndexId(IndexStmt *createIndexStatement);
|
static Oid CreateIndexStmtGetIndexId(IndexStmt *createIndexStatement);
|
||||||
static Oid CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement);
|
static Oid CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement);
|
||||||
static void SwitchToSequentialExecutionIfIndexNameTooLong(
|
static void SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(
|
||||||
IndexStmt *createIndexStatement);
|
IndexStmt *createIndexStatement);
|
||||||
static char * GenerateLongestShardPartitionIndexName(IndexStmt *createIndexStatement);
|
static char * GenerateLongestShardPartitionIndexName(IndexStmt *createIndexStatement);
|
||||||
static char * GenerateDefaultIndexName(IndexStmt *createIndexStatement);
|
static char * GenerateDefaultIndexName(IndexStmt *createIndexStatement);
|
||||||
|
@ -218,7 +220,7 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand)
|
||||||
* the same, and thus forming a self-deadlock as these tables/
|
* the same, and thus forming a self-deadlock as these tables/
|
||||||
* indexes are inserted into postgres' metadata tables, like pg_class.
|
* indexes are inserted into postgres' metadata tables, like pg_class.
|
||||||
*/
|
*/
|
||||||
SwitchToSequentialExecutionIfIndexNameTooLong(createIndexStatement);
|
SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(createIndexStatement);
|
||||||
|
|
||||||
DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand);
|
DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand);
|
||||||
return list_make1(ddlJob);
|
return list_make1(ddlJob);
|
||||||
|
@ -344,12 +346,12 @@ ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcesso
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SwitchToSequentialExecutionIfIndexNameTooLong generates the longest index name
|
* SwitchToSequentialOrLocalExecutionIfIndexNameTooLong generates the longest index name
|
||||||
* on the shards of the partitions, and if exceeds the limit switches to the
|
* on the shards of the partitions, and if exceeds the limit switches to sequential and
|
||||||
* sequential execution to prevent self-deadlocks.
|
* local execution to prevent self-deadlocks.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
SwitchToSequentialExecutionIfIndexNameTooLong(IndexStmt *createIndexStatement)
|
SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(IndexStmt *createIndexStatement)
|
||||||
{
|
{
|
||||||
Oid relationId = CreateIndexStmtGetRelationId(createIndexStatement);
|
Oid relationId = CreateIndexStmtGetRelationId(createIndexStatement);
|
||||||
if (!PartitionedTable(relationId))
|
if (!PartitionedTable(relationId))
|
||||||
|
@ -386,12 +388,15 @@ SwitchToSequentialExecutionIfIndexNameTooLong(IndexStmt *createIndexStatement)
|
||||||
"\"SET LOCAL citus.multi_shard_modify_mode TO "
|
"\"SET LOCAL citus.multi_shard_modify_mode TO "
|
||||||
"\'sequential\';\"")));
|
"\'sequential\';\"")));
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
elog(DEBUG1, "the index name on the shards of the partition "
|
elog(DEBUG1, "the index name on the shards of the partition "
|
||||||
"is too long, switching to sequential execution "
|
"is too long, switching to sequential and local execution "
|
||||||
"mode to prevent self deadlocks: %s", indexName);
|
"mode to prevent self deadlocks: %s", indexName);
|
||||||
|
|
||||||
SetLocalMultiShardModifyModeToSequential();
|
SetLocalMultiShardModifyModeToSequential();
|
||||||
|
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -540,6 +540,38 @@ SELECT COUNT(*) FROM pg_dist_placement p, pg_dist_shard s WHERE p.shardid = s.sh
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
-- test partitioned index creation with long name
|
||||||
|
CREATE TABLE test_index_creation1
|
||||||
|
(
|
||||||
|
tenant_id integer NOT NULL,
|
||||||
|
timeperiod timestamp without time zone NOT NULL,
|
||||||
|
field1 integer NOT NULL,
|
||||||
|
inserted_utc timestamp without time zone NOT NULL DEFAULT now(),
|
||||||
|
PRIMARY KEY(tenant_id, timeperiod)
|
||||||
|
) PARTITION BY RANGE (timeperiod);
|
||||||
|
CREATE TABLE test_index_creation1_p2020_09_26
|
||||||
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00');
|
||||||
|
CREATE TABLE test_index_creation1_p2020_09_27
|
||||||
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00');
|
||||||
|
select create_distributed_table('test_index_creation1', 'tenant_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- should be able to create indexes with INCLUDE/WHERE
|
||||||
|
CREATE INDEX ix_test_index_creation5 ON test_index_creation1
|
||||||
|
USING btree(tenant_id, timeperiod)
|
||||||
|
INCLUDE (field1) WHERE (tenant_id = 100);
|
||||||
|
NOTICE: executing the command locally: CREATE INDEX ix_test_index_creation5_1503038 ON coordinator_shouldhaveshards.test_index_creation1_1503038 USING btree (tenant_id ,timeperiod ) INCLUDE (field1 )WHERE (tenant_id = 100)
|
||||||
|
NOTICE: executing the command locally: CREATE INDEX ix_test_index_creation5_1503041 ON coordinator_shouldhaveshards.test_index_creation1_1503041 USING btree (tenant_id ,timeperiod ) INCLUDE (field1 )WHERE (tenant_id = 100)
|
||||||
|
-- test if indexes are created
|
||||||
|
SELECT 1 AS created WHERE EXISTS(SELECT * FROM pg_indexes WHERE indexname LIKE '%test_index_creation%');
|
||||||
|
created
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
DROP TABLE ref_table;
|
DROP TABLE ref_table;
|
||||||
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_table_xxxxx CASCADE
|
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_table_xxxxx CASCADE
|
||||||
|
@ -550,7 +582,7 @@ DROP TABLE ref;
|
||||||
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE
|
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE
|
||||||
DROP TABLE test_append_table;
|
DROP TABLE test_append_table;
|
||||||
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
||||||
NOTICE: drop cascades to table local
|
NOTICE: drop cascades to 4 other objects
|
||||||
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
@ -357,13 +357,13 @@ SET client_min_messages TO DEBUG1;
|
||||||
CREATE INDEX ix_test_index_creation2
|
CREATE INDEX ix_test_index_creation2
|
||||||
ON test_index_creation1 USING btree
|
ON test_index_creation1 USING btree
|
||||||
(tenant_id, timeperiod);
|
(tenant_id, timeperiod);
|
||||||
DEBUG: the index name on the shards of the partition is too long, switching to sequential execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26_10311_tenant_id_timeperiod_idx
|
DEBUG: the index name on the shards of the partition is too long, switching to sequential and local execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26_10311_tenant_id_timeperiod_idx
|
||||||
-- same test with schema qualified
|
-- same test with schema qualified
|
||||||
SET search_path TO public;
|
SET search_path TO public;
|
||||||
CREATE INDEX ix_test_index_creation3
|
CREATE INDEX ix_test_index_creation3
|
||||||
ON multi_index_statements.test_index_creation1 USING btree
|
ON multi_index_statements.test_index_creation1 USING btree
|
||||||
(tenant_id, timeperiod);
|
(tenant_id, timeperiod);
|
||||||
DEBUG: the index name on the shards of the partition is too long, switching to sequential execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26_10311_tenant_id_timeperiod_idx
|
DEBUG: the index name on the shards of the partition is too long, switching to sequential and local execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26_10311_tenant_id_timeperiod_idx
|
||||||
SET search_path TO multi_index_statements;
|
SET search_path TO multi_index_statements;
|
||||||
-- we cannot switch to sequential execution
|
-- we cannot switch to sequential execution
|
||||||
-- after a parallel query
|
-- after a parallel query
|
||||||
|
@ -392,16 +392,16 @@ BEGIN;
|
||||||
CREATE INDEX ix_test_index_creation4
|
CREATE INDEX ix_test_index_creation4
|
||||||
ON test_index_creation1 USING btree
|
ON test_index_creation1 USING btree
|
||||||
(tenant_id, timeperiod);
|
(tenant_id, timeperiod);
|
||||||
DEBUG: the index name on the shards of the partition is too long, switching to sequential execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26_10311_tenant_id_timeperiod_idx
|
DEBUG: the index name on the shards of the partition is too long, switching to sequential and local execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26_10311_tenant_id_timeperiod_idx
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- should be able to create indexes with INCLUDE/WHERE
|
-- should be able to create indexes with INCLUDE/WHERE
|
||||||
CREATE INDEX ix_test_index_creation5 ON test_index_creation1
|
CREATE INDEX ix_test_index_creation5 ON test_index_creation1
|
||||||
USING btree(tenant_id, timeperiod)
|
USING btree(tenant_id, timeperiod)
|
||||||
INCLUDE (field1) WHERE (tenant_id = 100);
|
INCLUDE (field1) WHERE (tenant_id = 100);
|
||||||
DEBUG: the index name on the shards of the partition is too long, switching to sequential execution mode to prevent self deadlocks: test_index_creation1_p2020_09_2_tenant_id_timeperiod_field1_idx
|
DEBUG: the index name on the shards of the partition is too long, switching to sequential and local execution mode to prevent self deadlocks: test_index_creation1_p2020_09_2_tenant_id_timeperiod_field1_idx
|
||||||
CREATE UNIQUE INDEX ix_test_index_creation6 ON test_index_creation1
|
CREATE UNIQUE INDEX ix_test_index_creation6 ON test_index_creation1
|
||||||
USING btree(tenant_id, timeperiod);
|
USING btree(tenant_id, timeperiod);
|
||||||
DEBUG: the index name on the shards of the partition is too long, switching to sequential execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26_10311_tenant_id_timeperiod_idx
|
DEBUG: the index name on the shards of the partition is too long, switching to sequential and local execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26_10311_tenant_id_timeperiod_idx
|
||||||
-- should be able to create short named indexes in parallel
|
-- should be able to create short named indexes in parallel
|
||||||
-- as the table/index name is short
|
-- as the table/index name is short
|
||||||
CREATE INDEX f1
|
CREATE INDEX f1
|
||||||
|
|
|
@ -237,6 +237,36 @@ drop cascades to table "Quoed.Schema".simple_table_name_90630518
|
||||||
drop cascades to table "Quoed.Schema".simple_table_name_90630519
|
drop cascades to table "Quoed.Schema".simple_table_name_90630519
|
||||||
drop cascades to table "Quoed.Schema".simple_table_name_90630520
|
drop cascades to table "Quoed.Schema".simple_table_name_90630520
|
||||||
drop cascades to table "Quoed.Schema".simple_table_name_90630521
|
drop cascades to table "Quoed.Schema".simple_table_name_90630521
|
||||||
|
-- test partitioned index creation with long name
|
||||||
|
CREATE TABLE test_index_creation1
|
||||||
|
(
|
||||||
|
tenant_id integer NOT NULL,
|
||||||
|
timeperiod timestamp without time zone NOT NULL,
|
||||||
|
field1 integer NOT NULL,
|
||||||
|
inserted_utc timestamp without time zone NOT NULL DEFAULT now(),
|
||||||
|
PRIMARY KEY(tenant_id, timeperiod)
|
||||||
|
) PARTITION BY RANGE (timeperiod);
|
||||||
|
CREATE TABLE test_index_creation1_p2020_09_26
|
||||||
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00');
|
||||||
|
CREATE TABLE test_index_creation1_p2020_09_27
|
||||||
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00');
|
||||||
|
select create_distributed_table('test_index_creation1', 'tenant_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- should be able to create indexes with INCLUDE/WHERE
|
||||||
|
CREATE INDEX ix_test_index_creation5 ON test_index_creation1
|
||||||
|
USING btree(tenant_id, timeperiod)
|
||||||
|
INCLUDE (field1) WHERE (tenant_id = 100);
|
||||||
|
-- test if indexes are created
|
||||||
|
SELECT 1 AS created WHERE EXISTS(SELECT * FROM pg_indexes WHERE indexname LIKE '%test_index_creation%');
|
||||||
|
created
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- test citus size functions in transaction with modification
|
-- test citus size functions in transaction with modification
|
||||||
CREATE TABLE test_citus_size_func (a int);
|
CREATE TABLE test_citus_size_func (a int);
|
||||||
SELECT create_distributed_table('test_citus_size_func', 'a');
|
SELECT create_distributed_table('test_citus_size_func', 'a');
|
||||||
|
|
|
@ -64,6 +64,7 @@ END;
|
||||||
SET citus.shard_count TO 6;
|
SET citus.shard_count TO 6;
|
||||||
SET citus.log_remote_commands TO OFF;
|
SET citus.log_remote_commands TO OFF;
|
||||||
|
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET citus.log_local_commands TO ON;
|
SET citus.log_local_commands TO ON;
|
||||||
CREATE TABLE dist_table (a int);
|
CREATE TABLE dist_table (a int);
|
||||||
|
@ -228,6 +229,31 @@ SELECT 1 FROM master_create_empty_shard('test_append_table');
|
||||||
SELECT COUNT(*) FROM pg_dist_placement p, pg_dist_shard s WHERE p.shardid = s.shardid AND s.logicalrelid = 'test_append_table'::regclass AND p.groupid > 0;
|
SELECT COUNT(*) FROM pg_dist_placement p, pg_dist_shard s WHERE p.shardid = s.shardid AND s.logicalrelid = 'test_append_table'::regclass AND p.groupid > 0;
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
-- test partitioned index creation with long name
|
||||||
|
CREATE TABLE test_index_creation1
|
||||||
|
(
|
||||||
|
tenant_id integer NOT NULL,
|
||||||
|
timeperiod timestamp without time zone NOT NULL,
|
||||||
|
field1 integer NOT NULL,
|
||||||
|
inserted_utc timestamp without time zone NOT NULL DEFAULT now(),
|
||||||
|
PRIMARY KEY(tenant_id, timeperiod)
|
||||||
|
) PARTITION BY RANGE (timeperiod);
|
||||||
|
|
||||||
|
CREATE TABLE test_index_creation1_p2020_09_26
|
||||||
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00');
|
||||||
|
CREATE TABLE test_index_creation1_p2020_09_27
|
||||||
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00');
|
||||||
|
|
||||||
|
select create_distributed_table('test_index_creation1', 'tenant_id');
|
||||||
|
|
||||||
|
-- should be able to create indexes with INCLUDE/WHERE
|
||||||
|
CREATE INDEX ix_test_index_creation5 ON test_index_creation1
|
||||||
|
USING btree(tenant_id, timeperiod)
|
||||||
|
INCLUDE (field1) WHERE (tenant_id = 100);
|
||||||
|
|
||||||
|
-- test if indexes are created
|
||||||
|
SELECT 1 AS created WHERE EXISTS(SELECT * FROM pg_indexes WHERE indexname LIKE '%test_index_creation%');
|
||||||
|
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
DROP TABLE ref_table;
|
DROP TABLE ref_table;
|
||||||
|
|
||||||
|
|
|
@ -121,6 +121,31 @@ ALTER TABLE simple_table_name RENAME CONSTRAINT "looo oooo ooooo ooooooooooooooo
|
||||||
SET search_path TO single_node;
|
SET search_path TO single_node;
|
||||||
DROP SCHEMA "Quoed.Schema" CASCADE;
|
DROP SCHEMA "Quoed.Schema" CASCADE;
|
||||||
|
|
||||||
|
-- test partitioned index creation with long name
|
||||||
|
CREATE TABLE test_index_creation1
|
||||||
|
(
|
||||||
|
tenant_id integer NOT NULL,
|
||||||
|
timeperiod timestamp without time zone NOT NULL,
|
||||||
|
field1 integer NOT NULL,
|
||||||
|
inserted_utc timestamp without time zone NOT NULL DEFAULT now(),
|
||||||
|
PRIMARY KEY(tenant_id, timeperiod)
|
||||||
|
) PARTITION BY RANGE (timeperiod);
|
||||||
|
|
||||||
|
CREATE TABLE test_index_creation1_p2020_09_26
|
||||||
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00');
|
||||||
|
CREATE TABLE test_index_creation1_p2020_09_27
|
||||||
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00');
|
||||||
|
|
||||||
|
select create_distributed_table('test_index_creation1', 'tenant_id');
|
||||||
|
|
||||||
|
-- should be able to create indexes with INCLUDE/WHERE
|
||||||
|
CREATE INDEX ix_test_index_creation5 ON test_index_creation1
|
||||||
|
USING btree(tenant_id, timeperiod)
|
||||||
|
INCLUDE (field1) WHERE (tenant_id = 100);
|
||||||
|
|
||||||
|
-- test if indexes are created
|
||||||
|
SELECT 1 AS created WHERE EXISTS(SELECT * FROM pg_indexes WHERE indexname LIKE '%test_index_creation%');
|
||||||
|
|
||||||
-- test citus size functions in transaction with modification
|
-- test citus size functions in transaction with modification
|
||||||
CREATE TABLE test_citus_size_func (a int);
|
CREATE TABLE test_citus_size_func (a int);
|
||||||
SELECT create_distributed_table('test_citus_size_func', 'a');
|
SELECT create_distributed_table('test_citus_size_func', 'a');
|
||||||
|
|
Loading…
Reference in New Issue