Merge pull request #1287 from citusdata/support_concurrently

Support (CREATE|DROP) INDEX CONCURRENTLY

cr: @metdos
pull/1314/head
Jason Petersen 2017-04-03 12:06:11 -06:00 committed by GitHub
commit bb5ae5eca4
12 changed files with 555 additions and 44 deletions

View File

@ -869,6 +869,27 @@ ExecuteModifyTasksWithoutResults(List *taskList)
}
/*
* ExecuteTasksSequentiallyWithoutResults basically calls ExecuteModifyTasks in
* a loop in order to simulate sequential execution of a list of tasks. Useful
* in cases where issuing commands in parallel before waiting for results could
* result in deadlocks (such as CREATE INDEX CONCURRENTLY).
*/
void
ExecuteTasksSequentiallyWithoutResults(List *taskList)
{
ListCell *taskCell = NULL;
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
List *singleTask = list_make1(task);
ExecuteModifyTasksWithoutResults(singleTask);
}
}
/*
* ExecuteModifyTasks executes a list of tasks on remote nodes, and
* optionally retrieves the results and stores them in a tuple store.

View File

@ -24,6 +24,7 @@
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_class.h"
@ -134,12 +135,15 @@ static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
static void ShowNoticeIfNotUsing2PC(void);
static List * DDLTaskList(Oid relationId, const char *commandString);
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString);
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
void *arg);
static void CheckCopyPermissions(CopyStmt *copyStatement);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
static void PostProcessUtility(Node *parsetree);
static bool warnedUserAbout2PC = false;
@ -367,6 +371,8 @@ multi_ProcessUtility(Node *parsetree,
standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag);
PostProcessUtility(parsetree);
if (commandMustRunAsOwner)
{
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
@ -677,8 +683,9 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
{
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId;
ddlJob->concurrentIndexCmd = createIndexStatement->concurrent;
ddlJob->commandString = createIndexCommand;
ddlJob->taskList = DDLTaskList(relationId, createIndexCommand);
ddlJob->taskList = CreateIndexTaskList(relationId, createIndexStatement);
ddlJobs = list_make1(ddlJob);
}
@ -770,8 +777,10 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
ddlJob->targetRelationId = distributedRelationId;
ddlJob->concurrentIndexCmd = dropIndexStatement->concurrent;
ddlJob->commandString = dropIndexCommand;
ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand);
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
dropIndexStatement);
ddlJobs = list_make1(ddlJob);
}
@ -863,6 +872,7 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = leftRelationId;
ddlJob->concurrentIndexCmd = false;
ddlJob->commandString = alterTableCommand;
if (rightRelationId)
@ -1268,13 +1278,6 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
"currently unsupported")));
}
if (createIndexStatement->concurrent)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("creating indexes concurrently on distributed tables is "
"currently unsupported")));
}
if (createIndexStatement->unique)
{
RangeVar *relation = createIndexStatement->relation;
@ -1352,13 +1355,6 @@ ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement)
errhint("Try dropping each object in a separate DROP "
"command.")));
}
if (dropIndexStatement->concurrent)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("dropping indexes concurrently on distributed tables is "
"currently unsupported")));
}
}
@ -1989,15 +1985,49 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
}
EnsureCoordinator();
ShowNoticeIfNotUsing2PC();
if (shouldSyncMetadata)
if (!ddlJob->concurrentIndexCmd)
{
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
}
ShowNoticeIfNotUsing2PC();
ExecuteModifyTasksWithoutResults(ddlJob->taskList);
if (shouldSyncMetadata)
{
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
}
ExecuteModifyTasksWithoutResults(ddlJob->taskList);
}
else
{
/* save old commit protocol to restore at xact end */
Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
PG_TRY();
{
ExecuteTasksSequentiallyWithoutResults(ddlJob->taskList);
if (shouldSyncMetadata)
{
List *commandList = list_make2(DISABLE_DDL_PROPAGATION,
(char *) ddlJob->commandString);
SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList);
}
}
PG_CATCH();
{
ereport(ERROR,
(errmsg("CONCURRENTLY-enabled index command failed"),
errdetail("CONCURRENTLY-enabled index commands can fail partially, "
"leaving behind an INVALID index."),
errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the "
"invalid index, then retry the original command.")));
}
PG_END_TRY();
}
}
@ -2071,6 +2101,117 @@ DDLTaskList(Oid relationId, const char *commandString)
}
/*
* CreateIndexTaskList builds a list of tasks to execute a CREATE INDEX command
* against a specified distributed table.
*/
static List *
CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt)
{
List *taskList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
ListCell *shardIntervalCell = NULL;
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
StringInfoData ddlString;
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
initStringInfo(&ddlString);
/* set statement's schema name if it is not set already */
if (indexStmt->relation->schemaname == NULL)
{
indexStmt->relation->schemaname = schemaName;
}
/* lock metadata before getting placement lists */
LockShardListMetadata(shardIntervalList, ShareLock);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
Task *task = NULL;
deparse_shard_index_statement(indexStmt, relationId, shardId, &ddlString);
task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = pstrdup(ddlString.data);
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
taskList = lappend(taskList, task);
resetStringInfo(&ddlString);
}
return taskList;
}
/*
* DropIndexTaskList builds a list of tasks to execute a DROP INDEX command
* against a specified distributed table.
*/
static List *
DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
{
List *taskList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
ListCell *shardIntervalCell = NULL;
char *indexName = get_rel_name(indexId);
Oid schemaId = get_rel_namespace(indexId);
char *schemaName = get_namespace_name(schemaId);
StringInfoData ddlString;
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
initStringInfo(&ddlString);
/* lock metadata before getting placement lists */
LockShardListMetadata(shardIntervalList, ShareLock);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
char *shardIndexName = pstrdup(indexName);
Task *task = NULL;
AppendShardIdToName(&shardIndexName, shardId);
/* deparse shard-specific DROP INDEX command */
appendStringInfo(&ddlString, "DROP INDEX %s %s %s %s",
(dropStmt->concurrent ? "CONCURRENTLY" : ""),
(dropStmt->missing_ok ? "IF EXISTS" : ""),
quote_qualified_identifier(schemaName, shardIndexName),
(dropStmt->behavior == DROP_RESTRICT ? "RESTRICT" : "CASCADE"));
task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = pstrdup(ddlString.data);
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
taskList = lappend(taskList, task);
resetStringInfo(&ddlString);
}
return taskList;
}
/*
* ForeignKeyTaskList builds a list of tasks to execute a foreign key command on a
* shards of given list of distributed table.
@ -2357,6 +2498,83 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
}
/*
* PostProcessUtility performs additional tasks after a utility's local portion
* has been completed. Right now, the sole use is marking new indexes invalid
* if they were created using the CONCURRENTLY flag. This (non-transactional)
* change provides the fallback state if an error is raised, otherwise a sub-
* sequent change to valid will be committed.
*/
static void
PostProcessUtility(Node *parsetree)
{
IndexStmt *indexStmt = NULL;
Relation relation = NULL;
Oid indexRelationId = InvalidOid;
Relation indexRelation = NULL;
Relation pg_index = NULL;
HeapTuple indexTuple = NULL;
Form_pg_index indexForm = NULL;
/* only IndexStmts are processed */
if (!IsA(parsetree, IndexStmt))
{
return;
}
/* and even then only if they're CONCURRENT */
indexStmt = (IndexStmt *) parsetree;
if (!indexStmt->concurrent)
{
return;
}
/* finally, this logic only applies to the coordinator */
if (!IsCoordinator())
{
return;
}
/* commit the current transaction and start anew */
CommitTransactionCommand();
StartTransactionCommand();
/* get the affected relation and index */
relation = heap_openrv(indexStmt->relation, ShareUpdateExclusiveLock);
indexRelationId = get_relname_relid(indexStmt->idxname,
RelationGetNamespace(relation));
indexRelation = index_open(indexRelationId, RowExclusiveLock);
/* close relations but retain locks */
heap_close(relation, NoLock);
index_close(indexRelation, NoLock);
/* mark index as invalid, in-place (cannot be rolled back) */
index_set_state_flags(indexRelationId, INDEX_DROP_CLEAR_VALID);
/* re-open a transaction command from here on out */
CommitTransactionCommand();
StartTransactionCommand();
/* now, update index's validity in a way that can roll back */
pg_index = heap_open(IndexRelationId, RowExclusiveLock);
indexTuple = SearchSysCacheCopy1(INDEXRELID, ObjectIdGetDatum(indexRelationId));
Assert(HeapTupleIsValid(indexTuple)); /* better be present, we have lock! */
/* mark as valid, save, and update pg_index indexes */
indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
indexForm->indisvalid = true;
simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
CatalogUpdateIndexes(pg_index, indexTuple);
/* clean up; index now marked valid, but ROLLBACK will mark invalid */
heap_freetuple(indexTuple);
heap_close(pg_index, RowExclusiveLock);
}
/*
* PlanGrantStmt determines whether a given GRANT/REVOKE statement involves
* a distributed table. If so, it creates DDLJobs to encapsulate information
@ -2495,6 +2713,7 @@ PlanGrantStmt(GrantStmt *grantStmt)
ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relOid;
ddlJob->concurrentIndexCmd = false;
ddlJob->commandString = pstrdup(ddlString.data);
ddlJob->taskList = DDLTaskList(relOid, ddlString.data);

View File

@ -68,6 +68,51 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command)
}
/*
* SendBareCommandListToWorkers sends a list of commands to a set of target
* workers in serial. Commands are committed immediately: new connections are
* always used and no transaction block is used (hence "bare"). The connections
* are made as the extension owner to ensure write access to the Citus metadata
* tables. Primarly useful for INDEX commands using CONCURRENTLY.
*/
void
SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
{
List *workerNodeList = WorkerNodeList();
ListCell *workerNodeCell = NULL;
char *nodeUser = CitusExtensionOwnerName();
ListCell *commandCell = NULL;
/* run commands serially */
foreach(workerNodeCell, workerNodeList)
{
MultiConnection *workerConnection = NULL;
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
int connectionFlags = FORCE_NEW_CONNECTION;
if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata)
{
continue;
}
workerConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName,
nodePort, nodeUser, NULL);
/* iterate over the commands and execute them in the same connection */
foreach(commandCell, commandList)
{
char *commandString = lfirst(commandCell);
ExecuteCriticalRemoteCommand(workerConnection, commandString);
}
CloseConnection(workerConnection);
}
}
/*
* SendCommandToWorkersParams sends a command to all workers in parallel.
* Commands are committed on the workers when the local transaction commits. The

View File

@ -24,6 +24,7 @@
#include "access/tupdesc.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_class.h"
@ -33,12 +34,14 @@
#include "commands/defrem.h"
#include "commands/extension.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/relay_utility.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "parser/parse_utilcmd.h"
#include "storage/lock.h"
#include "utils/acl.h"
#include "utils/array.h"
@ -587,6 +590,104 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
}
/*
* deparse_shard_index_statement uses the provided CREATE INDEX node, dist.
* relation, and shard identifier to populate a provided buffer with a string
* representation of a shard-extended version of that command.
*/
void
deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid,
StringInfo buffer)
{
IndexStmt *indexStmt = copyObject(origStmt); /* copy to avoid modifications */
char *relationName = indexStmt->relation->relname;
char *indexName = indexStmt->idxname;
ListCell *indexParameterCell = NULL;
List *deparseContext = NULL;
/* extend relation and index name using shard identifier */
AppendShardIdToName(&relationName, shardid);
AppendShardIdToName(&indexName, shardid);
/* use extended shard name and transformed stmt for deparsing */
deparseContext = deparse_context_for(relationName, distrelid);
indexStmt = transformIndexStmt(distrelid, indexStmt, NULL);
appendStringInfo(buffer, "CREATE %s INDEX %s %s %s ON %s USING %s ",
(indexStmt->unique ? "UNIQUE" : ""),
(indexStmt->concurrent ? "CONCURRENTLY" : ""),
(indexStmt->if_not_exists ? "IF NOT EXISTS" : ""),
quote_identifier(indexName),
quote_qualified_identifier(indexStmt->relation->schemaname,
relationName),
indexStmt->accessMethod);
/* index column or expression list begins here */
appendStringInfoChar(buffer, '(');
foreach(indexParameterCell, indexStmt->indexParams)
{
IndexElem *indexElement = (IndexElem *) lfirst(indexParameterCell);
/* use commas to separate subsequent elements */
if (indexParameterCell != list_head(indexStmt->indexParams))
{
appendStringInfoChar(buffer, ',');
}
if (indexElement->name)
{
appendStringInfo(buffer, "%s ", quote_identifier(indexElement->name));
}
else if (indexElement->expr)
{
appendStringInfo(buffer, "(%s)", deparse_expression(indexElement->expr,
deparseContext, false,
false));
}
if (indexElement->collation != NIL)
{
appendStringInfo(buffer, "COLLATE %s ",
NameListToQuotedString(indexElement->collation));
}
if (indexElement->opclass != NIL)
{
appendStringInfo(buffer, "%s ",
NameListToQuotedString(indexElement->opclass));
}
if (indexElement->ordering != SORTBY_DEFAULT)
{
bool sortAsc = (indexElement->ordering == SORTBY_ASC);
appendStringInfo(buffer, "%s ", (sortAsc ? "ASC" : "DESC"));
}
if (indexElement->nulls_ordering != SORTBY_NULLS_DEFAULT)
{
bool nullsFirst = (indexElement->nulls_ordering == SORTBY_NULLS_FIRST);
appendStringInfo(buffer, "NULLS %s ", (nullsFirst ? "FIRST" : "LAST"));
}
}
appendStringInfoString(buffer, ") ");
if (indexStmt->options != NIL)
{
appendStringInfoString(buffer, "WITH ");
AppendOptionListToString(buffer, indexStmt->options);
}
if (indexStmt->whereClause != NULL)
{
appendStringInfo(buffer, "WHERE %s", deparse_expression(indexStmt->whereClause,
deparseContext, false,
false));
}
}
/*
* pg_get_indexclusterdef_string returns the definition of a cluster statement
* for given index. The function returns null if the table is not clustered on

View File

@ -32,13 +32,15 @@ extern char * pg_get_sequencedef_string(Oid sequenceRelid);
extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId);
extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation);
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);
extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid,
int64 shardid, StringInfo buffer);
extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern List * pg_get_table_grants(Oid relationId);
/* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer);
extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, StringInfo
buffer);
extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid,
StringInfo buffer);
extern char * generate_relation_name(Oid relid, List *namespaces);
extern char * generate_qualified_relation_name(Oid relid);

View File

@ -41,6 +41,7 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern void ExecuteTasksSequentiallyWithoutResults(List *taskList);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */

