Merge pull request #4173 from citusdata/fix/create-index-concurrently-local

pull/4198/head
Marco Slot 2020-09-29 10:15:40 +02:00 committed by GitHub
commit 12ecdea790
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 113 additions and 32 deletions

View File

@ -178,6 +178,7 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand)
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ddlJob->targetRelationId = relationId;
ddlJob->concurrentIndexCmd = createIndexStatement->concurrent; ddlJob->concurrentIndexCmd = createIndexStatement->concurrent;
ddlJob->startNewTransaction = createIndexStatement->concurrent;
ddlJob->commandString = createIndexCommand; ddlJob->commandString = createIndexCommand;
ddlJob->taskList = CreateIndexTaskList(relationId, createIndexStatement); ddlJob->taskList = CreateIndexTaskList(relationId, createIndexStatement);
@ -284,6 +285,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand)
ddlJob->targetRelationId = relationId; ddlJob->targetRelationId = relationId;
#if PG_VERSION_NUM >= PG_VERSION_12 #if PG_VERSION_NUM >= PG_VERSION_12
ddlJob->concurrentIndexCmd = reindexStatement->concurrent; ddlJob->concurrentIndexCmd = reindexStatement->concurrent;
ddlJob->startNewTransaction = reindexStatement->concurrent;
#else #else
ddlJob->concurrentIndexCmd = false; ddlJob->concurrentIndexCmd = false;
#endif #endif
@ -377,6 +379,13 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand)
ddlJob->targetRelationId = distributedRelationId; ddlJob->targetRelationId = distributedRelationId;
ddlJob->concurrentIndexCmd = dropIndexStatement->concurrent; ddlJob->concurrentIndexCmd = dropIndexStatement->concurrent;
/*
 * We do not want DROP INDEX CONCURRENTLY to commit locally before
 * sending commands, because if there is a failure we would like
 * to be able to repeat the DROP INDEX later.
 */
ddlJob->startNewTransaction = false;
ddlJob->commandString = dropIndexCommand; ddlJob->commandString = dropIndexCommand;
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId, ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
dropIndexStatement); dropIndexStatement);
@ -442,23 +451,6 @@ PostprocessIndexStmt(Node *node, const char *queryString)
CommitTransactionCommand(); CommitTransactionCommand();
StartTransactionCommand(); StartTransactionCommand();
/* now, update index's validity in a way that can roll back */
Relation pg_index = table_open(IndexRelationId, RowExclusiveLock);
HeapTuple indexTuple = SearchSysCacheCopy1(INDEXRELID, ObjectIdGetDatum(
indexRelationId));
Assert(HeapTupleIsValid(indexTuple)); /* better be present, we have lock! */
/* mark as valid, save, and update pg_index indexes */
Form_pg_index indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
indexForm->indisvalid = true;
CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
/* clean up; index now marked valid, but ROLLBACK will mark invalid */
heap_freetuple(indexTuple);
table_close(pg_index, RowExclusiveLock);
return NIL; return NIL;
} }
@ -921,3 +913,40 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
return taskList; return taskList;
} }
/*
 * MarkIndexValid marks an index as valid after a CONCURRENTLY command has
 * completed successfully across all tasks. PostprocessIndexStmt leaves the
 * index invalid and commits, such that any mid-command failure leaves behind
 * an invalid (and therefore repeatable) index; once the distributed command
 * completes we flip the flag back to valid here.
 *
 * Only called on the coordinator and only for CONCURRENTLY statements, as
 * enforced by the asserts below.
 */
void
MarkIndexValid(IndexStmt *indexStmt)
{
Assert(indexStmt->concurrent);
Assert(IsCoordinator());
/*
 * PreprocessIndexStmt already ensured the schema name is not null, so the
 * lookup is done with missingOk = false (errors out on a missing schema).
 */
bool missingOk = false;
Oid schemaId = get_namespace_oid(indexStmt->relation->schemaname, missingOk);
/* relationId is looked up only to assert this is a Citus table */
Oid relationId PG_USED_FOR_ASSERTS_ONLY =
get_relname_relid(indexStmt->relation->relname, schemaId);
Assert(IsCitusTable(relationId));
/* open the affected relation and index, taking locks on both */
Relation relation = table_openrv(indexStmt->relation, ShareUpdateExclusiveLock);
Oid indexRelationId = get_relname_relid(indexStmt->idxname,
schemaId);
Relation indexRelation = index_open(indexRelationId, RowExclusiveLock);
/*
 * Mark the index as valid; index_set_state_flags updates pg_index
 * in-place, so this cannot be rolled back.
 */
index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
/* NoLock: release nothing here, hold the locks until transaction end */
table_close(relation, NoLock);
index_close(indexRelation, NoLock);
}

