mirror of https://github.com/citusdata/citus.git
Add code to set index validity on failure
Coordinator code marks index as invalid as a base, set it as valid in a transactional layer atop that base, then proceeds with worker commands. If a worker command has problems, the rollback results in an index with isvalid = false. If everything succeeds, the user sees a valid index.pull/1287/head
parent
dea6c44f75
commit
32886e97a3
|
@ -24,6 +24,7 @@
|
||||||
#include "catalog/catalog.h"
|
#include "catalog/catalog.h"
|
||||||
#include "catalog/dependency.h"
|
#include "catalog/dependency.h"
|
||||||
#include "catalog/index.h"
|
#include "catalog/index.h"
|
||||||
|
#include "catalog/indexing.h"
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
#include "catalog/pg_attribute.h"
|
#include "catalog/pg_attribute.h"
|
||||||
#include "catalog/pg_class.h"
|
#include "catalog/pg_class.h"
|
||||||
|
@ -76,6 +77,7 @@
|
||||||
#include "utils/palloc.h"
|
#include "utils/palloc.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
#include "utils/relcache.h"
|
#include "utils/relcache.h"
|
||||||
|
#include "utils/snapmgr.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
|
||||||
|
@ -142,6 +144,7 @@ static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid ol
|
||||||
void *arg);
|
void *arg);
|
||||||
static void CheckCopyPermissions(CopyStmt *copyStatement);
|
static void CheckCopyPermissions(CopyStmt *copyStatement);
|
||||||
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
|
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
|
||||||
|
static void PostProcessUtility(Node *parsetree);
|
||||||
|
|
||||||
|
|
||||||
static bool warnedUserAbout2PC = false;
|
static bool warnedUserAbout2PC = false;
|
||||||
|
@ -369,6 +372,8 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
standard_ProcessUtility(parsetree, queryString, context,
|
standard_ProcessUtility(parsetree, queryString, context,
|
||||||
params, dest, completionTag);
|
params, dest, completionTag);
|
||||||
|
|
||||||
|
PostProcessUtility(parsetree);
|
||||||
|
|
||||||
if (commandMustRunAsOwner)
|
if (commandMustRunAsOwner)
|
||||||
{
|
{
|
||||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||||
|
@ -1981,7 +1986,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
ShowNoticeIfNotUsing2PC();
|
|
||||||
|
|
||||||
if (ddlJob->preventTransaction)
|
if (ddlJob->preventTransaction)
|
||||||
{
|
{
|
||||||
|
@ -1990,6 +1994,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
||||||
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
|
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
|
||||||
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
|
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ShowNoticeIfNotUsing2PC();
|
||||||
|
}
|
||||||
|
|
||||||
if (shouldSyncMetadata)
|
if (shouldSyncMetadata)
|
||||||
{
|
{
|
||||||
|
@ -2474,6 +2482,83 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PostProcessUtility performs additional tasks after a utility's local portion
|
||||||
|
* has been completed. Right now, the sole use is marking new indexes invalid
|
||||||
|
* if they were created using the CONCURRENTLY flag. This (non-transactional)
|
||||||
|
* change provides the fallback state if an error is raised, otherwise a sub-
|
||||||
|
* sequent change to valid will be committed.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
PostProcessUtility(Node *parsetree)
|
||||||
|
{
|
||||||
|
IndexStmt *indexStmt = NULL;
|
||||||
|
Relation relation = NULL;
|
||||||
|
Oid indexRelationId = InvalidOid;
|
||||||
|
Relation indexRelation = NULL;
|
||||||
|
Relation pg_index = NULL;
|
||||||
|
HeapTuple indexTuple = NULL;
|
||||||
|
Form_pg_index indexForm = NULL;
|
||||||
|
|
||||||
|
/* only IndexStmts are processed */
|
||||||
|
if (!IsA(parsetree, IndexStmt))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* and even then only if they're CONCURRENT */
|
||||||
|
indexStmt = (IndexStmt *) parsetree;
|
||||||
|
if (!indexStmt->concurrent)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* finally, this logic only applies to the coordinator */
|
||||||
|
if (!IsCoordinator())
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* commit the current transaction and start anew */
|
||||||
|
CommitTransactionCommand();
|
||||||
|
StartTransactionCommand();
|
||||||
|
|
||||||
|
/* get the affected relation and index */
|
||||||
|
relation = heap_openrv(indexStmt->relation, ShareUpdateExclusiveLock);
|
||||||
|
indexRelationId = get_relname_relid(indexStmt->idxname,
|
||||||
|
RelationGetNamespace(relation));
|
||||||
|
indexRelation = index_open(indexRelationId, RowExclusiveLock);
|
||||||
|
|
||||||
|
/* close relations but retain locks */
|
||||||
|
heap_close(relation, NoLock);
|
||||||
|
index_close(indexRelation, NoLock);
|
||||||
|
|
||||||
|
/* mark index as invalid, in-place (cannot be rolled back) */
|
||||||
|
index_set_state_flags(indexRelationId, INDEX_DROP_CLEAR_VALID);
|
||||||
|
|
||||||
|
/* re-open a transaction command from here on out */
|
||||||
|
CommitTransactionCommand();
|
||||||
|
StartTransactionCommand();
|
||||||
|
|
||||||
|
/* now, update index's validity in a way that can roll back */
|
||||||
|
pg_index = heap_open(IndexRelationId, RowExclusiveLock);
|
||||||
|
|
||||||
|
indexTuple = SearchSysCacheCopy1(INDEXRELID, ObjectIdGetDatum(indexRelationId));
|
||||||
|
Assert(HeapTupleIsValid(indexTuple)); /* better be present, we have lock! */
|
||||||
|
|
||||||
|
/* mark as valid, save, and update pg_index indexes */
|
||||||
|
indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
|
||||||
|
indexForm->indisvalid = true;
|
||||||
|
|
||||||
|
simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
|
||||||
|
CatalogUpdateIndexes(pg_index, indexTuple);
|
||||||
|
|
||||||
|
/* clean up; index now marked valid, but ROLLBACK will mark invalid */
|
||||||
|
heap_freetuple(indexTuple);
|
||||||
|
heap_close(pg_index, RowExclusiveLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PlanGrantStmt determines whether a given GRANT/REVOKE statement involves
|
* PlanGrantStmt determines whether a given GRANT/REVOKE statement involves
|
||||||
* a distributed table. If so, it creates DDLJobs to encapsulate information
|
* a distributed table. If so, it creates DDLJobs to encapsulate information
|
||||||
|
|
|
@ -245,7 +245,45 @@ SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
|
||||||
------------+-----------+-----------+------------+----------
|
------------+-----------+-----------+------------+----------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
-- create index that will conflict with master operations
|
||||||
|
CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON index_test_hash_102089(b);
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
-- should fail because worker index already exists
|
||||||
|
CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b);
|
||||||
|
ERROR: relation "ith_b_idx_102089" already exists
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
-- the failure results in an INVALID index
|
||||||
|
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
|
||||||
|
Index Valid?
|
||||||
|
--------------
|
||||||
|
f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- we can clean it up and recreate with an DROP IF EXISTS
|
||||||
|
DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx;
|
||||||
|
CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b);
|
||||||
|
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
|
||||||
|
Index Valid?
|
||||||
|
--------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
-- now drop shard index to test partial master DROP failure
|
||||||
|
DROP INDEX CONCURRENTLY ith_b_idx_102089;
|
||||||
|
\c - - - :master_port
|
||||||
|
DROP INDEX CONCURRENTLY ith_b_idx;
|
||||||
|
ERROR: index "ith_b_idx_102089" does not exist
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
-- the failure results in an INVALID index
|
||||||
|
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
|
||||||
|
Index Valid?
|
||||||
|
--------------
|
||||||
|
f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- final clean up
|
||||||
|
DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx;
|
||||||
-- Drop created tables
|
-- Drop created tables
|
||||||
DROP TABLE index_test_range;
|
DROP TABLE index_test_range;
|
||||||
DROP TABLE index_test_hash;
|
DROP TABLE index_test_hash;
|
||||||
|
|
|
@ -139,8 +139,37 @@ SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
|
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
|
||||||
SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
|
SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
|
||||||
|
|
||||||
|
-- create index that will conflict with master operations
|
||||||
|
CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON index_test_hash_102089(b);
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
||||||
|
-- should fail because worker index already exists
|
||||||
|
CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b);
|
||||||
|
|
||||||
|
-- the failure results in an INVALID index
|
||||||
|
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
|
||||||
|
|
||||||
|
-- we can clean it up and recreate with an DROP IF EXISTS
|
||||||
|
DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx;
|
||||||
|
CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b);
|
||||||
|
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
|
||||||
|
-- now drop shard index to test partial master DROP failure
|
||||||
|
DROP INDEX CONCURRENTLY ith_b_idx_102089;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
DROP INDEX CONCURRENTLY ith_b_idx;
|
||||||
|
|
||||||
|
-- the failure results in an INVALID index
|
||||||
|
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
|
||||||
|
|
||||||
|
-- final clean up
|
||||||
|
DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx;
|
||||||
|
|
||||||
-- Drop created tables
|
-- Drop created tables
|
||||||
DROP TABLE index_test_range;
|
DROP TABLE index_test_range;
|
||||||
DROP TABLE index_test_hash;
|
DROP TABLE index_test_hash;
|
||||||
|
|
Loading…
Reference in New Issue