View File

@ -23,6 +23,7 @@ extern bool EnableDDLPropagation;
typedef struct DDLJob
{
Oid targetRelationId; /* oid of the target distributed relation */
bool concurrentIndexCmd; /* related to a CONCURRENTLY index command? */
const char *commandString; /* initial (coordinator) DDL command string */
List *taskList; /* worker DDL tasks to execute */
} DDLJob;

View File

@ -30,6 +30,8 @@ typedef enum TargetWorkerSet
extern List * GetWorkerTransactions(void);
extern void SendCommandToWorker(char *nodeName, int32 nodePort, char *command);
extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command);
extern void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet,
List *commandList);
extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
int parameterCount, const Oid *parameterTypes,
const char *const *parameterValues);

View File

@ -91,6 +91,8 @@ CREATE INDEX lineitem_orderkey_index on index_test_hash(a);
ERROR: relation "lineitem_orderkey_index" already exists
CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a);
NOTICE: relation "lineitem_orderkey_index" already exists, skipping
-- Verify that we can create indexes concurrently
CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey);
-- Verify that all indexes got created on the master node and one of the workers
SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
schemaname | tablename | indexname | tablespace | indexdef
@ -102,6 +104,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b)
public | index_test_range | index_test_range_index_a_b_partial | | CREATE UNIQUE INDEX index_test_range_index_a_b_partial ON index_test_range USING btree (a, b) WHERE (c IS NOT NULL)
public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record))
public | lineitem | lineitem_concurrently_index | | CREATE INDEX lineitem_concurrently_index ON lineitem USING btree (l_orderkey)
public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey)
public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey)
public | lineitem | lineitem_orderkey_index_new | | CREATE INDEX lineitem_orderkey_index_new ON lineitem USING btree (l_orderkey)
@ -109,13 +112,13 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC)
public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber)
public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate)
(14 rows)
(15 rows)
\c - - - :worker_1_port
SELECT count(*) FROM pg_indexes WHERE tablename = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1);
count
-------
8
9
(1 row)
SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_hash%';
@ -138,8 +141,6 @@ SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_append%';
\c - - - :master_port
-- Verify that we error out on unsupported statement types
CREATE INDEX CONCURRENTLY try_index ON lineitem (l_orderkey);
ERROR: creating indexes concurrently on distributed tables is currently unsupported
CREATE UNIQUE INDEX try_index ON lineitem (l_orderkey);
ERROR: creating unique indexes on append-partitioned tables is currently unsupported
CREATE INDEX try_index ON lineitem (l_orderkey) TABLESPACE newtablespace;
@ -180,6 +181,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b)
public | index_test_range | index_test_range_index_a_b_partial | | CREATE UNIQUE INDEX index_test_range_index_a_b_partial ON index_test_range USING btree (a, b) WHERE (c IS NOT NULL)
public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record))
public | lineitem | lineitem_concurrently_index | | CREATE INDEX lineitem_concurrently_index ON lineitem USING btree (l_orderkey)
public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey)
public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey)
public | lineitem | lineitem_orderkey_index_new | | CREATE INDEX lineitem_orderkey_index_new ON lineitem USING btree (l_orderkey)
@ -187,7 +189,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC)
public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber)
public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate)
(14 rows)
(15 rows)
--
-- DROP INDEX
@ -196,9 +198,6 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
DROP INDEX lineitem_orderkey_index, lineitem_partial_index;
ERROR: cannot drop multiple distributed objects in a single command
HINT: Try dropping each object in a separate DROP command.
-- Verify that we error out on the CONCURRENTLY clause
DROP INDEX CONCURRENTLY lineitem_orderkey_index;
ERROR: dropping indexes concurrently on distributed tables is currently unsupported
-- Verify that we can succesfully drop indexes
DROP INDEX lineitem_orderkey_index;
NOTICE: using one-phase commit for distributed DDL commands
@ -221,6 +220,8 @@ DROP INDEX index_test_range_index_a_b_partial;
DROP INDEX index_test_hash_index_a;
DROP INDEX index_test_hash_index_a_b;
DROP INDEX index_test_hash_index_a_b_partial;
-- Verify that we can drop indexes concurrently
DROP INDEX CONCURRENTLY lineitem_concurrently_index;
-- Verify that all the indexes are dropped from the master and one worker node.
-- As there's a primary key, so exclude those from this check.
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
@ -244,7 +245,47 @@ SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
------------+-----------+-----------+------------+----------
(0 rows)
-- create index that will conflict with master operations
CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON index_test_hash_102089(b);
\c - - - :master_port
-- should fail because worker index already exists
CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b);
ERROR: CONCURRENTLY-enabled index command failed
DETAIL: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index.
HINT: Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index, then retry the original command.
-- the failure results in an INVALID index
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
Index Valid?
--------------
f
(1 row)
-- we can clean it up and recreate with an DROP IF EXISTS
DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx;
CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b);
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
Index Valid?
--------------
t
(1 row)
\c - - - :worker_1_port
-- now drop shard index to test partial master DROP failure
DROP INDEX CONCURRENTLY ith_b_idx_102089;
\c - - - :master_port
DROP INDEX CONCURRENTLY ith_b_idx;
ERROR: CONCURRENTLY-enabled index command failed
DETAIL: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index.
HINT: Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index, then retry the original command.
-- the failure results in an INVALID index
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
Index Valid?
--------------
f
(1 row)
-- final clean up
DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx;
-- Drop created tables
DROP TABLE index_test_range;
DROP TABLE index_test_hash;

