From 271b20a23e0451c37baaca350324864852fc5772 Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Tue, 11 Oct 2016 03:23:18 +0200
Subject: [PATCH] Parallelise DDL commands

---
 .../executor/multi_router_executor.c          |  20 ++-
 .../distributed/executor/multi_utility.c      | 126 ++++----------
 .../master/master_modify_multiple_shards.c    |   2 +-
 src/include/distributed/multi_planner.h       |   5 +
 .../distributed/multi_router_executor.h       |   4 +-
 .../distributed/multi_router_planner.h        |   4 -
 .../expected/multi_index_statements.out       |   6 +-
 .../expected/multi_join_order_additional.out  |   2 -
 .../multi_alter_table_statements.source       |  18 +--
 9 files changed, 62 insertions(+), 125 deletions(-)

diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index bbce5d070..071038024 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -94,6 +94,10 @@ static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
 							  bool isModificationQuery, bool expectResults);
 static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
 								 bool isModificationQuery, bool expectResults);
+static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
+								ParamListInfo paramListInfo,
+								MaterialState *routerState,
+								TupleDesc tupleDescriptor);
 static List * TaskShardIntervalList(List *taskList);
 static void AcquireExecutorShardLock(Task *task, CmdType commandType);
 static void AcquireExecutorMultiShardLocks(List *shardIntervalList);
@@ -761,6 +765,20 @@ ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
 }
 
 
+/*
+ * ExecuteModifyTasksWithoutResults provides a wrapper around ExecuteModifyTasks
+ * for calls that do not require results. In this case, the expectResults flag
+ * is set to false and arguments related to result sets and query parameters are
+ * NULL. This function is primarily intended to allow DDL and
+ * master_modify_multiple_shards to use the router executor infrastructure.
+ */
+int64
+ExecuteModifyTasksWithoutResults(List *taskList)
+{
+	return ExecuteModifyTasks(taskList, false, NULL, NULL, NULL);
+}
+
+
 /*
  * ExecuteModifyTasks executes a list of tasks on remote nodes, and
  * optionally retrieves the results and stores them in a tuple store.
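ExecuteModifyTasksWithoutResults becomes the shared entry point for multi-shard execution that discards results. As an illustration (not part of the diff), a minimal sketch of the two call patterns this patch converts further down, with relationId, ddlCommandString, modifyQuery and prunedShardIntervalList standing in for the callers' state:

    /* DDL path (multi_utility.c below): one task per shard, results discarded */
    List *ddlTaskList = DDLTaskList(relationId, ddlCommandString);
    ExecuteModifyTasksWithoutResults(ddlTaskList);

    /* multi-shard UPDATE/DELETE path (master_modify_multiple_shards.c below) */
    List *modifyTaskList = ModifyMultipleShardsTaskList(modifyQuery,
                                                        prunedShardIntervalList,
                                                        relationId);
    int64 affectedTupleCount = ExecuteModifyTasksWithoutResults(modifyTaskList);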
@@ -768,8 +786,8 @@ ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
  * If a task fails on one of the placements, the transaction rolls back.
  * Otherwise, the changes are committed using 2PC when the local transaction
  * commits.
  */
-int64
+static int64
 ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo,
 				   MaterialState *routerState, TupleDesc tupleDescriptor)
 {
diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c
index 05182dbba..0a6783fa8 100644
--- a/src/backend/distributed/executor/multi_utility.c
+++ b/src/backend/distributed/executor/multi_utility.c
@@ -37,6 +37,8 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_copy.h"
 #include "distributed/multi_join_order.h"
+#include "distributed/multi_planner.h"
+#include "distributed/multi_router_executor.h"
 #include "distributed/multi_shard_transaction.h"
 #include "distributed/multi_utility.h" /* IWYU pragma: keep */
 #include "distributed/pg_dist_partition.h"
@@ -120,9 +122,7 @@ static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
 static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
 										 bool isTopLevel);
 static void ShowNoticeIfNotUsing2PC(void);
-static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString);
-static void ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId,
-											ShardConnections *shardConnections);
+static List * DDLTaskList(Oid relationId, const char *commandString);
 static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid,
 										 Oid oldRelOid, void *arg);
 static void CheckCopyPermissions(CopyStmt *copyStatement);
@@ -1286,7 +1286,7 @@ static void
 ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
 							 bool isTopLevel)
 {
-	bool executionOK = false;
+	List *taskList = NIL;
 
 	if (XactModificationLevel == XACT_MODIFICATION_DATA)
 	{
@@ -1298,15 +1298,9 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
 
 	ShowNoticeIfNotUsing2PC();
 
-	executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString);
+	taskList = DDLTaskList(relationId, ddlCommandString);
 
-	/* if command could not be executed on any finalized shard placement, error out */
-	if (!executionOK)
-	{
-		ereport(ERROR, (errmsg("could not execute DDL command on worker node shards")));
-	}
-
-	XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
+	ExecuteModifyTasksWithoutResults(taskList);
 }
 
 
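ExecuteDistributedDDLCommand is now a plan-then-execute pair: build one task per shard, then hand the list to the router executor. Each task's query wraps the user's DDL in a worker_apply_shard_ddl_command call. A sketch of the string DDLTaskList (next hunk) builds per shard, assuming WORKER_APPLY_SHARD_DDL_COMMAND keeps its existing "SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s, %s)" shape; shard 102008 and the ALTER TABLE text are hypothetical:

    StringInfo applyCommand = makeStringInfo();

    /* arguments: shard ID, quoted schema name, quoted DDL string */
    appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND,
                     (uint64) 102008, quote_literal_cstr("public"),
                     quote_literal_cstr("ALTER TABLE lineitem "
                                        "ADD COLUMN new_column integer"));

    /*
     * applyCommand->data now reads:
     *   SELECT worker_apply_shard_ddl_command (102008, 'public',
     *       'ALTER TABLE lineitem ADD COLUMN new_column integer')
     * and the worker applies the DDL to the shard's table (lineitem_102008).
     */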
@@ -1330,112 +1324,48 @@ ShowNoticeIfNotUsing2PC(void)
 
 
 /*
- * ExecuteCommandOnWorkerShards executes a given command on all the finalized
- * shard placements of the given table within a distributed transaction. The
- * value of citus.multi_shard_commit_protocol is set to '2pc' by the caller
- * ExecuteDistributedDDLCommand function so that two phase commit protocol is used.
- *
- * ExecuteCommandOnWorkerShards opens an individual connection for each of the
- * shard placement. After all connections are opened, a BEGIN command followed by
- * a proper "SELECT worker_apply_shard_ddl_command(<shardId>, <ddlCommand>)" is
- * sent to all open connections in a serial manner.
- *
- * The opened transactions are handled by the CompleteShardPlacementTransactions
- * function.
- *
- * Note: There are certain errors which would occur on few nodes and not on the
- * others. For example, adding a column with a type which exists on some nodes
- * and not on the others.
- *
- * Note: The execution will be blocked if a prepared transaction from previous
- * executions exist on the workers. In this case, those prepared transactions should
- * be removed by either COMMIT PREPARED or ROLLBACK PREPARED.
+ * DDLTaskList builds a list of tasks to execute a DDL command on a
+ * given list of shards.
  */
-static bool
-ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
+static List *
+DDLTaskList(Oid relationId, const char *commandString)
 {
+	List *taskList = NIL;
 	List *shardIntervalList = LoadShardIntervalList(relationId);
-	char *tableOwner = TableOwner(relationId);
 	ListCell *shardIntervalCell = NULL;
 	Oid schemaId = get_rel_namespace(relationId);
 	char *schemaName = get_namespace_name(schemaId);
+	char *escapedSchemaName = quote_literal_cstr(schemaName);
+	char *escapedCommandString = quote_literal_cstr(commandString);
+	uint64 jobId = INVALID_JOB_ID;
+	int taskId = 1;
 
-	LockShardListResources(shardIntervalList, ShareLock);
-	OpenTransactionsToAllShardPlacements(shardIntervalList, tableOwner);
+	/* lock metadata before getting placement lists */
+	LockShardListMetadata(shardIntervalList, ShareLock);
 
 	foreach(shardIntervalCell, shardIntervalList)
 	{
 		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
 		uint64 shardId = shardInterval->shardId;
-		ShardConnections *shardConnections = NULL;
-		bool shardConnectionsFound = false;
-		char *escapedSchemaName = quote_literal_cstr(schemaName);
-		char *escapedCommandString = quote_literal_cstr(commandString);
 		StringInfo applyCommand = makeStringInfo();
+		Task *task = NULL;
 
-		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
-		Assert(shardConnectionsFound);
-
-		/* build the shard ddl command */
 		appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
 						 escapedSchemaName, escapedCommandString);
 
-		ExecuteCommandOnShardPlacements(applyCommand, shardId, shardConnections);
+		task = CitusMakeNode(Task);
+		task->jobId = jobId;
+		task->taskId = taskId++;
+		task->taskType = SQL_TASK;
+		task->queryString = applyCommand->data;
+		task->dependedTaskList = NULL;
+		task->anchorShardId = shardId;
+		task->taskPlacementList = FinalizedShardPlacementList(shardId);
 
-		FreeStringInfo(applyCommand);
+		taskList = lappend(taskList, task);
 	}
 
-	/* check for cancellation one last time before returning */
-	CHECK_FOR_INTERRUPTS();
-
-	return true;
-}
-
-
-/*
- * ExecuteCommandOnShardPlacements executes the given ddl command on the
- * placements of the given shard, using the given shard connections.
- */
-static void
-ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId,
-								ShardConnections *shardConnections)
-{
-	List *connectionList = shardConnections->connectionList;
-	ListCell *connectionCell = NULL;
-
-	Assert(connectionList != NIL);
-
-	foreach(connectionCell, connectionList)
-	{
-		TransactionConnection *transactionConnection =
-			(TransactionConnection *) lfirst(connectionCell);
-		PGconn *connection = transactionConnection->connection;
-		PGresult *result = NULL;
-
-		/* send the query */
-		result = PQexec(connection, applyCommand->data);
-		if (PQresultStatus(result) != PGRES_TUPLES_OK)
-		{
-			WarnRemoteError(connection, result);
-			ereport(ERROR, (errmsg("could not execute DDL command on worker "
-								   "node shards")));
-		}
-		else
-		{
-			char *workerName = ConnectionGetOptionValue(connection, "host");
-			char *workerPort = ConnectionGetOptionValue(connection, "port");
-
-			ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT
-									" on node %s:%s", shardId, workerName,
-									workerPort)));
-		}
-
-		PQclear(result);
-
-		transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
-
-		CHECK_FOR_INTERRUPTS();
-	}
+	return taskList;
 }
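For a concrete picture of the result, here is roughly what one element of the returned task list carries, reusing the hypothetical shard 102008 from the sketch above (values illustrative only; in the real code queryString points at applyCommand->data):

    Task *task = CitusMakeNode(Task);
    task->jobId = INVALID_JOB_ID;      /* 0: standalone DDL tasks have no job */
    task->taskId = 1;                  /* numbered sequentially per statement */
    task->taskType = SQL_TASK;
    task->queryString = "SELECT worker_apply_shard_ddl_command (102008, 'public', "
                        "'ALTER TABLE lineitem ADD COLUMN new_column integer')";
    task->dependedTaskList = NULL;
    task->anchorShardId = 102008;
    task->taskPlacementList = FinalizedShardPlacementList(102008);

The router executor then runs the query on every placement in taskPlacementList; per the ExecuteModifyTasks comment above, a failure on any placement rolls the transaction back, otherwise the changes commit with the local transaction.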
diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
index 169f22d91..45be9044c 100644
--- a/src/backend/distributed/master/master_modify_multiple_shards.c
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -162,7 +162,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 
 	taskList = ModifyMultipleShardsTaskList(modifyQuery,
 											prunedShardIntervalList, relationId);
-	affectedTupleCount = ExecuteModifyTasks(taskList, false, NULL, NULL, NULL);
+	affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList);
 
 	PG_RETURN_INT32(affectedTupleCount);
 }
diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h
index 8dac47e00..efec0cf18 100644
--- a/src/include/distributed/multi_planner.h
+++ b/src/include/distributed/multi_planner.h
@@ -14,6 +14,11 @@
 
 #include "nodes/relation.h"
 
+
+/* values used by jobs and tasks which do not require identifiers */
+#define INVALID_JOB_ID 0
+#define INVALID_TASK_ID 0
+
 
 typedef struct RelationRestrictionContext
 {
 	bool hasDistributedRelation;
diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h
index 61c3e0c67..3abbef272 100644
--- a/src/include/distributed/multi_router_executor.h
+++ b/src/include/distributed/multi_router_executor.h
@@ -39,8 +39,6 @@ extern void RouterExecutorFinish(QueryDesc *queryDesc);
 extern void RouterExecutorEnd(QueryDesc *queryDesc);
 extern void RegisterRouterExecutorXactCallbacks(void);
 
-extern int64 ExecuteModifyTasks(List *taskList, bool expectResults,
-								ParamListInfo paramListInfo, MaterialState *routerState,
-								TupleDesc tupleDescriptor);
+extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
 
 #endif /* MULTI_ROUTER_EXECUTOR_H_ */
diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h
index c77f13482..cb558acd8 100644
--- a/src/include/distributed/multi_router_planner.h
+++ b/src/include/distributed/multi_router_planner.h
@@ -21,10 +21,6 @@
 
 #include "nodes/parsenodes.h"
 
 
-/* values used by jobs and tasks which do not require identifiers */
-#define INVALID_JOB_ID 0
-#define INVALID_TASK_ID 0
-
 /* reserved alias name for UPSERTs */
 #define UPSERT_ALIAS "citus_table_alias"
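One behavioural consequence runs through all the expected-output changes below: the removed per-placement loop demoted the worker's error to a WARNING and then raised a generic ERROR, whereas the router executor surfaces the worker's own message directly. Schematically, with the old lines quoted from the removed hunk above and the new behaviour sketched rather than quoted from the executor:

    /* old: remote failure became a WARNING followed by a generic ERROR */
    WarnRemoteError(connection, result);
    ereport(ERROR, (errmsg("could not execute DDL command on worker "
                           "node shards")));

    /*
     * new (sketch): the worker's message is the ERROR itself, e.g.
     *   ERROR:  column "non_existent_column" does not exist
     *   CONTEXT:  while executing command on localhost:57638
     * and the per-shard DEBUG2 "applied command on shard ..." messages
     * disappear along with ExecuteCommandOnShardPlacements.
     */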
diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out
index 639d18ad0..21bbb3502 100644
--- a/src/test/regress/expected/multi_index_statements.out
+++ b/src/test/regress/expected/multi_index_statements.out
@@ -153,14 +153,12 @@ ERROR:  relation "lineitem_orderkey_index" already exists
 CREATE INDEX try_index ON lineitem USING gist (l_orderkey);
 NOTICE:  using one-phase commit for distributed DDL commands
 HINT:  You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
-WARNING:  data type bigint has no default operator class for access method "gist"
+ERROR:  data type bigint has no default operator class for access method "gist"
 HINT:  You must specify an operator class for the index or define a default operator class for the data type.
 CONTEXT:  while executing command on localhost:57638
-ERROR:  could not execute DDL command on worker node shards
 CREATE INDEX try_index ON lineitem (non_existent_column);
-WARNING:  column "non_existent_column" does not exist
+ERROR:  column "non_existent_column" does not exist
 CONTEXT:  while executing command on localhost:57638
-ERROR:  could not execute DDL command on worker node shards
 CREATE INDEX ON lineitem (l_orderkey);
 ERROR:  creating index without a name on a distributed table is currently unsupported
 -- Verify that none of failed indexes got created on the master node
diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out
index 7254b56ba..2d9090150 100644
--- a/src/test/regress/expected/multi_join_order_additional.out
+++ b/src/test/regress/expected/multi_join_order_additional.out
@@ -44,8 +44,6 @@ SELECT master_create_worker_shards('lineitem_hash', 2, 1);
 CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate);
 NOTICE:  using one-phase commit for distributed DDL commands
 HINT:  You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
-DEBUG:  applied command on shard 650000 on node localhost:57637
-DEBUG:  applied command on shard 650001 on node localhost:57638
 DEBUG:  building index "lineitem_hash_time_index" on table "lineitem_hash"
 DEBUG:  sent COMMIT over connection 650000
 DEBUG:  sent COMMIT over connection 650001
diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source
index 2e74da86d..36fedcf31 100644
--- a/src/test/regress/output/multi_alter_table_statements.source
+++ b/src/test/regress/output/multi_alter_table_statements.source
@@ -261,9 +261,8 @@ ALTER TABLE IF EXISTS non_existent_table ADD COLUMN new_column INTEGER;
 NOTICE:  relation "non_existent_table" does not exist, skipping
 ALTER TABLE IF EXISTS lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE INTEGER;
 ALTER TABLE lineitem_alter DROP COLUMN non_existent_column;
-WARNING:  column "non_existent_column" of relation "lineitem_alter_220000" does not exist
+ERROR:  column "non_existent_column" of relation "lineitem_alter_220000" does not exist
 CONTEXT:  while executing command on localhost:57638
-ERROR:  could not execute DDL command on worker node shards
 ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS non_existent_column;
 NOTICE:  column "non_existent_column" of relation "lineitem_alter" does not exist, skipping
 ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS int_column2;
@@ -361,17 +360,14 @@ DETAIL:  Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT and TYPE subcommands are supported.
 -- Verify that we error out in case of postgres errors on supported statement
 -- types
 ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type;
-WARNING:  type "non_existent_type" does not exist
+ERROR:  type "non_existent_type" does not exist
 CONTEXT:  while executing command on localhost:57638
-ERROR:  could not execute DDL command on worker node shards
 ALTER TABLE lineitem_alter ALTER COLUMN null_column SET NOT NULL;
-WARNING:  column "null_column" contains null values
+ERROR:  column "null_column" contains null values
 CONTEXT:  while executing command on localhost:57638
-ERROR:  could not execute DDL command on worker node shards
 ALTER TABLE lineitem_alter ALTER COLUMN l_partkey SET DEFAULT 'a';
-WARNING:  invalid input syntax for integer: "a"
+ERROR:  invalid input syntax for integer: "a"
 CONTEXT:  while executing command on localhost:57638
-ERROR:  could not execute DDL command on worker node shards
 -- Verify that we error out on statements involving RENAME
 ALTER TABLE lineitem_alter RENAME TO lineitem_renamed;
 ERROR:  renaming distributed tables or their objects is currently unsupported
@@ -518,9 +514,8 @@ CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
 NOTICE:  using one-phase commit for distributed DDL commands
 HINT:  You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
 ALTER TABLE lineitem_alter ADD COLUMN first integer;
-WARNING:  column "first" of relation "lineitem_alter_220000" already exists
+ERROR:  column "first" of relation "lineitem_alter_220000" already exists
 CONTEXT:  while executing command on localhost:57638
-ERROR:  could not execute DDL command on worker node shards
 COMMIT;
 -- Nothing from the block should have committed
 SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
@@ -651,10 +646,9 @@ SELECT master_create_worker_shards('test_ab', 8, 2);
 
 INSERT INTO test_ab VALUES (2, 10);
 INSERT INTO test_ab VALUES (2, 11);
 CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a);
-WARNING:  could not create unique index "temp_unique_index_1_220022"
+ERROR:  could not create unique index "temp_unique_index_1_220022"
 DETAIL:  Key (a)=(2) is duplicated.
 CONTEXT:  while executing command on localhost:57638
-ERROR:  could not execute DDL command on worker node shards
 SELECT shardid FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
 WHERE logicalrelid='test_ab'::regclass AND shardstate=3;
  shardid