Merge pull request #2677 from citusdata/implicitly_order_returning

Sort output of RETURNING
pull/2684/head
Önder Kalacı 2019-04-24 11:24:42 +02:00 committed by GitHub
commit 796141334d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 153 additions and 20 deletions

View File

@ -58,9 +58,11 @@
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/execnodes.h" #include "nodes/execnodes.h"
#include "nodes/nodes.h" #include "nodes/nodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/params.h" #include "nodes/params.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "parser/parse_oper.h"
#include "nodes/plannodes.h" #include "nodes/plannodes.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/lock.h" #include "storage/lock.h"
@ -84,6 +86,9 @@ bool EnableDeadlockPrevention = true;
/* number of nested stored procedure call levels we are currently in */ /* number of nested stored procedure call levels we are currently in */
int StoredProcedureLevel = 0; int StoredProcedureLevel = 0;
/* sort the returning to get consistent outputs */
bool SortReturning = false;
/* functions needed during run phase */ /* functions needed during run phase */
static void AcquireMetadataLocks(List *taskList); static void AcquireMetadataLocks(List *taskList);
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
@ -100,6 +105,7 @@ static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
static void AcquireExecutorShardLock(Task *task, CmdType commandType); static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *taskList); static void AcquireExecutorMultiShardLocks(List *taskList);
static bool RequiresConsistentSnapshot(Task *task); static bool RequiresConsistentSnapshot(Task *task);
static void SortTupleStore(CitusScanState *scanState);
static void RouterMultiModifyExecScan(CustomScanState *node); static void RouterMultiModifyExecScan(CustomScanState *node);
static void RouterSequentialModifyExecScan(CustomScanState *node); static void RouterSequentialModifyExecScan(CustomScanState *node);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
@ -594,6 +600,11 @@ RouterModifyExecScan(CustomScanState *node)
RouterSequentialModifyExecScan(node); RouterSequentialModifyExecScan(node);
} }
if (SortReturning && distributedPlan->hasReturning)
{
SortTupleStore(scanState);
}
scanState->finishedRemoteScan = true; scanState->finishedRemoteScan = true;
} }
@ -603,6 +614,112 @@ RouterModifyExecScan(CustomScanState *node)
} }
/*
* SortTupleStore gets a CitusScanState and sorts the tuplestore by all the
* entries in the target entry list, starting from the first one and
* ending with the last entry.
*
* The sorting is done in ASC order.
*/
static void
SortTupleStore(CitusScanState *scanState)
{
TupleDesc tupleDescriptor =
scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
Tuplestorestate *tupleStore = scanState->tuplestorestate;
List *targetList = scanState->customScanState.ss.ps.plan->targetlist;
uint32 expectedColumnCount = list_length(targetList);
/* Convert list-ish representation to arrays wanted by executor */
int numberOfSortKeys = expectedColumnCount;
AttrNumber *sortColIdx = (AttrNumber *) palloc(numberOfSortKeys * sizeof(AttrNumber));
Oid *sortOperators = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
Oid *collations = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
bool *nullsFirst = (bool *) palloc(numberOfSortKeys * sizeof(bool));
ListCell *targetCell = NULL;
int sortKeyIndex = 0;
Tuplesortstate *tuplesortstate = NULL;
/*
* Iterate on the returning target list and generate the necessary information
* for sorting the tuples.
*/
foreach(targetCell, targetList)
{
TargetEntry *returningEntry = (TargetEntry *) lfirst(targetCell);
Oid sortop = InvalidOid;
/* determine the sortop, we don't need anything else */
get_sort_group_operators(exprType((Node *) returningEntry->expr),
true, false, false,
&sortop, NULL, NULL,
NULL);
sortColIdx[sortKeyIndex] = sortKeyIndex + 1;
sortOperators[sortKeyIndex] = sortop;
collations[sortKeyIndex] = exprCollation((Node *) returningEntry->expr);
nullsFirst[sortKeyIndex] = false;
sortKeyIndex++;
}
#if (PG_VERSION_NUM >= 110000)
tuplesortstate =
tuplesort_begin_heap(tupleDescriptor, numberOfSortKeys, sortColIdx, sortOperators,
collations, nullsFirst, work_mem, NULL, false);
#else
tuplesortstate =
tuplesort_begin_heap(tupleDescriptor, numberOfSortKeys, sortColIdx, sortOperators,
collations, nullsFirst, work_mem, false);
#endif
while (true)
{
TupleTableSlot *slot = ReturnTupleFromTuplestore(scanState);
if (TupIsNull(slot))
{
break;
}
/* tuplesort_puttupleslot copies the slot into sort context */
tuplesort_puttupleslot(tuplesortstate, slot);
}
/* perform the actual sort operation */
tuplesort_performsort(tuplesortstate);
/*
* Truncate the existing tupleStore, because we'll fill it back
* from the sorted tuplestore.
*/
tuplestore_clear(tupleStore);
/* iterate over all the sorted tuples, add them to original tuplestore */
while (true)
{
TupleTableSlot *newSlot = MakeSingleTupleTableSlot(tupleDescriptor);
bool found = tuplesort_gettupleslot(tuplesortstate, true, false, newSlot, NULL);
if (!found)
{
break;
}
/* tuplesort_puttupleslot copies the slot into the tupleStore context */
tuplestore_puttupleslot(tupleStore, newSlot);
}
tuplestore_rescan(scanState->tuplestorestate);
/* terminate the sort, clear unnecessary resources */
tuplesort_end(tuplesortstate);
}
/* /*
* RouterSequentialModifyExecScan executes 0 or more modifications on a * RouterSequentialModifyExecScan executes 0 or more modifications on a
* distributed table sequentially and stores them in custom scan's tuple * distributed table sequentially and stores them in custom scan's tuple

View File

@ -679,6 +679,20 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_KB, GUC_UNIT_KB,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.sort_returning",
gettext_noop("Sorts the RETURNING clause to get consistent test output"),
gettext_noop("This feature is not intended for users. It is developed "
"to get consistent regression test outputs. When enabled, "
"the RETURNING clause returns the tuples sorted. The sort "
"is done for all the entries, starting from the first one."
"Finally, the sorting is done in ASC order."),
&SortReturning,
false,
PGC_SUSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.max_intermediate_result_size", "citus.max_intermediate_result_size",
gettext_noop("Sets the maximum size of the intermediate results in KB for " gettext_noop("Sets the maximum size of the intermediate results in KB for "

View File

@ -35,6 +35,7 @@ typedef struct XactShardConnSet
/* Config variables managed via guc.c */ /* Config variables managed via guc.c */
extern bool AllModificationsCommutative; extern bool AllModificationsCommutative;
extern bool EnableDeadlockPrevention; extern bool EnableDeadlockPrevention;
extern bool SortReturning;
extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags); extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);