View File

@ -579,6 +579,21 @@ multi_ProcessUtility(PlannedStmt *pstmt,
{ {
ExecuteDistributedDDLJob(ddlJob); ExecuteDistributedDDLJob(ddlJob);
} }
/*
 * For CREATE INDEX CONCURRENTLY (the only command matching the
 * IndexStmt check below) we mark the index as valid after
 * successfully completing the distributed DDL job.
 */
if (IsA(parsetree, IndexStmt))
{
IndexStmt *indexStmt = (IndexStmt *) parsetree;
if (indexStmt->concurrent)
{
/* no failures during CONCURRENTLY, mark the index as valid */
MarkIndexValid(indexStmt);
}
}
} }
/* TODO: fold VACUUM's processing into the above block */ /* TODO: fold VACUUM's processing into the above block */
@ -675,6 +690,18 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
} }
else else
{ {
localExecutionSupported = false;
/*
* Start a new transaction to make sure CONCURRENTLY commands
* on localhost do not block waiting for this transaction to finish.
*/
if (ddlJob->startNewTransaction)
{
CommitTransactionCommand();
StartTransactionCommand();
}
/* save old commit protocol to restore at xact end */ /* save old commit protocol to restore at xact end */
Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE); Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
SavedMultiShardCommitProtocol = MultiShardCommitProtocol; SavedMultiShardCommitProtocol = MultiShardCommitProtocol;

View File

@ -177,6 +177,7 @@ extern List * PreprocessDropIndexStmt(Node *dropIndexStatement,
extern List * PostprocessIndexStmt(Node *node, extern List * PostprocessIndexStmt(Node *node,
const char *queryString); const char *queryString);
extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement); extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement);
extern void MarkIndexValid(IndexStmt *indexStmt);
/* objectaddress.c - forward declarations */ /* objectaddress.c - forward declarations */
extern ObjectAddress CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok); extern ObjectAddress CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok);

View File

@ -45,6 +45,15 @@ typedef struct DDLJob
{ {
Oid targetRelationId; /* oid of the target distributed relation */ Oid targetRelationId; /* oid of the target distributed relation */
bool concurrentIndexCmd; /* related to a CONCURRENTLY index command? */ bool concurrentIndexCmd; /* related to a CONCURRENTLY index command? */
/*
* Whether to commit and start a new transaction before sending commands
* (only applies to CONCURRENTLY commands). This is needed for REINDEX CONCURRENTLY
* and CREATE INDEX CONCURRENTLY on local shards, which would otherwise
* get blocked waiting for the current transaction to finish.
*/
bool startNewTransaction;
const char *commandString; /* initial (coordinator) DDL command string */ const char *commandString; /* initial (coordinator) DDL command string */
List *taskList; /* worker DDL tasks to execute */ List *taskList; /* worker DDL tasks to execute */
} DDLJob; } DDLJob;

View File

