diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 2492356b9..2200f62d5 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -58,17 +58,16 @@ static CustomExecMethods TaskTrackerCustomExecMethods = { .ExplainCustomScan = CitusExplainScan }; -static CustomExecMethods RouterSingleModifyCustomExecMethods = { - .CustomName = "RouterSingleModifyScan", +static CustomExecMethods RouterSequentialModifyCustomExecMethods = { + .CustomName = "RouterSequentialModifyScan", .BeginCustomScan = CitusModifyBeginScan, - .ExecCustomScan = RouterSingleModifyExecScan, + .ExecCustomScan = RouterSequentialModifyExecScan, .EndCustomScan = CitusEndScan, .ReScanCustomScan = CitusReScan, .ExplainCustomScan = CitusExplainScan }; -/* not static to enable reference by multi-modify logic in router execution */ -CustomExecMethods RouterMultiModifyCustomExecMethods = { +static CustomExecMethods RouterMultiModifyCustomExecMethods = { .CustomName = "RouterMultiModifyScan", .BeginCustomScan = CitusModifyBeginScan, .ExecCustomScan = RouterMultiModifyExecScan, @@ -100,6 +99,7 @@ static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { static void PrepareMasterJobDirectory(Job *workerJob); static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); static Relation StubRelation(TupleDesc tupleDescriptor); +static bool IsMultiRowInsert(Query *query); /* @@ -165,7 +165,7 @@ RouterCreateScan(CustomScan *scan) { if (isModificationQuery) { - scanState->customScanState.methods = &RouterSingleModifyCustomExecMethods; + scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods; } else { @@ -175,13 +175,59 @@ RouterCreateScan(CustomScan *scan) else { Assert(isModificationQuery); - scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods; + + if (IsMultiRowInsert(workerJob->jobQuery)) + { + /* + * Multi-row INSERT is executed sequentially instead of using + * parallel connections. + */ + scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods; + } + else + { + scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods; + } } return (Node *) scanState; } +/* + * IsMultiRowInsert returns whether the given query is a multi-row INSERT. + * + * It does this by determining whether the query is an INSERT that has an + * RTE_VALUES. Single-row INSERTs will have their RTE_VALUES optimised away + * in transformInsertStmt, and instead use the target list. + */ +static bool +IsMultiRowInsert(Query *query) +{ + ListCell *rteCell = NULL; + bool hasValuesRTE = false; + + CmdType commandType = query->commandType; + if (commandType != CMD_INSERT) + { + return false; + } + + foreach(rteCell, query->rtable) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell); + + if (rte->rtekind == RTE_VALUES) + { + hasValuesRTE = true; + break; + } + } + + return hasValuesRTE; +} + + /* * CoordinatorInsertSelectCrateScan creates the scan state for executing * INSERT..SELECT into a distributed table via the coordinator. diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9d7e47446..70f13c82a 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -80,7 +80,7 @@ static void AcquireMetadataLocks(List *taskList); static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType); static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, - bool expectResults); + bool multipleTasks, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static List * GetModifyConnections(Task *task, bool markCritical); static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, @@ -427,11 +427,6 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) RaiseDeferredError(planningError, ERROR); } - if (list_length(taskList) > 1) - { - node->methods = &RouterMultiModifyCustomExecMethods; - } - workerJob->taskList = taskList; } @@ -453,11 +448,11 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) /* - * RouterSingleModifyExecScan executes a single modification query on a - * distributed plan and returns results if there is any. + * RouterSequentialModifyExecScan executes 0 or more modifications on a + * distributed table sequentially and returns results if there are any. */ TupleTableSlot * -RouterSingleModifyExecScan(CustomScanState *node) +RouterSequentialModifyExecScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; TupleTableSlot *resultSlot = NULL; @@ -468,12 +463,25 @@ RouterSingleModifyExecScan(CustomScanState *node) bool hasReturning = multiPlan->hasReturning; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; + ListCell *taskCell = NULL; + bool multipleTasks = list_length(taskList) > 1; - if (list_length(taskList) > 0) + /* + * We could naturally handle function-based transactions (i.e. those using + * PL/pgSQL or similar) by checking the type of queryDesc->dest, but some + * customers already use functions that touch multiple shards from within + * a function, so we'll ignore functions for now. + */ + if (IsTransactionBlock() || multipleTasks) { - Task *task = (Task *) linitial(taskList); + BeginOrContinueCoordinatedTransaction(); + } - ExecuteSingleModifyTask(scanState, task, hasReturning); + foreach(taskCell, taskList) + { + Task *task = (Task *) lfirst(taskCell); + + ExecuteSingleModifyTask(scanState, task, multipleTasks, hasReturning); } scanState->finishedRemoteScan = true; @@ -682,7 +690,8 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access * framework), or errors out (failed on all placements). */ static void -ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults) +ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTasks, + bool expectResults) { CmdType operation = scanState->multiPlan->operation; EState *executorState = scanState->customScanState.ss.ps.state; @@ -713,17 +722,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult CoordinatedTransactionUse2PC(); } - /* - * We could naturally handle function-based transactions (i.e. those using - * PL/pgSQL or similar) by checking the type of queryDesc->dest, but some - * customers already use functions that touch multiple shards from within - * a function, so we'll ignore functions for now. - */ - if (IsTransactionBlock()) - { - BeginOrContinueCoordinatedTransaction(); - } - /* * Get connections required to execute task. This will, if necessary, * establish the connection, mark as critical (when modifying reference @@ -775,6 +773,15 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult /* if we're running a 2PC, the query should fail on error */ failOnError = taskRequiresTwoPhaseCommit; + if (multipleTasks && expectResults) + { + /* + * If we have multiple tasks and one fails, we cannot clear + * the tuple store and start over. Error out instead. + */ + failOnError = true; + } + /* * If caller is interested, store query results the first time * through. The output of the query's execution on other shards is @@ -822,7 +829,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult /* if some placements failed, ensure future statements don't access them */ MarkFailedShardPlacements(); - executorState->es_processed = affectedTupleCount; + executorState->es_processed += affectedTupleCount; if (IsTransactionBlock()) { diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index a710a9bc7..eac04ab92 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -28,8 +28,6 @@ typedef struct CitusScanState } CitusScanState; -extern CustomExecMethods RouterMultiModifyCustomExecMethods; - extern Node * RealTimeCreateScan(CustomScan *scan); extern Node * TaskTrackerCreateScan(CustomScan *scan); extern Node * RouterCreateScan(CustomScan *scan); diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 7bda81772..fd1fcb658 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -36,7 +36,7 @@ extern bool AllModificationsCommutative; extern bool EnableDeadlockPrevention; extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags); -extern TupleTableSlot * RouterSingleModifyExecScan(CustomScanState *node); +extern TupleTableSlot * RouterSequentialModifyExecScan(CustomScanState *node); extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); diff --git a/src/test/regress/expected/isolation_concurrent_dml.out b/src/test/regress/expected/isolation_concurrent_dml.out index 8fab5e91b..69c06723c 100644 --- a/src/test/regress/expected/isolation_concurrent_dml.out +++ b/src/test/regress/expected/isolation_concurrent_dml.out @@ -59,11 +59,10 @@ step s1-multi-insert: step s2-multi-insert-overlap: INSERT INTO test_concurrent_dml VALUES (1), (4); - + step s1-commit: COMMIT; -step s2-multi-insert-overlap: <... completed> starting permutation: s1-begin s2-begin s1-multi-insert s2-multi-insert s1-commit s2-commit master_create_worker_shards diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 2465c7499..3838c8d4c 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -221,10 +221,12 @@ INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie'); ERROR: cannot establish a new connection for placement 1200003, since DML has been executed on a connection that is in use CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie" ROLLBACK; --- but it is allowed after a multi-row insert +-- COPY cannot be performed after a multi-row INSERT that uses one connection BEGIN; INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie'); \copy researchers from stdin delimiter ',' +ERROR: cannot establish a new connection for placement 1200003, since DML has been executed on a connection that is in use +CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie" ROLLBACK; -- after a COPY you can modify multiple shards, since they'll use different connections BEGIN; @@ -232,6 +234,11 @@ BEGIN; INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'); INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie'); ROLLBACK; +-- after a COPY you can perform a multi-row INSERT +BEGIN; +\copy researchers from stdin delimiter ',' +INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie'); +ROLLBACK; -- COPY can happen before single row INSERT BEGIN; \copy labs from stdin delimiter ',' diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 34cac1151..4d90284d6 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -180,7 +180,7 @@ INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie'); \. ROLLBACK; --- but it is allowed after a multi-row insert +-- COPY cannot be performed after a multi-row INSERT that uses one connection BEGIN; INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie'); \copy researchers from stdin delimiter ',' @@ -199,6 +199,15 @@ INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'); INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie'); ROLLBACK; +-- after a COPY you can perform a multi-row INSERT +BEGIN; +\copy researchers from stdin delimiter ',' +3,1,Duth Knonald +10,6,Lesport Lampie +\. +INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie'); +ROLLBACK; + -- COPY can happen before single row INSERT BEGIN; \copy labs from stdin delimiter ','