View File

@ -18,6 +18,7 @@ SELECT * FROM mx_ddl_table ORDER BY key;
CREATE INDEX ddl_test_index ON mx_ddl_table(value);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
CREATE INDEX CONCURRENTLY ddl_test_concurrent_index ON mx_ddl_table(value);
-- ADD COLUMN
ALTER TABLE mx_ddl_table ADD COLUMN version INTEGER;
-- SET DEFAULT
@ -40,6 +41,7 @@ ALTER TABLE mx_ddl_table ALTER COLUMN version SET NOT NULL;
version | integer | not null default 1
Indexes:
"mx_ddl_table_pkey" PRIMARY KEY, btree (key)
"ddl_test_concurrent_index" btree (value)
"ddl_test_index" btree (value)
\c - - - :worker_1_port
@ -52,9 +54,21 @@ Indexes:
version | integer | not null default 1
Indexes:
"mx_ddl_table_pkey" PRIMARY KEY, btree (key)
"ddl_test_concurrent_index" btree (value)
"ddl_test_index" btree (value)
\d mx_ddl_table_1600000
\d mx_ddl_table_1220088
Table "public.mx_ddl_table_1220088"
Column | Type | Modifiers
---------+---------+--------------------
key | integer | not null
value | integer |
version | integer | not null default 1
Indexes:
"mx_ddl_table_pkey_1220088" PRIMARY KEY, btree (key)
"ddl_test_concurrent_index_1220088" btree (value)
"ddl_test_index_1220088" btree (value)
\c - - - :worker_2_port
\d mx_ddl_table
Table "public.mx_ddl_table"
@ -65,9 +79,21 @@ Indexes:
version | integer | not null default 1
Indexes:
"mx_ddl_table_pkey" PRIMARY KEY, btree (key)
"ddl_test_concurrent_index" btree (value)
"ddl_test_index" btree (value)
\d mx_ddl_table_1600001
\d mx_ddl_table_1220089
Table "public.mx_ddl_table_1220089"
Column | Type | Modifiers
---------+---------+--------------------
key | integer | not null
value | integer |
version | integer | not null default 1
Indexes:
"mx_ddl_table_pkey_1220089" PRIMARY KEY, btree (key)
"ddl_test_concurrent_index_1220089" btree (value)
"ddl_test_index_1220089" btree (value)
INSERT INTO mx_ddl_table VALUES (37, 78, 2);
INSERT INTO mx_ddl_table VALUES (38, 78);
-- Switch to the coordinator
@ -100,6 +126,7 @@ SELECT * FROM mx_ddl_table ORDER BY key;
DROP INDEX ddl_test_index;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
DROP INDEX CONCURRENTLY ddl_test_concurrent_index;
-- DROP DEFAULT
ALTER TABLE mx_ddl_table ALTER COLUMN version DROP DEFAULT;
-- DROP NOT NULL
@ -126,7 +153,15 @@ Indexes:
Indexes:
"mx_ddl_table_pkey" PRIMARY KEY, btree (key)
\d mx_ddl_table_1600000
\d mx_ddl_table_1220088
Table "public.mx_ddl_table_1220088"
Column | Type | Modifiers
--------+---------+-----------
key | integer | not null
value | integer |
Indexes:
"mx_ddl_table_pkey_1220088" PRIMARY KEY, btree (key)
\c - - - :worker_2_port
\d mx_ddl_table
Table "public.mx_ddl_table"
@ -137,7 +172,15 @@ Indexes:
Indexes:
"mx_ddl_table_pkey" PRIMARY KEY, btree (key)
\d mx_ddl_table_1600001
\d mx_ddl_table_1220089
Table "public.mx_ddl_table_1220089"
Column | Type | Modifiers
--------+---------+-----------
key | integer | not null
value | integer |
Indexes:
"mx_ddl_table_pkey_1220089" PRIMARY KEY, btree (key)
-- Show that DDL commands are done within a two-phase commit transaction
\c - - - :master_port
SET client_min_messages TO debug2;

