From b905c8043d00fdb6d5bf853930db366612a56b8d Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 17 Sep 2020 22:46:11 +0200 Subject: [PATCH] Fix create index concurrently crash with local execution --- src/backend/distributed/commands/index.c | 63 ++++++++++++++----- .../distributed/commands/utility_hook.c | 27 ++++++++ src/include/distributed/commands.h | 1 + .../distributed/commands/utility_hook.h | 9 +++ ...licate_reference_tables_to_coordinator.out | 31 +++++---- ...licate_reference_tables_to_coordinator.sql | 14 ++++- 6 files changed, 113 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index b6a75dd45..708e143f1 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -178,6 +178,7 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand) DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; ddlJob->concurrentIndexCmd = createIndexStatement->concurrent; + ddlJob->startNewTransaction = createIndexStatement->concurrent; ddlJob->commandString = createIndexCommand; ddlJob->taskList = CreateIndexTaskList(relationId, createIndexStatement); @@ -284,6 +285,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand) ddlJob->targetRelationId = relationId; #if PG_VERSION_NUM >= PG_VERSION_12 ddlJob->concurrentIndexCmd = reindexStatement->concurrent; + ddlJob->startNewTransaction = reindexStatement->concurrent; #else ddlJob->concurrentIndexCmd = false; #endif @@ -377,6 +379,13 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand) ddlJob->targetRelationId = distributedRelationId; ddlJob->concurrentIndexCmd = dropIndexStatement->concurrent; + + /* + * We do not want DROP INDEX CONCURRENTLY to commit locally before + * sending commands, because if there is a failure we would like to + * be able to repeat the DROP INDEX later. 
+ */ + ddlJob->startNewTransaction = false; ddlJob->commandString = dropIndexCommand; ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId, dropIndexStatement); @@ -442,23 +451,6 @@ PostprocessIndexStmt(Node *node, const char *queryString) CommitTransactionCommand(); StartTransactionCommand(); - /* now, update index's validity in a way that can roll back */ - Relation pg_index = table_open(IndexRelationId, RowExclusiveLock); - - HeapTuple indexTuple = SearchSysCacheCopy1(INDEXRELID, ObjectIdGetDatum( - indexRelationId)); - Assert(HeapTupleIsValid(indexTuple)); /* better be present, we have lock! */ - - /* mark as valid, save, and update pg_index indexes */ - Form_pg_index indexForm = (Form_pg_index) GETSTRUCT(indexTuple); - indexForm->indisvalid = true; - - CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple); - - /* clean up; index now marked valid, but ROLLBACK will mark invalid */ - heap_freetuple(indexTuple); - table_close(pg_index, RowExclusiveLock); - return NIL; } @@ -921,3 +913,40 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) return taskList; } + + +/* + * MarkIndexValid marks an index as valid after a CONCURRENTLY command. We mark + * indexes invalid in PostprocessIndexStmt and then commit, such that any failure + * leaves behind an invalid index. We mark it as valid here when the command + * completes. 
+ */ +void +MarkIndexValid(IndexStmt *indexStmt) +{ + Assert(indexStmt->concurrent); + Assert(IsCoordinator()); + + /* + * We make sure schema name is not null in the PreprocessIndexStmt + */ + bool missingOk = false; + Oid schemaId = get_namespace_oid(indexStmt->relation->schemaname, missingOk); + + Oid relationId PG_USED_FOR_ASSERTS_ONLY = + get_relname_relid(indexStmt->relation->relname, schemaId); + + Assert(IsCitusTable(relationId)); + + /* get the affected relation and index */ + Relation relation = table_openrv(indexStmt->relation, ShareUpdateExclusiveLock); + Oid indexRelationId = get_relname_relid(indexStmt->idxname, + schemaId); + Relation indexRelation = index_open(indexRelationId, RowExclusiveLock); + + /* mark index as valid, in-place (cannot be rolled back) */ + index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID); + + table_close(relation, NoLock); + index_close(indexRelation, NoLock); +} diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 10c82d037..17253854d 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -579,6 +579,21 @@ multi_ProcessUtility(PlannedStmt *pstmt, { ExecuteDistributedDDLJob(ddlJob); } + + /* + * For CREATE INDEX CONCURRENTLY we mark the index as valid + * after successfully completing the distributed DDL job. + */ + if (IsA(parsetree, IndexStmt)) + { + IndexStmt *indexStmt = (IndexStmt *) parsetree; + + if (indexStmt->concurrent) + { + /* no failures during CONCURRENTLY, mark the index as valid */ + MarkIndexValid(indexStmt); + } + } } /* TODO: fold VACUUM's processing into the above block */ @@ -675,6 +690,18 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } else { + localExecutionSupported = false; + + /* + * Start a new transaction to make sure CONCURRENTLY commands + * on localhost do not block waiting for this transaction to finish. 
+ */ + if (ddlJob->startNewTransaction) + { + CommitTransactionCommand(); + StartTransactionCommand(); + } + /* save old commit protocol to restore at xact end */ Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE); SavedMultiShardCommitProtocol = MultiShardCommitProtocol; diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 3168ea99e..9ff5d983b 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -177,6 +177,7 @@ extern List * PreprocessDropIndexStmt(Node *dropIndexStatement, extern List * PostprocessIndexStmt(Node *node, const char *queryString); extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement); +extern void MarkIndexValid(IndexStmt *indexStmt); /* objectaddress.c - forward declarations */ extern ObjectAddress CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok); diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 3b6893e27..c7814f658 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -45,6 +45,15 @@ typedef struct DDLJob { Oid targetRelationId; /* oid of the target distributed relation */ bool concurrentIndexCmd; /* related to a CONCURRENTLY index command? */ + + /* + * Whether to commit and start a new transaction before sending commands + * (only applies to CONCURRENTLY commands). This is needed for REINDEX CONCURRENTLY + * and CREATE INDEX CONCURRENTLY on local shards, which would otherwise + * get blocked waiting for the current transaction to finish. 
+ */ + bool startNewTransaction; + const char *commandString; /* initial (coordinator) DDL command string */ List *taskList; /* worker DDL tasks to execute */ } DDLJob; diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index 2f4b1a640..d3a50e814 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -18,6 +18,13 @@ SELECT create_reference_table('squares'); INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i; NOTICE: executing the copy locally for shard xxxxx +CREATE INDEX CONCURRENTLY squares_a_idx ON squares (a); +SELECT substring(current_Setting('server_version'), '\d+')::int > 11 AS server_version_above_eleven +\gset +\if :server_version_above_eleven +REINDEX INDEX CONCURRENTLY squares_a_idx; +\endif +DROP INDEX CONCURRENTLY squares_a_idx; -- should be executed locally SELECT count(*) FROM squares; NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares @@ -43,16 +50,16 @@ BEGIN END; $$ language plpgsql VOLATILE; -- INSERT ... 
SELECT between reference tables BEGIN; -EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers; - QUERY PLAN +EXPLAIN (COSTS OFF) INSERT INTO squares SELECT a, a*a FROM numbers; + QUERY PLAN --------------------------------------------------------------------- - Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Custom Scan (Citus Adaptive) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression - -> Insert on squares_8000000 citus_table_alias (cost=0.00..41.88 rows=2550 width=8) - -> Seq Scan on numbers_8000001 numbers (cost=0.00..41.88 rows=2550 width=8) + -> Insert on squares_8000000 citus_table_alias + -> Seq Scan on numbers_8000001 numbers (7 rows) INSERT INTO squares SELECT a, a*a FROM numbers; @@ -65,16 +72,16 @@ SELECT * FROM squares WHERE a >= 20 ORDER BY a; ROLLBACK; BEGIN; -EXPLAIN INSERT INTO numbers SELECT a FROM squares WHERE a < 3; - QUERY PLAN +EXPLAIN (COSTS OFF) INSERT INTO numbers SELECT a FROM squares WHERE a < 3; + QUERY PLAN --------------------------------------------------------------------- - Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Custom Scan (Citus Adaptive) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression - -> Insert on numbers_8000001 citus_table_alias (cost=0.00..38.25 rows=753 width=4) - -> Seq Scan on squares_8000000 squares (cost=0.00..38.25 rows=753 width=4) + -> Insert on numbers_8000001 citus_table_alias + -> Seq Scan on squares_8000000 squares Filter: (a < 3) (8 rows) @@ -392,8 +399,8 @@ CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AN NOTICE: executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10)) REFRESH MATERIALIZED VIEW numbers_v; NOTICE: executing the command locally: SELECT numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((numbers.a 
OPERATOR(pg_catalog.>=) 1) AND (numbers.a OPERATOR(pg_catalog.<=) 10)) -SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; -NOTICE: executing the command locally: SELECT squares.a, squares.b, numbers_v.a FROM (replicate_ref_to_coordinator.squares_8000000 squares JOIN replicate_ref_to_coordinator.numbers_v ON ((squares.a OPERATOR(pg_catalog.=) numbers_v.a))) +SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a ORDER BY 1; +NOTICE: executing the command locally: SELECT squares.a, squares.b, numbers_v.a FROM (replicate_ref_to_coordinator.squares_8000000 squares JOIN replicate_ref_to_coordinator.numbers_v ON ((squares.a OPERATOR(pg_catalog.=) numbers_v.a))) ORDER BY squares.a a | b | a --------------------------------------------------------------------- 1 | 1 | 1 diff --git a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql index 91db19ee7..47f8df9a6 100644 --- a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql +++ b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql @@ -16,6 +16,14 @@ CREATE TABLE squares(a int, b int); SELECT create_reference_table('squares'); INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i; +CREATE INDEX CONCURRENTLY squares_a_idx ON squares (a); +SELECT substring(current_Setting('server_version'), '\d+')::int > 11 AS server_version_above_eleven +\gset +\if :server_version_above_eleven +REINDEX INDEX CONCURRENTLY squares_a_idx; +\endif +DROP INDEX CONCURRENTLY squares_a_idx; + -- should be executed locally SELECT count(*) FROM squares; @@ -33,13 +41,13 @@ END; $$ language plpgsql VOLATILE; -- INSERT ... 
SELECT between reference tables BEGIN; -EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers; +EXPLAIN (COSTS OFF) INSERT INTO squares SELECT a, a*a FROM numbers; INSERT INTO squares SELECT a, a*a FROM numbers; SELECT * FROM squares WHERE a >= 20 ORDER BY a; ROLLBACK; BEGIN; -EXPLAIN INSERT INTO numbers SELECT a FROM squares WHERE a < 3; +EXPLAIN (COSTS OFF) INSERT INTO numbers SELECT a FROM squares WHERE a < 3; INSERT INTO numbers SELECT a FROM squares WHERE a < 3; SELECT * FROM numbers ORDER BY a; ROLLBACK; @@ -190,7 +198,7 @@ DROP VIEW numbers_v, local_table_v; CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10; REFRESH MATERIALIZED VIEW numbers_v; -SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; +SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a ORDER BY 1; -- -- Joins between reference tables, local tables, and function calls