diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 4dae254c8..b377d9ba6 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -869,6 +869,12 @@ ExecuteModifyTasksWithoutResults(List *taskList) } +/* + * ExecuteSequentialTasksWithoutResults basically calls ExecuteModifyTasks in a + * loop in order to simulate sequential execution of a list of tasks. Useful in + * cases where issuing commands in parallel before waiting for results could + * result in deadlocks (such as CREATE INDEX CONCURRENTLY). + */ int64 ExecuteSequentialTasksWithoutResults(List *taskList) { diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 70e1e939f..eece32ed2 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -2090,8 +2090,8 @@ DDLTaskList(Oid relationId, const char *commandString) /* - * DDLTaskList builds a list of tasks to execute a DDL command on a - * given list of shards. + * IndexTaskList builds a list of tasks to execute a CREATE INDEX command + * against a specified distributed table. */ static List * IndexTaskList(Oid relationId, IndexStmt *indexStmt) @@ -2144,8 +2144,8 @@ IndexTaskList(Oid relationId, IndexStmt *indexStmt) /* - * DDLTaskList builds a list of tasks to execute a DDL command on a - * given list of shards. + * DropIndexTaskList builds a list of tasks to execute a DROP INDEX command + * against a specified distributed table. */ static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) @@ -2174,6 +2174,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) AppendShardIdToName(&shardIndexName, shardId); + /* deparse shard-specific DROP INDEX command */ appendStringInfo(&ddlString, "DROP INDEX %s %s %s %s", (dropStmt->concurrent ? "CONCURRENTLY" : ""), (dropStmt->missing_ok ? "IF EXISTS" : ""), diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index cd010523e..c792c6f9c 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -68,6 +68,13 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command) } +/* + * SendBareCommandListToWorkers sends a list of commands to a set of target + * workers in serial. Commands are committed immediately: new connections are + * always used and no transaction block is used (hence "bare"). The connections + * are made as the extension owner to ensure write access to the Citus metadata + * tables. Primarly useful for INDEX commands using CONCURRENTLY. + */ void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) { diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index df2ac1929..0da28da53 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -590,7 +590,12 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) } -char * +/* + * deparse_shard_index_statement uses the provided CREATE INDEX node, dist. + * relation, and shard identifier to populate a provided buffer with a string + * representation of a shard-extended version of that command. + */ +void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, StringInfo buffer) { @@ -680,8 +685,6 @@ deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, deparseContext, false, false)); } - - return buffer->data; } diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index a6f7d2b6d..e7eaf9204 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -32,15 +32,15 @@ extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); -extern char * deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 - shardid, StringInfo buffer); +extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, + int64 shardid, StringInfo buffer); extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern List * pg_get_table_grants(Oid relationId); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); -extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, StringInfo - buffer); +extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, + StringInfo buffer); extern char * generate_relation_name(Oid relid, List *namespaces); extern char * generate_qualified_relation_name(Oid relid); diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index 00ebc1062..e6a4dfacb 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -23,7 +23,7 @@ extern bool EnableDDLPropagation; typedef struct DDLJob { Oid targetRelationId; /* oid of the target distributed relation */ - bool preventTransaction; + bool preventTransaction; /* prevent use of worker transactions? */ const char *commandString; /* initial (coordinator) DDL command string */ List *taskList; /* worker DDL tasks to execute */ } DDLJob;