Sort output of RETURNING

The feature is only intended for getting consistent outputs for the regression tests.

RETURNING does not have any ordering guarantees, and with the unified executor, the ordering
of query executions on the shards is also becoming unpredictable. Thus, we're enforcing
ordering when a GUC is set.

We implicitly add an `ORDER BY`, i.e., something equivalent of:
	`
	  RETURNING expr1, expr2, .. ,exprN
	  ORDER BY expr1, expr2, .. ,exprN
	`

As described in the code comments as well, this is probably not the most
performant approach we could implement. However, since we're only
targeting regression tests, I don't see any issues with that. If we
decide to expand this to a feature to users, we should revisit the
implementation and improve the performance.
pull/2677/head
Onder Kalaci 2019-04-19 11:57:55 +03:00
parent 6362c40865
commit 004f28e18c
8 changed files with 153 additions and 20 deletions

View File

@ -58,9 +58,11 @@
#include "lib/stringinfo.h"
#include "nodes/execnodes.h"
#include "nodes/nodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/params.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "parser/parse_oper.h"
#include "nodes/plannodes.h"
#include "storage/ipc.h"
#include "storage/lock.h"
@ -84,6 +86,9 @@ bool EnableDeadlockPrevention = true;
/* number of nested stored procedure call levels we are currently in */
int StoredProcedureLevel = 0;
/* sort the returning to get consistent outputs */
bool SortReturning = false;
/* functions needed during run phase */
static void AcquireMetadataLocks(List *taskList);
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
@ -100,6 +105,7 @@ static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *taskList);
static bool RequiresConsistentSnapshot(Task *task);
static void SortTupleStore(CitusScanState *scanState);
static void RouterMultiModifyExecScan(CustomScanState *node);
static void RouterSequentialModifyExecScan(CustomScanState *node);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
@ -594,6 +600,11 @@ RouterModifyExecScan(CustomScanState *node)
RouterSequentialModifyExecScan(node);
}
if (SortReturning && distributedPlan->hasReturning)
{
SortTupleStore(scanState);
}
scanState->finishedRemoteScan = true;
}
@ -603,6 +614,112 @@ RouterModifyExecScan(CustomScanState *node)
}
/*
 * SortTupleStore gets a CitusScanState and sorts the tuplestore by all the
 * entries in the target entry list, starting from the first one and
 * ending with the last entry.
 *
 * The sorting is done in ASC order, with NULLs sorted last. The function
 * reads all tuples out of the scan state's tuplestore, feeds them into a
 * tuplesort, and then refills the (cleared) tuplestore with the sorted
 * tuples. This is not the most performant approach, but the feature only
 * targets consistent regression test output.
 */
