mirror of https://github.com/citusdata/citus.git
Remove CONCURRENTLY checks, fix tests
Still pending failure testing, which broke with my recent changes.pull/1287/head
parent
0b6c4e756e
commit
dea6c44f75
|
@ -869,6 +869,23 @@ ExecuteModifyTasksWithoutResults(List *taskList)
|
|||
}
|
||||
|
||||
|
||||
int64
|
||||
ExecuteSequentialTasksWithoutResults(List *taskList)
|
||||
{
|
||||
ListCell *taskCell = NULL;
|
||||
|
||||
foreach(taskCell, taskList)
|
||||
{
|
||||
Task *task = (Task *) lfirst(taskCell);
|
||||
List *singleTask = list_make1(task);
|
||||
|
||||
ExecuteModifyTasks(singleTask, false, NULL, NULL);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteModifyTasks executes a list of tasks on remote nodes, and
|
||||
* optionally retrieves the results and stores them in a tuple store.
|
||||
|
|
|
@ -679,6 +679,7 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
|
|||
{
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ddlJob->preventTransaction = createIndexStatement->concurrent;
|
||||
ddlJob->commandString = createIndexCommand;
|
||||
ddlJob->taskList = IndexTaskList(relationId, createIndexStatement);
|
||||
|
||||
|
@ -772,6 +773,7 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
|
|||
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
|
||||
|
||||
ddlJob->targetRelationId = distributedRelationId;
|
||||
ddlJob->preventTransaction = dropIndexStatement->concurrent;
|
||||
ddlJob->commandString = dropIndexCommand;
|
||||
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
|
||||
dropIndexStatement);
|
||||
|
@ -866,6 +868,7 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
|
|||
|
||||
ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = leftRelationId;
|
||||
ddlJob->preventTransaction = false;
|
||||
ddlJob->commandString = alterTableCommand;
|
||||
|
||||
if (rightRelationId)
|
||||
|
@ -1271,13 +1274,6 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
|
|||
"currently unsupported")));
|
||||
}
|
||||
|
||||
if (createIndexStatement->concurrent)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("creating indexes concurrently on distributed tables is "
|
||||
"currently unsupported")));
|
||||
}
|
||||
|
||||
if (createIndexStatement->unique)
|
||||
{
|
||||
RangeVar *relation = createIndexStatement->relation;
|
||||
|
@ -1355,13 +1351,6 @@ ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement)
|
|||
errhint("Try dropping each object in a separate DROP "
|
||||
"command.")));
|
||||
}
|
||||
|
||||
if (dropIndexStatement->concurrent)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("dropping indexes concurrently on distributed tables is "
|
||||
"currently unsupported")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -1994,14 +1983,29 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
|||
EnsureCoordinator();
|
||||
ShowNoticeIfNotUsing2PC();
|
||||
|
||||
if (ddlJob->preventTransaction)
|
||||
{
|
||||
/* save old commit protocol to restore at xact end */
|
||||
Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
|
||||
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
|
||||
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
|
||||
}
|
||||
|
||||
if (shouldSyncMetadata)
|
||||
{
|
||||
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
|
||||
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
|
||||
}
|
||||
|
||||
if (ddlJob->preventTransaction)
|
||||
{
|
||||
ExecuteSequentialTasksWithoutResults(ddlJob->taskList);
|
||||
}
|
||||
else
|
||||
{
|
||||
ExecuteModifyTasksWithoutResults(ddlJob->taskList);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
|
@ -2608,6 +2612,7 @@ PlanGrantStmt(GrantStmt *grantStmt)
|
|||
|
||||
ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = relOid;
|
||||
ddlJob->preventTransaction = false;
|
||||
ddlJob->commandString = pstrdup(ddlString.data);
|
||||
ddlJob->taskList = DDLTaskList(relOid, ddlString.data);
|
||||
|
||||
|
|
|
@ -41,6 +41,7 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
|
|||
extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
|
||||
|
||||
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
|
||||
extern int64 ExecuteSequentialTasksWithoutResults(List *taskList);
|
||||
|
||||
|
||||
#endif /* MULTI_ROUTER_EXECUTOR_H_ */
|
||||
|
|
|
@ -23,6 +23,7 @@ extern bool EnableDDLPropagation;
|
|||
typedef struct DDLJob
|
||||
{
|
||||
Oid targetRelationId; /* oid of the target distributed relation */
|
||||
bool preventTransaction;
|
||||
const char *commandString; /* initial (coordinator) DDL command string */
|
||||
List *taskList; /* worker DDL tasks to execute */
|
||||
} DDLJob;
|
||||
|
|
|
@ -91,6 +91,8 @@ CREATE INDEX lineitem_orderkey_index on index_test_hash(a);
|
|||
ERROR: relation "lineitem_orderkey_index" already exists
|
||||
CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a);
|
||||
NOTICE: relation "lineitem_orderkey_index" already exists, skipping
|
||||
-- Verify that we can create indexes concurrently
|
||||
CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey);
|
||||
-- Verify that all indexes got created on the master node and one of the workers
|
||||
SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
|
||||
schemaname | tablename | indexname | tablespace | indexdef
|
||||
|
@ -102,6 +104,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
|
|||
public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b)
|
||||
public | index_test_range | index_test_range_index_a_b_partial | | CREATE UNIQUE INDEX index_test_range_index_a_b_partial ON index_test_range USING btree (a, b) WHERE (c IS NOT NULL)
|
||||
public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record))
|
||||
public | lineitem | lineitem_concurrently_index | | CREATE INDEX lineitem_concurrently_index ON lineitem USING btree (l_orderkey)
|
||||
public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey)
|
||||
public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey)
|
||||
public | lineitem | lineitem_orderkey_index_new | | CREATE INDEX lineitem_orderkey_index_new ON lineitem USING btree (l_orderkey)
|
||||
|
@ -109,13 +112,13 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
|
|||
public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC)
|
||||
public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber)
|
||||
public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate)
|
||||
(14 rows)
|
||||
(15 rows)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT count(*) FROM pg_indexes WHERE tablename = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1);
|
||||
count
|
||||
-------
|
||||
8
|
||||
9
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_hash%';
|
||||
|
@ -138,8 +141,6 @@ SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_append%';
|
|||
|
||||
\c - - - :master_port
|
||||
-- Verify that we error out on unsupported statement types
|
||||
CREATE INDEX CONCURRENTLY try_index ON lineitem (l_orderkey);
|
||||
ERROR: creating indexes concurrently on distributed tables is currently unsupported
|
||||
CREATE UNIQUE INDEX try_index ON lineitem (l_orderkey);
|
||||
ERROR: creating unique indexes on append-partitioned tables is currently unsupported
|
||||
CREATE INDEX try_index ON lineitem (l_orderkey) TABLESPACE newtablespace;
|
||||
|
@ -180,6 +181,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
|
|||
public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b)
|
||||
public | index_test_range | index_test_range_index_a_b_partial | | CREATE UNIQUE INDEX index_test_range_index_a_b_partial ON index_test_range USING btree (a, b) WHERE (c IS NOT NULL)
|
||||
public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record))
|
||||
public | lineitem | lineitem_concurrently_index | | CREATE INDEX lineitem_concurrently_index ON lineitem USING btree (l_orderkey)
|
||||
public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey)
|
||||
public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey)
|
||||
public | lineitem | lineitem_orderkey_index_new | | CREATE INDEX lineitem_orderkey_index_new ON lineitem USING btree (l_orderkey)
|
||||
|
@ -187,7 +189,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
|
|||
public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC)
|
||||
public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber)
|
||||
public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate)
|
||||
(14 rows)
|
||||
(15 rows)
|
||||
|
||||
--
|
||||
-- DROP INDEX
|
||||
|
@ -196,9 +198,6 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
|
|||
DROP INDEX lineitem_orderkey_index, lineitem_partial_index;
|
||||
ERROR: cannot drop multiple distributed objects in a single command
|
||||
HINT: Try dropping each object in a separate DROP command.
|
||||
-- Verify that we error out on the CONCURRENTLY clause
|
||||
DROP INDEX CONCURRENTLY lineitem_orderkey_index;
|
||||
ERROR: dropping indexes concurrently on distributed tables is currently unsupported
|
||||
-- Verify that we can succesfully drop indexes
|
||||
DROP INDEX lineitem_orderkey_index;
|
||||
NOTICE: using one-phase commit for distributed DDL commands
|
||||
|
@ -221,6 +220,8 @@ DROP INDEX index_test_range_index_a_b_partial;
|
|||
DROP INDEX index_test_hash_index_a;
|
||||
DROP INDEX index_test_hash_index_a_b;
|
||||
DROP INDEX index_test_hash_index_a_b_partial;
|
||||
-- Verify that we can drop indexes concurrently
|
||||
DROP INDEX CONCURRENTLY lineitem_concurrently_index;
|
||||
-- Verify that all the indexes are dropped from the master and one worker node.
|
||||
-- As there's a primary key, so exclude those from this check.
|
||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
|
||||
|
|
|
@ -64,6 +64,9 @@ CREATE INDEX IF NOT EXISTS lineitem_orderkey_index_new on lineitem(l_orderkey);
|
|||
CREATE INDEX lineitem_orderkey_index on index_test_hash(a);
|
||||
CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a);
|
||||
|
||||
-- Verify that we can create indexes concurrently
|
||||
CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey);
|
||||
|
||||
-- Verify that all indexes got created on the master node and one of the workers
|
||||
SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
|
||||
\c - - - :worker_1_port
|
||||
|
@ -75,7 +78,6 @@ SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_append%';
|
|||
|
||||
-- Verify that we error out on unsupported statement types
|
||||
|
||||
CREATE INDEX CONCURRENTLY try_index ON lineitem (l_orderkey);
|
||||
CREATE UNIQUE INDEX try_index ON lineitem (l_orderkey);
|
||||
CREATE INDEX try_index ON lineitem (l_orderkey) TABLESPACE newtablespace;
|
||||
|
||||
|
@ -105,9 +107,6 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
|
|||
-- Verify that we can't drop multiple indexes in a single command
|
||||
DROP INDEX lineitem_orderkey_index, lineitem_partial_index;
|
||||
|
||||
-- Verify that we error out on the CONCURRENTLY clause
|
||||
DROP INDEX CONCURRENTLY lineitem_orderkey_index;
|
||||
|
||||
-- Verify that we can succesfully drop indexes
|
||||
DROP INDEX lineitem_orderkey_index;
|
||||
DROP INDEX lineitem_orderkey_index_new;
|
||||
|
@ -130,6 +129,9 @@ DROP INDEX index_test_hash_index_a;
|
|||
DROP INDEX index_test_hash_index_a_b;
|
||||
DROP INDEX index_test_hash_index_a_b_partial;
|
||||
|
||||
-- Verify that we can drop indexes concurrently
|
||||
DROP INDEX CONCURRENTLY lineitem_concurrently_index;
|
||||
|
||||
-- Verify that all the indexes are dropped from the master and one worker node.
|
||||
-- As there's a primary key, so exclude those from this check.
|
||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
|
||||
|
|
Loading…
Reference in New Issue