diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 53b437e19..25ea7c921 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -58,9 +58,11 @@
 #include "lib/stringinfo.h"
 #include "nodes/execnodes.h"
 #include "nodes/nodes.h"
+#include "nodes/nodeFuncs.h"
 #include "nodes/params.h"
 #include "nodes/parsenodes.h"
 #include "nodes/pg_list.h"
 #include "nodes/plannodes.h"
+#include "parser/parse_oper.h"
 #include "storage/ipc.h"
 #include "storage/lock.h"
@@ -84,6 +86,9 @@ bool EnableDeadlockPrevention = true;
 /* number of nested stored procedure call levels we are currently in */
 int StoredProcedureLevel = 0;
 
+/* sort the RETURNING tuples to get consistent outputs */
+bool SortReturning = false;
+
 /* functions needed during run phase */
 static void AcquireMetadataLocks(List *taskList);
 static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
@@ -100,6 +105,7 @@ static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
 static void AcquireExecutorShardLock(Task *task, CmdType commandType);
 static void AcquireExecutorMultiShardLocks(List *taskList);
 static bool RequiresConsistentSnapshot(Task *task);
+static void SortTupleStore(CitusScanState *scanState);
 static void RouterMultiModifyExecScan(CustomScanState *node);
 static void RouterSequentialModifyExecScan(CustomScanState *node);
 static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
@@ -594,6 +600,11 @@ RouterModifyExecScan(CustomScanState *node)
 			RouterSequentialModifyExecScan(node);
 		}
 
+		if (SortReturning && distributedPlan->hasReturning)
+		{
+			SortTupleStore(scanState);
+		}
+
 		scanState->finishedRemoteScan = true;
 	}
 
@@ -603,6 +614,118 @@ RouterModifyExecScan(CustomScanState *node)
 }
 
 