static void
SortTupleStore(CitusScanState *scanState)
{
	TupleDesc tupleDescriptor =
		scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
	Tuplestorestate *tupleStore = scanState->tuplestorestate;

	List *targetList = scanState->customScanState.ss.ps.plan->targetlist;
	uint32 expectedColumnCount = list_length(targetList);

	/* convert list-ish representation to the arrays wanted by the executor */
	int numberOfSortKeys = expectedColumnCount;
	AttrNumber *sortColIdx = (AttrNumber *) palloc(numberOfSortKeys * sizeof(AttrNumber));
	Oid *sortOperators = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
	Oid *collations = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
	bool *nullsFirst = (bool *) palloc(numberOfSortKeys * sizeof(bool));

	ListCell *targetCell = NULL;
	int sortKeyIndex = 0;

	Tuplesortstate *tuplesortstate = NULL;
	TupleTableSlot *sortedSlot = NULL;

	/*
	 * Iterate on the returning target list and generate the necessary
	 * sorting information for each entry: the 1-based result column index,
	 * the ASC sort operator for the expression's type, and its collation.
	 */
	foreach(targetCell, targetList)
	{
		TargetEntry *returningEntry = (TargetEntry *) lfirst(targetCell);
		Oid sortop = InvalidOid;

		/* determine the sortop, we don't need anything else */
		get_sort_group_operators(exprType((Node *) returningEntry->expr),
								 true, false, false,
								 &sortop, NULL, NULL,
								 NULL);

		sortColIdx[sortKeyIndex] = sortKeyIndex + 1;
		sortOperators[sortKeyIndex] = sortop;
		collations[sortKeyIndex] = exprCollation((Node *) returningEntry->expr);
		nullsFirst[sortKeyIndex] = false;

		sortKeyIndex++;
	}

	/* the tuplesort API signature changed in PostgreSQL 11 */
#if (PG_VERSION_NUM >= 110000)
	tuplesortstate =
		tuplesort_begin_heap(tupleDescriptor, numberOfSortKeys, sortColIdx, sortOperators,
							 collations, nullsFirst, work_mem, NULL, false);
#else
	tuplesortstate =
		tuplesort_begin_heap(tupleDescriptor, numberOfSortKeys, sortColIdx, sortOperators,
							 collations, nullsFirst, work_mem, false);
#endif

	/* drain the tuplestore into the tuplesort */
	while (true)
	{
		TupleTableSlot *slot = ReturnTupleFromTuplestore(scanState);

		if (TupIsNull(slot))
		{
			break;
		}

		/* tuplesort_puttupleslot copies the slot into sort context */
		tuplesort_puttupleslot(tuplesortstate, slot);
	}

	/* perform the actual sort operation */
	tuplesort_performsort(tuplesortstate);

	/*
	 * Truncate the existing tupleStore, because we'll fill it back
	 * from the sorted tuplestore.
	 */
	tuplestore_clear(tupleStore);

	/*
	 * Iterate over all the sorted tuples and add them to the original
	 * tuplestore. A single slot is reused across iterations: since
	 * tuplestore_puttupleslot copies the tuple, allocating a fresh slot
	 * per tuple (as the previous implementation did) only wastes memory
	 * until the end of the query.
	 */
	sortedSlot = MakeSingleTupleTableSlot(tupleDescriptor);

	while (tuplesort_gettupleslot(tuplesortstate, true, false, sortedSlot, NULL))
	{
		/* tuplestore_puttupleslot copies the slot into the tupleStore context */
		tuplestore_puttupleslot(tupleStore, sortedSlot);
	}

	ExecDropSingleTupleTableSlot(sortedSlot);

	/* reposition the tuplestore so the scan re-reads it from the start */
	tuplestore_rescan(scanState->tuplestorestate);

	/* terminate the sort, clear unnecessary resources */
	tuplesort_end(tuplesortstate);
}
/*
* RouterSequentialModifyExecScan executes 0 or more modifications on a
* distributed table sequentially and stores them in custom scan's tuple

View File

@ -679,6 +679,20 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_KB,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.sort_returning",
gettext_noop("Sorts the RETURNING clause to get consistent test output"),
gettext_noop("This feature is not intended for users. It is developed "
"to get consistent regression test outputs. When enabled, "
"the RETURNING clause returns the tuples sorted. The sort "
"is done for all the entries, starting from the first one."
"Finally, the sorting is done in ASC order."),
&SortReturning,
false,
PGC_SUSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_intermediate_result_size",
gettext_noop("Sets the maximum size of the intermediate results in KB for "

View File

@ -35,6 +35,7 @@ typedef struct XactShardConnSet
/* Config variables managed via guc.c */
extern bool AllModificationsCommutative;
extern bool EnableDeadlockPrevention;
extern bool SortReturning;
extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);

View File