View File

@ -64,6 +64,9 @@ CREATE INDEX IF NOT EXISTS lineitem_orderkey_index_new on lineitem(l_orderkey);
CREATE INDEX lineitem_orderkey_index on index_test_hash(a);
CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a);
-- Verify that we can create indexes concurrently
CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey);
-- Verify that all indexes got created on the master node and one of the workers
SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
\c - - - :worker_1_port
@ -75,7 +78,6 @@ SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_append%';
-- Verify that we error out on unsupported statement types
CREATE INDEX CONCURRENTLY try_index ON lineitem (l_orderkey);
CREATE UNIQUE INDEX try_index ON lineitem (l_orderkey);
CREATE INDEX try_index ON lineitem (l_orderkey) TABLESPACE newtablespace;
@ -105,9 +107,6 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
-- Verify that we can't drop multiple indexes in a single command
DROP INDEX lineitem_orderkey_index, lineitem_partial_index;
-- Verify that we error out on the CONCURRENTLY clause
DROP INDEX CONCURRENTLY lineitem_orderkey_index;
-- Verify that we can succesfully drop indexes
DROP INDEX lineitem_orderkey_index;
DROP INDEX lineitem_orderkey_index_new;
@ -130,6 +129,9 @@ DROP INDEX index_test_hash_index_a;
DROP INDEX index_test_hash_index_a_b;
DROP INDEX index_test_hash_index_a_b_partial;
-- Verify that we can drop indexes concurrently
DROP INDEX CONCURRENTLY lineitem_concurrently_index;
-- Verify that all the indexes are dropped from the master and one worker node.
-- As there's a primary key, so exclude those from this check.
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
@ -137,8 +139,37 @@ SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
\c - - - :worker_1_port
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
-- create index that will conflict with master operations
CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON index_test_hash_102089(b);
\c - - - :master_port
-- should fail because worker index already exists
CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b);
-- the failure results in an INVALID index
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
-- we can clean it up and recreate with an DROP IF EXISTS
DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx;
CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b);
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
\c - - - :worker_1_port
-- now drop shard index to test partial master DROP failure
DROP INDEX CONCURRENTLY ith_b_idx_102089;
\c - - - :master_port
DROP INDEX CONCURRENTLY ith_b_idx;
-- the failure results in an INVALID index
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
-- final clean up
DROP INDEX CONCURRENTLY IF EXISTS ith_b_idx;
-- Drop created tables
DROP TABLE index_test_range;
DROP TABLE index_test_hash;