View File

@ -91,16 +91,16 @@ DEBUG: generating subplan 6_1 for subquery SELECT DISTINCT ON (tenant_id) tenan
DEBUG: Plan 6 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = (foo.max_dept OPERATOR(pg_catalog.*) 2) FROM (SELECT intermediate_result.tenant_id, intermediate_result.max_dept FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, max_dept integer)) foo WHERE ((foo.tenant_id OPERATOR(pg_catalog.<>) second_distributed_table.tenant_id) AND (second_distributed_table.dept OPERATOR(pg_catalog.=) 2)) RETURNING second_distributed_table.tenant_id, second_distributed_table.dept DEBUG: Plan 6 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = (foo.max_dept OPERATOR(pg_catalog.*) 2) FROM (SELECT intermediate_result.tenant_id, intermediate_result.max_dept FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, max_dept integer)) foo WHERE ((foo.tenant_id OPERATOR(pg_catalog.<>) second_distributed_table.tenant_id) AND (second_distributed_table.dept OPERATOR(pg_catalog.=) 2)) RETURNING second_distributed_table.tenant_id, second_distributed_table.dept
tenant_id | dept tenant_id | dept
-----------+------ -----------+------
52 | 18
72 | 18
82 | 18
2 | 18
12 | 18 12 | 18
2 | 18
22 | 18 22 | 18
62 | 18
92 | 18
32 | 18 32 | 18
42 | 18 42 | 18
52 | 18
62 | 18
72 | 18
82 | 18
92 | 18
(10 rows) (10 rows)
-- the subquery foo is recursively planned -- the subquery foo is recursively planned

View File