+/*
+ * SortTupleStore gets a CitusScanState and sorts the tuplestore by all the
+ * entries in the target entry list, starting from the first one and
+ * ending with the last entry.
+ *
+ * The sorting is done in ASC order, and NULLs are sorted last. The tuples
+ * are moved into a tuplesort, sorted, and written back into the same
+ * tuplestore, which is then rescanned.
+ */
+static void
+SortTupleStore(CitusScanState *scanState)
+{
+	TupleDesc tupleDescriptor =
+		scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
+	Tuplestorestate *tupleStore = scanState->tuplestorestate;
+
+	List *targetList = scanState->customScanState.ss.ps.plan->targetlist;
+	uint32 expectedColumnCount = list_length(targetList);
+
+	/* Convert list-ish representation to arrays wanted by executor */
+	int numberOfSortKeys = expectedColumnCount;
+	AttrNumber *sortColIdx = (AttrNumber *) palloc(numberOfSortKeys * sizeof(AttrNumber));
+	Oid *sortOperators = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
+	Oid *collations = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
+	bool *nullsFirst = (bool *) palloc(numberOfSortKeys * sizeof(bool));
+
+	ListCell *targetCell = NULL;
+	int sortKeyIndex = 0;
+
+	Tuplesortstate *tuplesortstate = NULL;
+	TupleTableSlot *sortedSlot = NULL;
+
+	/*
+	 * Iterate on the returning target list and generate the necessary information
+	 * for sorting the tuples.
+	 */
+	foreach(targetCell, targetList)
+	{
+		TargetEntry *returningEntry = (TargetEntry *) lfirst(targetCell);
+		Oid sortop = InvalidOid;
+
+		/* determine the sortop, we don't need anything else */
+		get_sort_group_operators(exprType((Node *) returningEntry->expr),
+								 true, false, false,
+								 &sortop, NULL, NULL,
+								 NULL);
+
+		/* sort key N is simply the Nth column of the tuple (1-based) */
+		sortColIdx[sortKeyIndex] = sortKeyIndex + 1;
+		sortOperators[sortKeyIndex] = sortop;
+		collations[sortKeyIndex] = exprCollation((Node *) returningEntry->expr);
+		nullsFirst[sortKeyIndex] = false;
+
+		sortKeyIndex++;
+	}
+
+	/* tuplesort_begin_heap takes an additional argument as of PostgreSQL 11 */
+#if (PG_VERSION_NUM >= 110000)
+	tuplesortstate =
+		tuplesort_begin_heap(tupleDescriptor, numberOfSortKeys, sortColIdx, sortOperators,
+							 collations, nullsFirst, work_mem, NULL, false);
+#else
+	tuplesortstate =
+		tuplesort_begin_heap(tupleDescriptor, numberOfSortKeys, sortColIdx, sortOperators,
+							 collations, nullsFirst, work_mem, false);
+#endif
+
+	/* read tuples via ReturnTupleFromTuplestore until it returns an empty slot */
+	while (true)
+	{
+		TupleTableSlot *slot = ReturnTupleFromTuplestore(scanState);
+
+		if (TupIsNull(slot))
+		{
+			break;
+		}
+
+		/* tuplesort_puttupleslot copies the slot into sort context */
+		tuplesort_puttupleslot(tuplesortstate, slot);
+	}
+
+	/* perform the actual sort operation */
+	tuplesort_performsort(tuplesortstate);
+
+	/*
+	 * Truncate the existing tupleStore, because we'll fill it back
+	 * from the tuplesort.
+	 */
+	tuplestore_clear(tupleStore);
+
+	/*
+	 * Iterate over all the sorted tuples and add them to the original
+	 * tuplestore. A single slot is reused for every tuple instead of
+	 * allocating a new one per iteration, and it is dropped afterwards.
+	 */
+	sortedSlot = MakeSingleTupleTableSlot(tupleDescriptor);
+
+	while (tuplesort_gettupleslot(tuplesortstate, true, false, sortedSlot, NULL))
+	{
+		/* tuplestore_puttupleslot copies the slot into the tupleStore context */
+		tuplestore_puttupleslot(tupleStore, sortedSlot);
+	}
+
+	ExecDropSingleTupleTableSlot(sortedSlot);
+
+	tuplestore_rescan(tupleStore);
+
+	/* terminate the sort, clear unnecessary resources */
+	tuplesort_end(tuplesortstate);
+}
+
+
 /*
  * RouterSequentialModifyExecScan executes 0 or more modifications on a
  * distributed table sequentially and stores them in custom scan's tuple
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 778a32dcc..c453927a5 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -679,6 +679,20 @@ RegisterCitusConfigVariables(void)
 		GUC_UNIT_KB,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.sort_returning",
+		gettext_noop("Sorts the RETURNING clause to get consistent test output"),
+		gettext_noop("This feature is not intended for users. It is developed "
+					 "to get consistent regression test outputs. When enabled, "
+					 "the RETURNING clause returns the tuples sorted. The sort "
+					 "is done for all the entries, starting from the first one. "
+ "Finally, the sorting is done in ASC order."), + &SortReturning, + false, + PGC_SUSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_intermediate_result_size", gettext_noop("Sets the maximum size of the intermediate results in KB for " diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 3149f8fe2..10417c8ef 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -35,6 +35,7 @@ typedef struct XactShardConnSet /* Config variables managed via guc.c */ extern bool AllModificationsCommutative; extern bool EnableDeadlockPrevention; +extern bool SortReturning; extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags); diff --git a/src/test/regress/expected/dml_recursive.out b/src/test/regress/expected/dml_recursive.out index 2299da913..0d5c55681 100644 --- a/src/test/regress/expected/dml_recursive.out +++ b/src/test/regress/expected/dml_recursive.out @@ -91,16 +91,16 @@ DEBUG: generating subplan 6_1 for subquery SELECT DISTINCT ON (tenant_id) tenan DEBUG: Plan 6 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = (foo.max_dept OPERATOR(pg_catalog.*) 2) FROM (SELECT intermediate_result.tenant_id, intermediate_result.max_dept FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, max_dept integer)) foo WHERE ((foo.tenant_id OPERATOR(pg_catalog.<>) second_distributed_table.tenant_id) AND (second_distributed_table.dept OPERATOR(pg_catalog.=) 2)) RETURNING second_distributed_table.tenant_id, second_distributed_table.dept tenant_id | dept -----------+------ - 52 | 18 - 72 | 18 - 82 | 18 - 2 | 18 12 | 18 + 2 | 18 22 | 18 - 62 | 18 - 92 | 18 32 | 18 42 | 18 + 52 | 18 + 62 | 18 + 72 | 18 + 82 | 18 + 92 | 18 (10 rows) -- the subquery foo is recursively planned diff --git 
a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 91cd7e23e..f3a51dc32 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -200,9 +200,9 @@ INSERT INTO limit_orders VALUES (22037, 'GOOG', 5634, now(), 'buy', 0.50), RETURNING id; id ------- + 22037 22038 22039 - 22037 (3 rows) SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 22037 AND 22039; @@ -656,8 +656,8 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; id | app_id | name ----+--------+------ - 5 | 105 | Mynt 4 | 104 | Wayz + 5 | 105 | Mynt (2 rows) INSERT INTO app_analytics_events (id, name) @@ -688,8 +688,8 @@ EXECUTE prep('version-2'); EXECUTE prep('version-3'); id | app_id | name -----+--------+------------- - 400 | | version-3.2 9 | | version-3.1 + 400 | | version-3.2 (2 rows) EXECUTE prep('version-4'); @@ -702,15 +702,15 @@ EXECUTE prep('version-4'); EXECUTE prep('version-5'); id | app_id | name -----+--------+------------- - 400 | | version-5.2 11 | | version-5.1 + 400 | | version-5.2 (2 rows) EXECUTE prep('version-6'); id | app_id | name -----+--------+------------- - 400 | | version-6.2 12 | | version-6.1 + 400 | | version-6.2 (2 rows) SELECT * FROM app_analytics_events ORDER BY id, name; @@ -744,8 +744,8 @@ INSERT INTO app_analytics_events (name) VALUES ('Wayz'), ('Mynt') RETURNING *; id | name ----+------ - 14 | Mynt 13 | Wayz + 14 | Mynt (2 rows) SELECT * FROM app_analytics_events ORDER BY id; diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 38f8e859a..4018b3a35 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -876,11 +876,11 @@ SET RETURNING *; value_1 | value_2 | value_3 | value_4 ---------+---------+---------+-------------------------- + 45 | 15 | 2 | Fri Dec 02 
00:00:00 2016 + 45 | 15 | 2 | Fri Dec 02 00:00:00 2016 + 45 | 15 | 3 | Sat Dec 03 00:00:00 2016 45 | 15 | 3 | Sat Dec 03 00:00:00 2016 45 | 15 | 4 | Sun Dec 04 00:00:00 2016 - 45 | 15 | 3 | Sat Dec 03 00:00:00 2016 - 45 | 15 | 2 | Fri Dec 02 00:00:00 2016 - 45 | 15 | 2 | Fri Dec 02 00:00:00 2016 (5 rows) DELETE FROM @@ -888,11 +888,11 @@ DELETE FROM RETURNING *; value_1 | value_2 | value_3 | value_4 ---------+---------+---------+-------------------------- + 45 | 15 | 2 | Fri Dec 02 00:00:00 2016 + 45 | 15 | 2 | Fri Dec 02 00:00:00 2016 + 45 | 15 | 3 | Sat Dec 03 00:00:00 2016 45 | 15 | 3 | Sat Dec 03 00:00:00 2016 45 | 15 | 4 | Sun Dec 04 00:00:00 2016 - 45 | 15 | 3 | Sat Dec 03 00:00:00 2016 - 45 | 15 | 2 | Fri Dec 02 00:00:00 2016 - 45 | 15 | 2 | Fri Dec 02 00:00:00 2016 (5 rows) -- some tests with function evaluation and sequences diff --git a/src/test/regress/expected/multi_shard_update_delete.out b/src/test/regress/expected/multi_shard_update_delete.out index a8cf8499f..0f2040ed1 100644 --- a/src/test/regress/expected/multi_shard_update_delete.out +++ b/src/test/regress/expected/multi_shard_update_delete.out @@ -306,10 +306,10 @@ UPDATE tt2 SET col_2 = 5 RETURNING id, col_2; id | col_2 ----+------- 1 | 5 + 2 | 5 3 | 5 7 | 5 9 | 5 - 2 | 5 (5 rows) SET citus.multi_shard_modify_mode to sequential; @@ -317,10 +317,10 @@ UPDATE tt2 SET col_2 = 3 RETURNING id, col_2; id | col_2 ----+------- 1 | 3 + 2 | 3 3 | 3 7 | 3 9 | 3 - 2 | 3 (5 rows) DROP TABLE tt2; diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index d3eb1b681..f7e5bac3b 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -317,6 +317,7 @@ push(@pgOptions, '-c', "citus.shard_count=4"); push(@pgOptions, '-c', "citus.shard_max_size=1500kB"); push(@pgOptions, '-c', "citus.max_running_tasks_per_node=4"); push(@pgOptions, '-c', "citus.expire_cached_shards=on"); +push(@pgOptions, '-c', "citus.sort_returning=on"); push(@pgOptions, 
'-c', "citus.task_tracker_delay=10ms"); push(@pgOptions, '-c', "citus.remote_task_check_interval=1ms"); push(@pgOptions, '-c', "citus.shard_replication_factor=2");