View File

@ -8,6 +8,8 @@ SELECT * FROM mx_ddl_table ORDER BY key;
-- CREATE INDEX
CREATE INDEX ddl_test_index ON mx_ddl_table(value);
CREATE INDEX CONCURRENTLY ddl_test_concurrent_index ON mx_ddl_table(value);
-- ADD COLUMN
ALTER TABLE mx_ddl_table ADD COLUMN version INTEGER;
@ -27,13 +29,13 @@ ALTER TABLE mx_ddl_table ALTER COLUMN version SET NOT NULL;
\d mx_ddl_table
\d mx_ddl_table_1600000
\d mx_ddl_table_1220088
\c - - - :worker_2_port
\d mx_ddl_table
\d mx_ddl_table_1600001
\d mx_ddl_table_1220089
INSERT INTO mx_ddl_table VALUES (37, 78, 2);
INSERT INTO mx_ddl_table VALUES (38, 78);
@ -56,6 +58,8 @@ SELECT * FROM mx_ddl_table ORDER BY key;
-- DROP INDEX
DROP INDEX ddl_test_index;
DROP INDEX CONCURRENTLY ddl_test_concurrent_index;
-- DROP DEFAULT
ALTER TABLE mx_ddl_table ALTER COLUMN version DROP DEFAULT;
@ -73,13 +77,13 @@ ALTER TABLE mx_ddl_table DROP COLUMN version;
\d mx_ddl_table
\d mx_ddl_table_1600000
\d mx_ddl_table_1220088
\c - - - :worker_2_port
\d mx_ddl_table
\d mx_ddl_table_1600001
\d mx_ddl_table_1220089
-- Show that DDL commands are done within a two-phase commit transaction
\c - - - :master_port