@ -18,6 +18,13 @@ SELECT create_reference_table('squares');
INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i; INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i;
NOTICE: executing the copy locally for shard xxxxx NOTICE: executing the copy locally for shard xxxxx
CREATE INDEX CONCURRENTLY squares_a_idx ON squares (a);
SELECT substring(current_Setting('server_version'), '\d+')::int > 11 AS server_version_above_eleven
\gset
\if :server_version_above_eleven
REINDEX INDEX CONCURRENTLY squares_a_idx;
\endif
DROP INDEX CONCURRENTLY squares_a_idx;
-- should be executed locally -- should be executed locally
SELECT count(*) FROM squares; SELECT count(*) FROM squares;
NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares
@ -43,16 +50,16 @@ BEGIN
END; $$ language plpgsql VOLATILE; END; $$ language plpgsql VOLATILE;
-- INSERT ... SELECT between reference tables -- INSERT ... SELECT between reference tables
BEGIN; BEGIN;
EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers; EXPLAIN (COSTS OFF) INSERT INTO squares SELECT a, a*a FROM numbers;
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) Custom Scan (Citus Adaptive)
Task Count: 1 Task Count: 1
Tasks Shown: All Tasks Shown: All
-> Task -> Task
Node: host=localhost port=xxxxx dbname=regression Node: host=localhost port=xxxxx dbname=regression
-> Insert on squares_8000000 citus_table_alias (cost=0.00..41.88 rows=2550 width=8) -> Insert on squares_8000000 citus_table_alias
-> Seq Scan on numbers_8000001 numbers (cost=0.00..41.88 rows=2550 width=8) -> Seq Scan on numbers_8000001 numbers
(7 rows) (7 rows)
INSERT INTO squares SELECT a, a*a FROM numbers; INSERT INTO squares SELECT a, a*a FROM numbers;
@ -65,16 +72,16 @@ SELECT * FROM squares WHERE a >= 20 ORDER BY a;
ROLLBACK; ROLLBACK;
BEGIN; BEGIN;
EXPLAIN INSERT INTO numbers SELECT a FROM squares WHERE a < 3; EXPLAIN (COSTS OFF) INSERT INTO numbers SELECT a FROM squares WHERE a < 3;
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) Custom Scan (Citus Adaptive)
Task Count: 1 Task Count: 1
Tasks Shown: All Tasks Shown: All
-> Task -> Task
Node: host=localhost port=xxxxx dbname=regression Node: host=localhost port=xxxxx dbname=regression
-> Insert on numbers_8000001 citus_table_alias (cost=0.00..38.25 rows=753 width=4) -> Insert on numbers_8000001 citus_table_alias
-> Seq Scan on squares_8000000 squares (cost=0.00..38.25 rows=753 width=4) -> Seq Scan on squares_8000000 squares
Filter: (a < 3) Filter: (a < 3)
(8 rows) (8 rows)
@ -392,8 +399,8 @@ CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AN
NOTICE: executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10)) NOTICE: executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10))
REFRESH MATERIALIZED VIEW numbers_v; REFRESH MATERIALIZED VIEW numbers_v;
NOTICE: executing the command locally: SELECT numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((numbers.a OPERATOR(pg_catalog.>=) 1) AND (numbers.a OPERATOR(pg_catalog.<=) 10)) NOTICE: executing the command locally: SELECT numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((numbers.a OPERATOR(pg_catalog.>=) 1) AND (numbers.a OPERATOR(pg_catalog.<=) 10))
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a ORDER BY 1;
NOTICE: executing the command locally: SELECT squares.a, squares.b, numbers_v.a FROM (replicate_ref_to_coordinator.squares_8000000 squares JOIN replicate_ref_to_coordinator.numbers_v ON ((squares.a OPERATOR(pg_catalog.=) numbers_v.a))) NOTICE: executing the command locally: SELECT squares.a, squares.b, numbers_v.a FROM (replicate_ref_to_coordinator.squares_8000000 squares JOIN replicate_ref_to_coordinator.numbers_v ON ((squares.a OPERATOR(pg_catalog.=) numbers_v.a))) ORDER BY squares.a
a | b | a a | b | a
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 1 | 1 1 | 1 | 1

View File

@ -16,6 +16,14 @@ CREATE TABLE squares(a int, b int);
SELECT create_reference_table('squares'); SELECT create_reference_table('squares');
INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i; INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i;
CREATE INDEX CONCURRENTLY squares_a_idx ON squares (a);
SELECT substring(current_Setting('server_version'), '\d+')::int > 11 AS server_version_above_eleven
\gset
\if :server_version_above_eleven
REINDEX INDEX CONCURRENTLY squares_a_idx;
\endif
DROP INDEX CONCURRENTLY squares_a_idx;
-- should be executed locally -- should be executed locally
SELECT count(*) FROM squares; SELECT count(*) FROM squares;
@ -33,13 +41,13 @@ END; $$ language plpgsql VOLATILE;
-- INSERT ... SELECT between reference tables -- INSERT ... SELECT between reference tables
BEGIN; BEGIN;
EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers; EXPLAIN (COSTS OFF) INSERT INTO squares SELECT a, a*a FROM numbers;
INSERT INTO squares SELECT a, a*a FROM numbers; INSERT INTO squares SELECT a, a*a FROM numbers;
SELECT * FROM squares WHERE a >= 20 ORDER BY a; SELECT * FROM squares WHERE a >= 20 ORDER BY a;
ROLLBACK; ROLLBACK;
BEGIN; BEGIN;
EXPLAIN INSERT INTO numbers SELECT a FROM squares WHERE a < 3; EXPLAIN (COSTS OFF) INSERT INTO numbers SELECT a FROM squares WHERE a < 3;
INSERT INTO numbers SELECT a FROM squares WHERE a < 3; INSERT INTO numbers SELECT a FROM squares WHERE a < 3;
SELECT * FROM numbers ORDER BY a; SELECT * FROM numbers ORDER BY a;
ROLLBACK; ROLLBACK;
@ -190,7 +198,7 @@ DROP VIEW numbers_v, local_table_v;
CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10; CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
REFRESH MATERIALIZED VIEW numbers_v; REFRESH MATERIALIZED VIEW numbers_v;
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a ORDER BY 1;
-- --
-- Joins between reference tables, local tables, and function calls -- Joins between reference tables, local tables, and function calls