@ -200,9 +200,9 @@ INSERT INTO limit_orders VALUES (22037, 'GOOG', 5634, now(), 'buy', 0.50),
RETURNING id; RETURNING id;
id id
------- -------
22037
22038 22038
22039 22039
22037
(3 rows) (3 rows)
SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 22037 AND 22039; SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 22037 AND 22039;
@ -656,8 +656,8 @@ INSERT INTO app_analytics_events (app_id, name)
VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
id | app_id | name id | app_id | name
----+--------+------ ----+--------+------
5 | 105 | Mynt
4 | 104 | Wayz 4 | 104 | Wayz
5 | 105 | Mynt
(2 rows) (2 rows)
INSERT INTO app_analytics_events (id, name) INSERT INTO app_analytics_events (id, name)
@ -688,8 +688,8 @@ EXECUTE prep('version-2');
EXECUTE prep('version-3'); EXECUTE prep('version-3');
id | app_id | name id | app_id | name
-----+--------+------------- -----+--------+-------------
400 | | version-3.2
9 | | version-3.1 9 | | version-3.1
400 | | version-3.2
(2 rows) (2 rows)
EXECUTE prep('version-4'); EXECUTE prep('version-4');
@ -702,15 +702,15 @@ EXECUTE prep('version-4');
EXECUTE prep('version-5'); EXECUTE prep('version-5');
id | app_id | name id | app_id | name
-----+--------+------------- -----+--------+-------------
400 | | version-5.2
11 | | version-5.1 11 | | version-5.1
400 | | version-5.2
(2 rows) (2 rows)
EXECUTE prep('version-6'); EXECUTE prep('version-6');
id | app_id | name id | app_id | name
-----+--------+------------- -----+--------+-------------
400 | | version-6.2
12 | | version-6.1 12 | | version-6.1
400 | | version-6.2
(2 rows) (2 rows)
SELECT * FROM app_analytics_events ORDER BY id, name; SELECT * FROM app_analytics_events ORDER BY id, name;
@ -744,8 +744,8 @@ INSERT INTO app_analytics_events (name)
VALUES ('Wayz'), ('Mynt') RETURNING *; VALUES ('Wayz'), ('Mynt') RETURNING *;
id | name id | name
----+------ ----+------
14 | Mynt
13 | Wayz 13 | Wayz
14 | Mynt
(2 rows) (2 rows)
SELECT * FROM app_analytics_events ORDER BY id; SELECT * FROM app_analytics_events ORDER BY id;

View File

@ -876,11 +876,11 @@ SET
RETURNING *; RETURNING *;
value_1 | value_2 | value_3 | value_4 value_1 | value_2 | value_3 | value_4
---------+---------+---------+-------------------------- ---------+---------+---------+--------------------------
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016 45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 4 | Sun Dec 04 00:00:00 2016 45 | 15 | 4 | Sun Dec 04 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
(5 rows) (5 rows)
DELETE FROM DELETE FROM
@ -888,11 +888,11 @@ DELETE FROM
RETURNING *; RETURNING *;
value_1 | value_2 | value_3 | value_4 value_1 | value_2 | value_3 | value_4
---------+---------+---------+-------------------------- ---------+---------+---------+--------------------------
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016 45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 4 | Sun Dec 04 00:00:00 2016 45 | 15 | 4 | Sun Dec 04 00:00:00 2016
45 | 15 | 3 | Sat Dec 03 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
45 | 15 | 2 | Fri Dec 02 00:00:00 2016
(5 rows) (5 rows)
-- some tests with function evaluation and sequences -- some tests with function evaluation and sequences

View File

@ -306,10 +306,10 @@ UPDATE tt2 SET col_2 = 5 RETURNING id, col_2;
id | col_2 id | col_2
----+------- ----+-------
1 | 5 1 | 5
2 | 5
3 | 5 3 | 5
7 | 5 7 | 5
9 | 5 9 | 5
2 | 5
(5 rows) (5 rows)
SET citus.multi_shard_modify_mode to sequential; SET citus.multi_shard_modify_mode to sequential;
@ -317,10 +317,10 @@ UPDATE tt2 SET col_2 = 3 RETURNING id, col_2;
id | col_2 id | col_2
----+------- ----+-------
1 | 3 1 | 3
2 | 3
3 | 3 3 | 3
7 | 3 7 | 3
9 | 3 9 | 3
2 | 3
(5 rows) (5 rows)
DROP TABLE tt2; DROP TABLE tt2;

View File

@ -317,6 +317,7 @@ push(@pgOptions, '-c', "citus.shard_count=4");
push(@pgOptions, '-c', "citus.shard_max_size=1500kB"); push(@pgOptions, '-c', "citus.shard_max_size=1500kB");
push(@pgOptions, '-c', "citus.max_running_tasks_per_node=4"); push(@pgOptions, '-c', "citus.max_running_tasks_per_node=4");
push(@pgOptions, '-c', "citus.expire_cached_shards=on"); push(@pgOptions, '-c', "citus.expire_cached_shards=on");
push(@pgOptions, '-c', "citus.sort_returning=on");
push(@pgOptions, '-c', "citus.task_tracker_delay=10ms"); push(@pgOptions, '-c', "citus.task_tracker_delay=10ms");
push(@pgOptions, '-c', "citus.remote_task_check_interval=1ms"); push(@pgOptions, '-c', "citus.remote_task_check_interval=1ms");
push(@pgOptions, '-c', "citus.shard_replication_factor=2"); push(@pgOptions, '-c', "citus.shard_replication_factor=2");