@ -91,16 +91,16 @@ DEBUG: generating subplan 6_1 for subquery SELECT DISTINCT ON (tenant_id) tenan
DEBUG: Plan 6 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = (foo.max_dept OPERATOR(pg_catalog.*) 2) FROM (SELECT intermediate_result.tenant_id, intermediate_result.max_dept FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, max_dept integer)) foo WHERE ((foo.tenant_id OPERATOR(pg_catalog.<>) second_distributed_table.tenant_id) AND (second_distributed_table.dept OPERATOR(pg_catalog.=) 2)) RETURNING second_distributed_table.tenant_id, second_distributed_table.dept
tenant_id | dept
-----------+------
52 | 18
72 | 18
82 | 18
2 | 18
12 | 18
2 | 18
22 | 18
62 | 18
92 | 18
32 | 18
42 | 18
52 | 18
62 | 18
72 | 18
82 | 18
92 | 18
(10 rows)
-- the subquery foo is recursively planned

View File

@ -200,9 +200,9 @@ INSERT INTO limit_orders VALUES (22037, 'GOOG', 5634, now(), 'buy', 0.50),
RETURNING id;
id
-------
22037
22038
22039
22037
(3 rows)
SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 22037 AND 22039;
@ -656,8 +656,8 @@ INSERT INTO app_analytics_events (app_id, name)
VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
id | app_id | name
----+--------+------
5 | 105 | Mynt
4 | 104 | Wayz
5 | 105 | Mynt
(2 rows)
INSERT INTO app_analytics_events (id, name)
@ -688,8 +688,8 @@ EXECUTE prep('version-2');
EXECUTE prep('version-3');
id | app_id | name
-----+--------+-------------
400 | | version-3.2
9 | | version-3.1
400 | | version-3.2
(2 rows)
EXECUTE prep('version-4');
@ -702,15 +702,15 @@ EXECUTE prep('version-4');
EXECUTE prep('version-5');
id | app_id | name
-----+--------+-------------
400 | | version-5.2
11 | | version-5.1
400 | | version-5.2
(2 rows)
EXECUTE prep('version-6');
id | app_id | name
-----+--------+-------------
400 | | version-6.2
12 | | version-6.1
400 | | version-6.2
(2 rows)
SELECT * FROM app_analytics_events ORDER BY id, name;
@ -744,8 +744,8 @@ INSERT INTO app_analytics_events (name)
VALUES ('Wayz'), ('Mynt') RETURNING *;
id | name
----+------
14 | Mynt
13 | Wayz
14 | Mynt
(2 rows)
SELECT * FROM app_analytics_events ORDER BY id;

View File

@ -876,11 +876,11 @@ SET
RETURNING *;
value_1 | value_2 | value_3 | value_4
---------+---------+---------+--------------------------
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 4 | Sun Dec 04 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
(5 rows)
DELETE FROM
@ -888,11 +888,11 @@ DELETE FROM
RETURNING *;
value_1 | value_2 | value_3 | value_4
---------+---------+---------+--------------------------
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 4 | Sun Dec 04 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
(5 rows)
-- some tests with function evaluation and sequences

View File

@ -306,10 +306,10 @@ UPDATE tt2 SET col_2 = 5 RETURNING id, col_2;
id | col_2
----+-------
1 | 5
2 | 5
3 | 5
7 | 5
9 | 5
2 | 5
(5 rows)
SET citus.multi_shard_modify_mode to sequential;
@ -317,10 +317,10 @@ UPDATE tt2 SET col_2 = 3 RETURNING id, col_2;
id | col_2
----+-------
1 | 3
2 | 3
3 | 3
7 | 3
9 | 3
2 | 3
(5 rows)
DROP TABLE tt2;

View File

@ -317,6 +317,7 @@ push(@pgOptions, '-c', "citus.shard_count=4");
push(@pgOptions, '-c', "citus.shard_max_size=1500kB");
push(@pgOptions, '-c', "citus.max_running_tasks_per_node=4");
push(@pgOptions, '-c', "citus.expire_cached_shards=on");
push(@pgOptions, '-c', "citus.sort_returning=on");
push(@pgOptions, '-c', "citus.task_tracker_delay=10ms");
push(@pgOptions, '-c', "citus.remote_task_check_interval=1ms");
push(@pgOptions, '-c', "citus.shard_replication_factor=2");