From 004f28e18cbe4c7353f6a31785bf60b15c0d8de7 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 19 Apr 2019 11:57:55 +0300
Subject: [PATCH] Sort output of RETURNING

The feature is only intended for getting consistent outputs for the
regression tests. RETURNING does not have any ordering guarantees, and
with the unified executor the ordering of query executions on the
shards is also becoming unpredictable. Thus, we're enforcing ordering
when a GUC is set.

We implicitly add an `ORDER BY`, something equivalent to

`
    RETURNING expr1, expr2, ..., exprN
    ORDER BY expr1, expr2, ..., exprN
`

As described in the code comments as well, this is probably not the
most performant approach we could implement. However, since we're only
targeting regression tests, I don't see any issues with that. If we
decide to expand this into a feature for users, we should revisit the
implementation and improve the performance.
---
 .../executor/multi_router_executor.c           | 117 ++++++++++++++++++
 src/backend/distributed/shared_library_init.c  |  14 +++
 .../distributed/multi_router_executor.h        |   1 +
 src/test/regress/expected/dml_recursive.out    |  12 +-
 .../regress/expected/multi_modifications.out   |  12 +-
 .../expected/multi_reference_table.out         |  12 +-
 .../expected/multi_shard_update_delete.out     |   4 +-
 src/test/regress/pg_regress_multi.pl           |   1 +
 8 files changed, 153 insertions(+), 20 deletions(-)

diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 53b437e19..25ea7c921 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -58,9 +58,11 @@
 #include "lib/stringinfo.h"
 #include "nodes/execnodes.h"
 #include "nodes/nodes.h"
+#include "nodes/nodeFuncs.h"
 #include "nodes/params.h"
 #include "nodes/parsenodes.h"
 #include "nodes/pg_list.h"
+#include "parser/parse_oper.h"
 #include "nodes/plannodes.h"
 #include "storage/ipc.h"
 #include "storage/lock.h"
@@ -84,6 +86,9 @@ bool EnableDeadlockPrevention = true;
 /* number of nested stored procedure call levels we are currently in */
 int StoredProcedureLevel = 0;
 
+/* sort the returning to get consistent outputs */
+bool SortReturning = false;
+
 /* functions needed during run phase */
 static void AcquireMetadataLocks(List *taskList);
 static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
@@ -100,6 +105,7 @@ static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
 static void AcquireExecutorShardLock(Task *task, CmdType commandType);
 static void AcquireExecutorMultiShardLocks(List *taskList);
 static bool RequiresConsistentSnapshot(Task *task);
+static void SortTupleStore(CitusScanState *scanState);
 static void RouterMultiModifyExecScan(CustomScanState *node);
 static void RouterSequentialModifyExecScan(CustomScanState *node);
 static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
@@ -594,6 +600,11 @@ RouterModifyExecScan(CustomScanState *node)
 			RouterSequentialModifyExecScan(node);
 		}
 
+		if (SortReturning && distributedPlan->hasReturning)
+		{
+			SortTupleStore(scanState);
+		}
+
 		scanState->finishedRemoteScan = true;
 	}
 
@@ -603,6 +614,112 @@ RouterModifyExecScan(CustomScanState *node)
 }
 
 
+/*
+ * SortTupleStore gets a CitusScanState and sorts the tuplestore by all the
+ * entries in the target entry list, starting from the first one and
+ * ending with the last entry.
+ *
+ * The sorting is done in ASC order.
+ */
+static void
+SortTupleStore(CitusScanState *scanState)
+{
+	TupleDesc tupleDescriptor =
+		scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
+	Tuplestorestate *tupleStore = scanState->tuplestorestate;
+
+	List *targetList = scanState->customScanState.ss.ps.plan->targetlist;
+	uint32 expectedColumnCount = list_length(targetList);
+
+	/* Convert list-ish representation to arrays wanted by executor */
+	int numberOfSortKeys = expectedColumnCount;
+	AttrNumber *sortColIdx = (AttrNumber *) palloc(numberOfSortKeys * sizeof(AttrNumber));
+	Oid *sortOperators = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
+	Oid *collations = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
+	bool *nullsFirst = (bool *) palloc(numberOfSortKeys * sizeof(bool));
+
+	ListCell *targetCell = NULL;
+	int sortKeyIndex = 0;
+
+	Tuplesortstate *tuplesortstate = NULL;
+
+	/*
+	 * Iterate on the returning target list and generate the necessary information
+	 * for sorting the tuples.
+	 */
+	foreach(targetCell, targetList)
+	{
+		TargetEntry *returningEntry = (TargetEntry *) lfirst(targetCell);
+		Oid sortop = InvalidOid;
+
+		/* determine the sortop, we don't need anything else */
+		get_sort_group_operators(exprType((Node *) returningEntry->expr),
+								 true, false, false,
+								 &sortop, NULL, NULL,
+								 NULL);
+
+		sortColIdx[sortKeyIndex] = sortKeyIndex + 1;
+		sortOperators[sortKeyIndex] = sortop;
+		collations[sortKeyIndex] = exprCollation((Node *) returningEntry->expr);
+		nullsFirst[sortKeyIndex] = false;
+
+		sortKeyIndex++;
+	}
+
+#if (PG_VERSION_NUM >= 110000)
+	tuplesortstate =
+		tuplesort_begin_heap(tupleDescriptor, numberOfSortKeys, sortColIdx, sortOperators,
+							 collations, nullsFirst, work_mem, NULL, false);
+#else
+	tuplesortstate =
+		tuplesort_begin_heap(tupleDescriptor, numberOfSortKeys, sortColIdx, sortOperators,
+							 collations, nullsFirst, work_mem, false);
+#endif
+
+	while (true)
+	{
+		TupleTableSlot *slot = ReturnTupleFromTuplestore(scanState);
+
+		if (TupIsNull(slot))
+		{
+			break;
+		}
+
+		/* tuplesort_puttupleslot copies the slot into sort context */
+		tuplesort_puttupleslot(tuplesortstate, slot);
+	}
+
+	/* perform the actual sort operation */
+	tuplesort_performsort(tuplesortstate);
+
+	/*
+	 * Truncate the existing tupleStore, because we'll fill it back
+	 * from the sorted tuplestore.
+	 */
+	tuplestore_clear(tupleStore);
+
+	/* iterate over all the sorted tuples, add them to original tuplestore */
+	while (true)
+	{
+		TupleTableSlot *newSlot = MakeSingleTupleTableSlot(tupleDescriptor);
+		bool found = tuplesort_gettupleslot(tuplesortstate, true, false, newSlot, NULL);
+
+		if (!found)
+		{
+			break;
+		}
+
+		/* tuplestore_puttupleslot copies the slot into the tupleStore context */
+		tuplestore_puttupleslot(tupleStore, newSlot);
+	}
+
+	tuplestore_rescan(scanState->tuplestorestate);
+
+	/* terminate the sort, clear unnecessary resources */
+	tuplesort_end(tuplesortstate);
+}
+
+
 /*
  * RouterSequentialModifyExecScan executes 0 or more modifications on a
  * distributed table sequentially and stores them in custom scan's tuple
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 778a32dcc..c453927a5 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -679,6 +679,20 @@ RegisterCitusConfigVariables(void)
 		GUC_UNIT_KB,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.sort_returning",
+		gettext_noop("Sorts the RETURNING clause to get consistent test output"),
+		gettext_noop("This feature is not intended for users. It is developed "
+					 "to get consistent regression test outputs. When enabled, "
+					 "the RETURNING clause returns the tuples sorted. The sort "
+					 "is done for all the entries, starting from the first one. "
+					 "Finally, the sorting is done in ASC order."),
+		&SortReturning,
+		false,
+		PGC_SUSET,
+		GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.max_intermediate_result_size",
 		gettext_noop("Sets the maximum size of the intermediate results in KB for "
diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h
index 3149f8fe2..10417c8ef 100644
--- a/src/include/distributed/multi_router_executor.h
+++ b/src/include/distributed/multi_router_executor.h
@@ -35,6 +35,7 @@ typedef struct XactShardConnSet
 /* Config variables managed via guc.c */
 extern bool AllModificationsCommutative;
 extern bool EnableDeadlockPrevention;
+extern bool SortReturning;
 
 
 extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
diff --git a/src/test/regress/expected/dml_recursive.out b/src/test/regress/expected/dml_recursive.out
index 2299da913..0d5c55681 100644
--- a/src/test/regress/expected/dml_recursive.out
+++ b/src/test/regress/expected/dml_recursive.out
@@ -91,16 +91,16 @@ DEBUG:  generating subplan 6_1 for subquery SELECT DISTINCT ON (tenant_id) tenan
 DEBUG:  Plan 6 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = (foo.max_dept OPERATOR(pg_catalog.*) 2) FROM (SELECT intermediate_result.tenant_id, intermediate_result.max_dept FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, max_dept integer)) foo WHERE ((foo.tenant_id OPERATOR(pg_catalog.<>) second_distributed_table.tenant_id) AND (second_distributed_table.dept OPERATOR(pg_catalog.=) 2)) RETURNING second_distributed_table.tenant_id, second_distributed_table.dept
  tenant_id | dept
 -----------+------
- 52        |   18
- 72        |   18
- 82        |   18
- 2         |   18
  12        |   18
+ 2         |   18
  22        |   18
- 62        |   18
- 92        |   18
  32        |   18
  42        |   18
+ 52        |   18
+ 62        |   18
+ 72        |   18
+ 82        |   18
+ 92        |   18
 (10 rows)
 
 -- the subquery foo is recursively planned
diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out
index 91cd7e23e..f3a51dc32 100644
--- a/src/test/regress/expected/multi_modifications.out
+++ b/src/test/regress/expected/multi_modifications.out
@@ -200,9 +200,9 @@ INSERT INTO limit_orders VALUES (22037, 'GOOG', 5634, now(), 'buy', 0.50),
 RETURNING id;
   id
 -------
+ 22037
  22038
  22039
- 22037
 (3 rows)
 
 SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 22037 AND 22039;
@@ -656,8 +656,8 @@ INSERT INTO app_analytics_events (app_id, name)
 VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
  id | app_id | name
 ----+--------+------
-  5 |    105 | Mynt
   4 |    104 | Wayz
+  5 |    105 | Mynt
 (2 rows)
 
 INSERT INTO app_analytics_events (id, name)
@@ -688,8 +688,8 @@ EXECUTE prep('version-2');
 EXECUTE prep('version-3');
  id  | app_id |    name
 -----+--------+-------------
- 400 |        | version-3.2
    9 |        | version-3.1
+ 400 |        | version-3.2
 (2 rows)
 
 EXECUTE prep('version-4');
@@ -702,15 +702,15 @@ EXECUTE prep('version-4');
 EXECUTE prep('version-5');
  id  | app_id |    name
 -----+--------+-------------
- 400 |        | version-5.2
   11 |        | version-5.1
+ 400 |        | version-5.2
 (2 rows)
 
 EXECUTE prep('version-6');
  id  | app_id |    name
 -----+--------+-------------
- 400 |        | version-6.2
   12 |        | version-6.1
+ 400 |        | version-6.2
 (2 rows)
 
 SELECT * FROM app_analytics_events ORDER BY id, name;
@@ -744,8 +744,8 @@ INSERT INTO app_analytics_events (name)
 VALUES ('Wayz'), ('Mynt') RETURNING *;
  id | name
 ----+------
- 14 | Mynt
  13 | Wayz
+ 14 | Mynt
 (2 rows)
 
 SELECT * FROM app_analytics_events ORDER BY id;
diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out
index 38f8e859a..4018b3a35 100644
--- a/src/test/regress/expected/multi_reference_table.out
+++ b/src/test/regress/expected/multi_reference_table.out
@@ -876,11 +876,11 @@ SET
 RETURNING *;
  value_1 | value_2 | value_3 |         value_4
 ---------+---------+---------+--------------------------
+      45 |      15 |       2 | Fri Dec 02 00:00:00 2016
+      45 |      15 |       2 | Fri Dec 02 00:00:00 2016
+      45 |      15 |       3 | Sat Dec 03 00:00:00 2016
       45 |      15 |       3 | Sat Dec 03 00:00:00 2016
       45 |      15 |       4 | Sun Dec 04 00:00:00 2016
-      45 |      15 |       3 | Sat Dec 03 00:00:00 2016
-      45 |      15 |       2 | Fri Dec 02 00:00:00 2016
-      45 |      15 |       2 | Fri Dec 02 00:00:00 2016
 (5 rows)
 
 DELETE FROM
@@ -888,11 +888,11 @@ DELETE FROM
 RETURNING *;
  value_1 | value_2 | value_3 |         value_4
 ---------+---------+---------+--------------------------
+      45 |      15 |       2 | Fri Dec 02 00:00:00 2016
+      45 |      15 |       2 | Fri Dec 02 00:00:00 2016
+      45 |      15 |       3 | Sat Dec 03 00:00:00 2016
       45 |      15 |       3 | Sat Dec 03 00:00:00 2016
       45 |      15 |       4 | Sun Dec 04 00:00:00 2016
-      45 |      15 |       3 | Sat Dec 03 00:00:00 2016
-      45 |      15 |       2 | Fri Dec 02 00:00:00 2016
-      45 |      15 |       2 | Fri Dec 02 00:00:00 2016
 (5 rows)
 
 -- some tests with function evaluation and sequences
diff --git a/src/test/regress/expected/multi_shard_update_delete.out b/src/test/regress/expected/multi_shard_update_delete.out
index a8cf8499f..0f2040ed1 100644
--- a/src/test/regress/expected/multi_shard_update_delete.out
+++ b/src/test/regress/expected/multi_shard_update_delete.out
@@ -306,10 +306,10 @@ UPDATE tt2 SET col_2 = 5 RETURNING id, col_2;
  id | col_2
 ----+-------
   1 |     5
+  2 |     5
   3 |     5
   7 |     5
   9 |     5
-  2 |     5
 (5 rows)
 
 SET citus.multi_shard_modify_mode to sequential;
@@ -317,10 +317,10 @@ UPDATE tt2 SET col_2 = 3 RETURNING id, col_2;
  id | col_2
 ----+-------
   1 |     3
+  2 |     3
   3 |     3
   7 |     3
   9 |     3
-  2 |     3
 (5 rows)
 
 DROP TABLE tt2;
diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl
index d3eb1b681..f7e5bac3b 100755
--- a/src/test/regress/pg_regress_multi.pl
+++ b/src/test/regress/pg_regress_multi.pl
@@ -317,6 +317,7 @@ push(@pgOptions, '-c', "citus.shard_count=4");
 push(@pgOptions, '-c', "citus.shard_max_size=1500kB");
 push(@pgOptions, '-c', "citus.max_running_tasks_per_node=4");
 push(@pgOptions, '-c', "citus.expire_cached_shards=on");
+push(@pgOptions, '-c', "citus.sort_returning=on");
 push(@pgOptions, '-c', "citus.task_tracker_delay=10ms");
 push(@pgOptions, '-c', "citus.remote_task_check_interval=1ms");
 push(@pgOptions, '-c', "citus.shard_replication_factor=2");
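
Illustrative example (not part of the patch): the sketch below assumes a
distributed table named "events" and a session that is allowed to set the
superuser-only GUC. It only shows the intended effect of
citus.sort_returning, namely that RETURNING rows come back as if an
ORDER BY over all RETURNING expressions had been appended.

    SET citus.sort_returning TO on;
    CREATE TABLE events (id int, name text);
    SELECT create_distributed_table('events', 'id');
    INSERT INTO events (id, name)
        VALUES (3, 'c'), (1, 'a'), (2, 'b')
    RETURNING id, name;
    -- with the GUC on, the rows should be returned as
    -- (1, 'a'), (2, 'b'), (3, 'c'), regardless of the order in which
    -- the shards were modified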