mirror of https://github.com/citusdata/citus.git
Expand target entries with merged array/jsonb subscripting ops into multiple ones
When re-writing query tree, postgres combines multiple subscripting operators referencing to the same attribute into a single `TargetEntry` by linking `SubscriptingRef` objects to each other via their `refexpr` fields. (See `rewriteTargetListIU` & `process_matched_tle` functions.) However, ruleutils function `get_update_query_targetlist_def` doesn't know how to properly deparse such a `TargetEntry`. As a result, we were only taking the last set-by-subscript operation into account when generating the shard query for such an `UPDATE` command. In postgres, this doesn't cause any problems (e.g.: when generating definition of a rule based object) since the query-rewrite transformations aren't performed on the query tree that `get_update_query_targetlist_def` is expected to process. For this reason; with this commit, before processing the target entry list in our ruleutils based deparser, we first expand such target entries into multiple ones. To detect such `SubscriptingRef` objects, we also need to investigate `FieldStore` and `CoerceToDomain` objects as postgres functions processing `SubscriptingRef` objects do --although they do so for different purposes. However, given that Citus already doesn't allow `INSERT/UPDATE` via `FieldStore`, we only do that for `CoerceToDomain` objects.pull/5692/head
parent
1e3c8e34c0
commit
dc293964bf
|
@ -80,6 +80,7 @@ static void deparse_index_columns(StringInfo buffer, List *indexParameterList,
|
|||
static void AppendStorageParametersToString(StringInfo stringBuffer,
|
||||
List *optionList);
|
||||
static void simple_quote_literal(StringInfo buf, const char *val);
|
||||
static SubscriptingRef * TargetEntryExprFindSubsRef(Expr *expr);
|
||||
static char * flatten_reloptions(Oid relid);
|
||||
static void AddVacuumParams(ReindexStmt *reindexStmt, StringInfo buffer);
|
||||
|
||||
|
@ -1353,3 +1354,129 @@ RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExpandMergedSubscriptingRefEntries takes a list of target entries and expands
|
||||
* each one that references a SubscriptingRef node that indicates multiple (field)
|
||||
* updates on the same attribute, which is applicable for array/json types atm.
|
||||
*/
|
||||
List *
|
||||
ExpandMergedSubscriptingRefEntries(List *targetEntryList)
|
||||
{
|
||||
List *newTargetEntryList = NIL;
|
||||
|
||||
TargetEntry *targetEntry = NULL;
|
||||
foreach_ptr(targetEntry, targetEntryList)
|
||||
{
|
||||
List *expandedTargetEntries = NIL;
|
||||
|
||||
Expr *expr = targetEntry->expr;
|
||||
while (expr)
|
||||
{
|
||||
SubscriptingRef *subsRef = TargetEntryExprFindSubsRef(expr);
|
||||
if (!subsRef)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* Remove refexpr from the SubscriptingRef that we are about to
|
||||
* wrap in a new TargetEntry and save it for the next one.
|
||||
*/
|
||||
Expr *refexpr = subsRef->refexpr;
|
||||
subsRef->refexpr = NULL;
|
||||
|
||||
/*
|
||||
* Wrap the Expr that holds SubscriptingRef (directly or indirectly)
|
||||
* in a new TargetEntry; note that it doesn't have a refexpr anymore.
|
||||
*/
|
||||
TargetEntry *newTargetEntry = copyObject(targetEntry);
|
||||
newTargetEntry->expr = expr;
|
||||
expandedTargetEntries = lappend(expandedTargetEntries, newTargetEntry);
|
||||
|
||||
/* now inspect the refexpr that SubscriptingRef at hand were holding */
|
||||
expr = refexpr;
|
||||
}
|
||||
|
||||
if (expandedTargetEntries == NIL)
|
||||
{
|
||||
/* return original entry since it doesn't hold a SubscriptingRef node */
|
||||
newTargetEntryList = lappend(newTargetEntryList, targetEntry);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Need to concat expanded target list entries in reverse order
|
||||
* to preserve ordering of the original target entry list.
|
||||
*/
|
||||
newTargetEntryList = list_concat(newTargetEntryList,
|
||||
list_reverse(expandedTargetEntries));
|
||||
}
|
||||
}
|
||||
|
||||
return newTargetEntryList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TargetEntryExprFindSubsRef searches given Expr --assuming that it is part
|
||||
* of a target list entry-- to see if it directly (i.e.: itself) or indirectly
|
||||
* (e.g.: behind some level of coercions) holds a SubscriptingRef node.
|
||||
*
|
||||
* Returns the original SubscriptingRef node on success or NULL otherwise.
|
||||
*
|
||||
* Note that it wouldn't add much value to use expression_tree_walker here
|
||||
* since we are only interested in a subset of the fields of a few certain
|
||||
* node types.
|
||||
*/
|
||||
static SubscriptingRef *
|
||||
TargetEntryExprFindSubsRef(Expr *expr)
|
||||
{
|
||||
Node *node = (Node *) expr;
|
||||
while (node)
|
||||
{
|
||||
if (IsA(node, FieldStore))
|
||||
{
|
||||
/*
|
||||
* ModifyPartialQuerySupported doesn't allow INSERT/UPDATE via
|
||||
* FieldStore. If we decide supporting such commands, then we
|
||||
* should take the first element of "newvals" list into account
|
||||
* here. This is because, to support such commands, we will need
|
||||
* to expand merged FieldStore into separate target entries too.
|
||||
*
|
||||
* For this reason, this block is not reachable atm and need to
|
||||
* uncomment the following if we decide supporting such commands.
|
||||
*
|
||||
* """
|
||||
* FieldStore *fieldStore = (FieldStore *) node;
|
||||
* node = (Node *) linitial(fieldStore->newvals);
|
||||
* """
|
||||
*/
|
||||
ereport(ERROR, (errmsg("unexpectedly got FieldStore object when "
|
||||
"generating shard query")));
|
||||
}
|
||||
else if (IsA(node, CoerceToDomain))
|
||||
{
|
||||
CoerceToDomain *coerceToDomain = (CoerceToDomain *) node;
|
||||
if (coerceToDomain->coercionformat != COERCE_IMPLICIT_CAST)
|
||||
{
|
||||
/* not an implicit coercion, cannot reach to a SubscriptingRef */
|
||||
break;
|
||||
}
|
||||
|
||||
node = (Node *) coerceToDomain->arg;
|
||||
}
|
||||
else if (IsA(node, SubscriptingRef))
|
||||
{
|
||||
return (SubscriptingRef *) node;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* got a node that we are not interested in */
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -3324,6 +3324,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
|
|||
SubLink *cur_ma_sublink;
|
||||
List *ma_sublinks;
|
||||
|
||||
targetList = ExpandMergedSubscriptingRefEntries(targetList);
|
||||
|
||||
/*
|
||||
* Prepare to deal with MULTIEXPR assignments: collect the source SubLinks
|
||||
* into a list. We expect them to appear, in ID order, in resjunk tlist
|
||||
|
|
|
@ -3263,6 +3263,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
|
|||
SubLink *cur_ma_sublink;
|
||||
List *ma_sublinks;
|
||||
|
||||
targetList = ExpandMergedSubscriptingRefEntries(targetList);
|
||||
|
||||
/*
|
||||
* Prepare to deal with MULTIEXPR assignments: collect the source SubLinks
|
||||
* into a list. We expect them to appear, in ID order, in resjunk tlist
|
||||
|
|
|
@ -3439,6 +3439,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
|
|||
SubLink *cur_ma_sublink;
|
||||
List *ma_sublinks;
|
||||
|
||||
targetList = ExpandMergedSubscriptingRefEntries(targetList);
|
||||
|
||||
/*
|
||||
* Prepare to deal with MULTIEXPR assignments: collect the source SubLinks
|
||||
* into a list. We expect them to appear, in ID order, in resjunk tlist
|
||||
|
|
|
@ -704,7 +704,12 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
|
|||
if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr,
|
||||
NodeIsFieldStore))
|
||||
{
|
||||
/* DELETE cannot do field indirection already */
|
||||
/*
|
||||
* DELETE cannot do field indirection already, so assert here.
|
||||
*
|
||||
* NB: See TargetEntryExprFindSubsRef if you decide removing
|
||||
* this error check.
|
||||
*/
|
||||
Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT);
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"inserting or modifying composite type fields is not "
|
||||
|
|
|
@ -264,3 +264,20 @@ GenerateListFromElement(void *listElement, int listLength)
|
|||
|
||||
return list;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* list_reverse returns a new list by reverting order of the elements within
|
||||
* given list.
|
||||
*/
|
||||
List *
|
||||
list_reverse(const List *list)
|
||||
{
|
||||
List *newList = NIL;
|
||||
for (int i = list_length(list) - 1; i >= 0; i--)
|
||||
{
|
||||
newList = lappend(newList, list_nth(list, i));
|
||||
}
|
||||
|
||||
return newList;
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
|
|||
extern bool contain_nextval_expression_walker(Node *node, void *context);
|
||||
extern char * pg_get_replica_identity_command(Oid tableRelationId);
|
||||
extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier);
|
||||
extern List * ExpandMergedSubscriptingRefEntries(List *targetEntryList);
|
||||
|
||||
/* Function declarations for version dependent PostgreSQL ruleutils functions */
|
||||
extern void pg_get_query_def(Query *query, StringInfo buffer);
|
||||
|
|
|
@ -125,5 +125,6 @@ extern List * ListTake(List *pointerList, int size);
|
|||
extern void * safe_list_nth(const List *list, int index);
|
||||
extern List * GeneratePositiveIntSequenceList(int upTo);
|
||||
extern List * GenerateListFromElement(void *listElement, int listLength);
|
||||
extern List * list_reverse(const List *list);
|
||||
|
||||
#endif /* CITUS_LISTUTILS_H */
|
||||
|
|
|
@ -3,10 +3,6 @@
|
|||
--
|
||||
SET citus.next_shard_id TO 13100000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE FUNCTION deparse_shard_query_test(text)
|
||||
RETURNS VOID
|
||||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
-- create the first table
|
||||
CREATE TABLE raw_events_1
|
||||
(tenant_id bigint,
|
||||
|
|
|
@ -124,3 +124,7 @@ BEGIN
|
|||
END LOOP;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION deparse_shard_query_test(text)
|
||||
RETURNS VOID
|
||||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
|
|
|
@ -1270,5 +1270,82 @@ SELECT count(*) FROM
|
|||
1
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE jsonb_subscript_update (id INT, data JSONB);
|
||||
SELECT create_distributed_table('jsonb_subscript_update', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO jsonb_subscript_update VALUES (1, '{"a": 1}'), (2, '{"a": 2}');
|
||||
UPDATE jsonb_subscript_update
|
||||
SET data['b'] = updated_vals.b::TEXT::jsonb,
|
||||
data['c'] = updated_vals.c::TEXT::jsonb,
|
||||
data['d'] = updated_vals.d::TEXT::jsonb
|
||||
FROM (
|
||||
SELECT id,
|
||||
data['a'] AS a,
|
||||
data['a']::NUMERIC + 1 AS b,
|
||||
data['a']::NUMERIC + 2 AS c,
|
||||
data['a']::NUMERIC + 3 AS d
|
||||
FROM jsonb_subscript_update
|
||||
) updated_vals
|
||||
WHERE jsonb_subscript_update.id = updated_vals.id;
|
||||
SELECT * FROM jsonb_subscript_update ORDER BY 1,2;
|
||||
id | data
|
||||
---------------------------------------------------------------------
|
||||
1 | {"a": 1, "b": 2, "c": 3, "d": 4}
|
||||
2 | {"a": 2, "b": 3, "c": 4, "d": 5}
|
||||
(2 rows)
|
||||
|
||||
TRUNCATE jsonb_subscript_update;
|
||||
INSERT INTO jsonb_subscript_update VALUES (1, '{"a": 1}'), (2, '{"a": 2}'), (4, '{"a": 4, "b": 10}');
|
||||
ALTER TABLE jsonb_subscript_update ADD CONSTRAINT pkey PRIMARY KEY (id, data);
|
||||
INSERT INTO jsonb_subscript_update VALUES (1, '{"a": 1}'), (2, '{"a": 2}')
|
||||
ON CONFLICT (id, data)
|
||||
DO UPDATE SET data['d']=(jsonb_subscript_update.data['a']::INT*100)::TEXT::JSONB,
|
||||
data['b']=(jsonb_subscript_update.data['a']::INT*-100)::TEXT::JSONB;
|
||||
SELECT * FROM jsonb_subscript_update ORDER BY 1,2;
|
||||
id | data
|
||||
---------------------------------------------------------------------
|
||||
1 | {"a": 1, "b": -100, "d": 100}
|
||||
2 | {"a": 2, "b": -200, "d": 200}
|
||||
4 | {"a": 4, "b": 10}
|
||||
(3 rows)
|
||||
|
||||
CREATE TABLE nested_obj_update(id INT, data JSONB, text_col TEXT);
|
||||
SELECT create_distributed_table('nested_obj_update', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO nested_obj_update VALUES
|
||||
(1, '{"a": [1,2,3], "b": [4,5,6], "c": [7,8,9], "d": [1,2,1,2]}', '4'),
|
||||
(2, '{"a": [10,20,30], "b": [41,51,61], "c": [72,82,92], "d": [11,21,11,21]}', '6');
|
||||
UPDATE nested_obj_update
|
||||
SET data['a'][0] = (updated_vals.b * 1)::TEXT::JSONB,
|
||||
data['b'][2] = (updated_vals.c * 2)::TEXT::JSONB,
|
||||
data['c'][0] = (updated_vals.d * 3)::TEXT::JSONB,
|
||||
text_col = (nested_obj_update.id*1000)::TEXT,
|
||||
data['a'][0] = (text_col::INT * data['a'][0]::INT)::TEXT::JSONB,
|
||||
data['d'][6] = (nested_obj_update.id*1)::TEXT::JSONB,
|
||||
data['d'][4] = (nested_obj_update.id*2)::TEXT::JSONB
|
||||
FROM (
|
||||
SELECT id,
|
||||
data['a'][0] AS a,
|
||||
data['b'][0]::NUMERIC + 1 AS b,
|
||||
data['c'][0]::NUMERIC + 2 AS c,
|
||||
data['c'][1]::NUMERIC + 3 AS d
|
||||
FROM nested_obj_update
|
||||
) updated_vals
|
||||
WHERE nested_obj_update.id = updated_vals.id;
|
||||
SELECT * FROM nested_obj_update ORDER BY 1,2,3;
|
||||
id | data | text_col
|
||||
---------------------------------------------------------------------
|
||||
1 | {"a": [4, 2, 3], "b": [4, 5, 18], "c": [33, 8, 9], "d": [1, 2, 1, 2, 2, null, 1]} | 1000
|
||||
2 | {"a": [60, 20, 30], "b": [41, 51, 148], "c": [255, 82, 92], "d": [11, 21, 11, 21, 4, null, 2]} | 2000
|
||||
(2 rows)
|
||||
|
||||
set client_min_messages to error;
|
||||
drop schema pg14 cascade;
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
\set VERBOSITY terse
|
||||
SET citus.next_shard_id TO 1520000;
|
||||
CREATE SCHEMA subscripting_op;
|
||||
SET search_path TO subscripting_op;
|
||||
CREATE TABLE arr_subs_update(id INT, arr INT[], text_col TEXT, int_col_1 INT, int_col_2 INT);
|
||||
SELECT create_distributed_table('arr_subs_update', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO arr_subs_update VALUES (1, '{1,2,3}', 'foo', 50, 60), (2, '{4,5,6}', 'bar', 60, 70);
|
||||
-- test if we can properly expand target list entries when there are dropped columns
|
||||
ALTER TABLE arr_subs_update DROP COLUMN int_col_1;
|
||||
UPDATE arr_subs_update
|
||||
SET arr[1] = updated_vals.b,
|
||||
arr[3] = updated_vals.d,
|
||||
int_col_2 = 400,
|
||||
arr[2] = updated_vals.c
|
||||
FROM (
|
||||
SELECT id,
|
||||
arr[0] AS a,
|
||||
arr[1]::NUMERIC + 1 AS b,
|
||||
arr[2]::NUMERIC + 2 AS c,
|
||||
arr[3]::NUMERIC + 3 AS d
|
||||
FROM arr_subs_update
|
||||
) updated_vals
|
||||
WHERE arr_subs_update.id = updated_vals.id;
|
||||
SELECT * FROM arr_subs_update ORDER BY 1,2,3,4;
|
||||
id | arr | text_col | int_col_2
|
||||
---------------------------------------------------------------------
|
||||
1 | {2,4,6} | foo | 400
|
||||
2 | {5,7,9} | bar | 400
|
||||
(2 rows)
|
||||
|
||||
TRUNCATE arr_subs_update;
|
||||
INSERT INTO arr_subs_update VALUES (1, '{1,2,3}', 'foo', 60), (2, '{4,5,6}', 'bar', 70);
|
||||
ALTER TABLE arr_subs_update ADD CONSTRAINT pkey PRIMARY KEY (id, arr);
|
||||
INSERT INTO arr_subs_update VALUES (1, '{1,2,3}')
|
||||
ON CONFLICT (id, arr)
|
||||
DO UPDATE SET arr[0]=100, arr[1]=200, arr[5]=500;
|
||||
SELECT * FROM arr_subs_update ORDER BY 1,2,3,4;
|
||||
id | arr | text_col | int_col_2
|
||||
---------------------------------------------------------------------
|
||||
1 | [0:5]={100,200,2,3,NULL,500} | foo | 60
|
||||
2 | {4,5,6} | bar | 70
|
||||
(2 rows)
|
||||
|
||||
CREATE DOMAIN single_int_dom AS int[] CHECK (VALUE[1] != 0);
|
||||
CREATE DOMAIN dummy_dom AS single_int_dom CHECK (VALUE[2] != 5);
|
||||
-- Citus doesn't propagate DOMAIN objects
|
||||
SELECT run_command_on_workers(
|
||||
$$
|
||||
CREATE DOMAIN subscripting_op.single_int_dom AS INT[] CHECK (VALUE[1] != 0);
|
||||
CREATE DOMAIN subscripting_op.dummy_dom AS subscripting_op.single_int_dom CHECK (VALUE[2] != 5);
|
||||
$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,"CREATE DOMAIN")
|
||||
(localhost,57638,t,"CREATE DOMAIN")
|
||||
(2 rows)
|
||||
|
||||
CREATE TABLE dummy_dom_test (id int, dummy_dom_col dummy_dom);
|
||||
SELECT create_distributed_table('dummy_dom_test', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dummy_dom_test VALUES (1, '{1,2,3}'), (2, '{6,7,8}');
|
||||
UPDATE dummy_dom_test
|
||||
SET dummy_dom_col[2] = 50,
|
||||
dummy_dom_col[1] = 60;
|
||||
SELECT * FROM dummy_dom_test ORDER BY 1,2;
|
||||
id | dummy_dom_col
|
||||
---------------------------------------------------------------------
|
||||
1 | {60,50,3}
|
||||
2 | {60,50,8}
|
||||
(2 rows)
|
||||
|
||||
CREATE TYPE two_ints as (if1 int, if2 int[]);
|
||||
CREATE DOMAIN two_ints_dom AS two_ints CHECK ((VALUE).if1 > 0);
|
||||
-- Citus doesn't propagate DOMAIN objects
|
||||
SELECT run_command_on_workers(
|
||||
$$
|
||||
CREATE DOMAIN subscripting_op.two_ints_dom AS subscripting_op.two_ints CHECK ((VALUE).if1 > 0);
|
||||
$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,"CREATE DOMAIN")
|
||||
(localhost,57638,t,"CREATE DOMAIN")
|
||||
(2 rows)
|
||||
|
||||
CREATE TABLE two_ints_dom_indirection_test (id int, two_ints_dom_col two_ints_dom);
|
||||
SELECT create_distributed_table('two_ints_dom_indirection_test', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO two_ints_dom_indirection_test VALUES (1, '(5, "{1,2,3}")'), (2, '(50, "{10,20,30}")');
|
||||
-- Citus planner already doesn't allow doing field indirection (e.g.:
|
||||
-- insert/update <composite type>.<field>) and we have an extra guard against
|
||||
-- that in deparser for future implementations; so here we test that by using
|
||||
-- deparse_shard_query_test() as well.
|
||||
-- i) planner would throw an error
|
||||
UPDATE two_ints_dom_indirection_test
|
||||
SET two_ints_dom_col.if2[1] = 50,
|
||||
two_ints_dom_col.if2[3] = 60;
|
||||
ERROR: inserting or modifying composite type fields is not supported
|
||||
-- ii) deparser would throw an error
|
||||
SELECT public.deparse_shard_query_test(
|
||||
$$
|
||||
UPDATE two_ints_dom_indirection_test
|
||||
SET two_ints_dom_col.if2[1] = 50,
|
||||
two_ints_dom_col.if2[3] = 60;
|
||||
$$);
|
||||
ERROR: unexpectedly got FieldStore object when generating shard query
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA subscripting_op CASCADE;
|
|
@ -78,7 +78,7 @@ test: custom_aggregate_support aggregate_support tdigest_aggregate_support
|
|||
test: multi_average_expression multi_working_columns multi_having_pushdown having_subquery
|
||||
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
|
||||
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join anonymous_columns
|
||||
test: ch_bench_subquery_repartition
|
||||
test: ch_bench_subquery_repartition subscripting_op
|
||||
test: multi_agg_type_conversion multi_count_type_conversion recursive_relation_planning_restriction_pushdown
|
||||
test: multi_partition_pruning single_hash_repartition_join
|
||||
test: multi_join_pruning multi_hash_pruning intermediate_result_pruning
|
||||
|
|
|
@ -6,11 +6,6 @@
|
|||
SET citus.next_shard_id TO 13100000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE FUNCTION deparse_shard_query_test(text)
|
||||
RETURNS VOID
|
||||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
-- create the first table
|
||||
CREATE TABLE raw_events_1
|
||||
(tenant_id bigint,
|
||||
|
|
|
@ -135,3 +135,8 @@ BEGIN
|
|||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION deparse_shard_query_test(text)
|
||||
RETURNS VOID
|
||||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
|
|
|
@ -640,5 +640,64 @@ SELECT count(*) FROM
|
|||
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner)::text from pg_proc where proname = 'proc_with_out_param')
|
||||
as test;
|
||||
|
||||
CREATE TABLE jsonb_subscript_update (id INT, data JSONB);
|
||||
SELECT create_distributed_table('jsonb_subscript_update', 'id');
|
||||
|
||||
INSERT INTO jsonb_subscript_update VALUES (1, '{"a": 1}'), (2, '{"a": 2}');
|
||||
|
||||
UPDATE jsonb_subscript_update
|
||||
SET data['b'] = updated_vals.b::TEXT::jsonb,
|
||||
data['c'] = updated_vals.c::TEXT::jsonb,
|
||||
data['d'] = updated_vals.d::TEXT::jsonb
|
||||
FROM (
|
||||
SELECT id,
|
||||
data['a'] AS a,
|
||||
data['a']::NUMERIC + 1 AS b,
|
||||
data['a']::NUMERIC + 2 AS c,
|
||||
data['a']::NUMERIC + 3 AS d
|
||||
FROM jsonb_subscript_update
|
||||
) updated_vals
|
||||
WHERE jsonb_subscript_update.id = updated_vals.id;
|
||||
|
||||
SELECT * FROM jsonb_subscript_update ORDER BY 1,2;
|
||||
|
||||
TRUNCATE jsonb_subscript_update;
|
||||
INSERT INTO jsonb_subscript_update VALUES (1, '{"a": 1}'), (2, '{"a": 2}'), (4, '{"a": 4, "b": 10}');
|
||||
|
||||
ALTER TABLE jsonb_subscript_update ADD CONSTRAINT pkey PRIMARY KEY (id, data);
|
||||
|
||||
INSERT INTO jsonb_subscript_update VALUES (1, '{"a": 1}'), (2, '{"a": 2}')
|
||||
ON CONFLICT (id, data)
|
||||
DO UPDATE SET data['d']=(jsonb_subscript_update.data['a']::INT*100)::TEXT::JSONB,
|
||||
data['b']=(jsonb_subscript_update.data['a']::INT*-100)::TEXT::JSONB;
|
||||
|
||||
SELECT * FROM jsonb_subscript_update ORDER BY 1,2;
|
||||
|
||||
CREATE TABLE nested_obj_update(id INT, data JSONB, text_col TEXT);
|
||||
SELECT create_distributed_table('nested_obj_update', 'id');
|
||||
INSERT INTO nested_obj_update VALUES
|
||||
(1, '{"a": [1,2,3], "b": [4,5,6], "c": [7,8,9], "d": [1,2,1,2]}', '4'),
|
||||
(2, '{"a": [10,20,30], "b": [41,51,61], "c": [72,82,92], "d": [11,21,11,21]}', '6');
|
||||
|
||||
UPDATE nested_obj_update
|
||||
SET data['a'][0] = (updated_vals.b * 1)::TEXT::JSONB,
|
||||
data['b'][2] = (updated_vals.c * 2)::TEXT::JSONB,
|
||||
data['c'][0] = (updated_vals.d * 3)::TEXT::JSONB,
|
||||
text_col = (nested_obj_update.id*1000)::TEXT,
|
||||
data['a'][0] = (text_col::INT * data['a'][0]::INT)::TEXT::JSONB,
|
||||
data['d'][6] = (nested_obj_update.id*1)::TEXT::JSONB,
|
||||
data['d'][4] = (nested_obj_update.id*2)::TEXT::JSONB
|
||||
FROM (
|
||||
SELECT id,
|
||||
data['a'][0] AS a,
|
||||
data['b'][0]::NUMERIC + 1 AS b,
|
||||
data['c'][0]::NUMERIC + 2 AS c,
|
||||
data['c'][1]::NUMERIC + 3 AS d
|
||||
FROM nested_obj_update
|
||||
) updated_vals
|
||||
WHERE nested_obj_update.id = updated_vals.id;
|
||||
|
||||
SELECT * FROM nested_obj_update ORDER BY 1,2,3;
|
||||
|
||||
set client_min_messages to error;
|
||||
drop schema pg14 cascade;
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
\set VERBOSITY terse
|
||||
|
||||
SET citus.next_shard_id TO 1520000;
|
||||
|
||||
CREATE SCHEMA subscripting_op;
|
||||
SET search_path TO subscripting_op;
|
||||
|
||||
CREATE TABLE arr_subs_update(id INT, arr INT[], text_col TEXT, int_col_1 INT, int_col_2 INT);
|
||||
SELECT create_distributed_table('arr_subs_update', 'id');
|
||||
INSERT INTO arr_subs_update VALUES (1, '{1,2,3}', 'foo', 50, 60), (2, '{4,5,6}', 'bar', 60, 70);
|
||||
|
||||
-- test if we can properly expand target list entries when there are dropped columns
|
||||
ALTER TABLE arr_subs_update DROP COLUMN int_col_1;
|
||||
|
||||
UPDATE arr_subs_update
|
||||
SET arr[1] = updated_vals.b,
|
||||
arr[3] = updated_vals.d,
|
||||
int_col_2 = 400,
|
||||
arr[2] = updated_vals.c
|
||||
FROM (
|
||||
SELECT id,
|
||||
arr[0] AS a,
|
||||
arr[1]::NUMERIC + 1 AS b,
|
||||
arr[2]::NUMERIC + 2 AS c,
|
||||
arr[3]::NUMERIC + 3 AS d
|
||||
FROM arr_subs_update
|
||||
) updated_vals
|
||||
WHERE arr_subs_update.id = updated_vals.id;
|
||||
|
||||
SELECT * FROM arr_subs_update ORDER BY 1,2,3,4;
|
||||
|
||||
TRUNCATE arr_subs_update;
|
||||
INSERT INTO arr_subs_update VALUES (1, '{1,2,3}', 'foo', 60), (2, '{4,5,6}', 'bar', 70);
|
||||
|
||||
ALTER TABLE arr_subs_update ADD CONSTRAINT pkey PRIMARY KEY (id, arr);
|
||||
|
||||
INSERT INTO arr_subs_update VALUES (1, '{1,2,3}')
|
||||
ON CONFLICT (id, arr)
|
||||
DO UPDATE SET arr[0]=100, arr[1]=200, arr[5]=500;
|
||||
|
||||
SELECT * FROM arr_subs_update ORDER BY 1,2,3,4;
|
||||
|
||||
CREATE DOMAIN single_int_dom AS int[] CHECK (VALUE[1] != 0);
|
||||
CREATE DOMAIN dummy_dom AS single_int_dom CHECK (VALUE[2] != 5);
|
||||
|
||||
-- Citus doesn't propagate DOMAIN objects
|
||||
SELECT run_command_on_workers(
|
||||
$$
|
||||
CREATE DOMAIN subscripting_op.single_int_dom AS INT[] CHECK (VALUE[1] != 0);
|
||||
CREATE DOMAIN subscripting_op.dummy_dom AS subscripting_op.single_int_dom CHECK (VALUE[2] != 5);
|
||||
$$);
|
||||
|
||||
CREATE TABLE dummy_dom_test (id int, dummy_dom_col dummy_dom);
|
||||
SELECT create_distributed_table('dummy_dom_test', 'id');
|
||||
|
||||
INSERT INTO dummy_dom_test VALUES (1, '{1,2,3}'), (2, '{6,7,8}');
|
||||
|
||||
UPDATE dummy_dom_test
|
||||
SET dummy_dom_col[2] = 50,
|
||||
dummy_dom_col[1] = 60;
|
||||
|
||||
SELECT * FROM dummy_dom_test ORDER BY 1,2;
|
||||
|
||||
CREATE TYPE two_ints as (if1 int, if2 int[]);
|
||||
CREATE DOMAIN two_ints_dom AS two_ints CHECK ((VALUE).if1 > 0);
|
||||
|
||||
-- Citus doesn't propagate DOMAIN objects
|
||||
SELECT run_command_on_workers(
|
||||
$$
|
||||
CREATE DOMAIN subscripting_op.two_ints_dom AS subscripting_op.two_ints CHECK ((VALUE).if1 > 0);
|
||||
$$);
|
||||
|
||||
CREATE TABLE two_ints_dom_indirection_test (id int, two_ints_dom_col two_ints_dom);
|
||||
SELECT create_distributed_table('two_ints_dom_indirection_test', 'id');
|
||||
|
||||
INSERT INTO two_ints_dom_indirection_test VALUES (1, '(5, "{1,2,3}")'), (2, '(50, "{10,20,30}")');
|
||||
|
||||
-- Citus planner already doesn't allow doing field indirection (e.g.:
|
||||
-- insert/update <composite type>.<field>) and we have an extra guard against
|
||||
-- that in deparser for future implementations; so here we test that by using
|
||||
-- deparse_shard_query_test() as well.
|
||||
|
||||
-- i) planner would throw an error
|
||||
UPDATE two_ints_dom_indirection_test
|
||||
SET two_ints_dom_col.if2[1] = 50,
|
||||
two_ints_dom_col.if2[3] = 60;
|
||||
|
||||
-- ii) deparser would throw an error
|
||||
SELECT public.deparse_shard_query_test(
|
||||
$$
|
||||
UPDATE two_ints_dom_indirection_test
|
||||
SET two_ints_dom_col.if2[1] = 50,
|
||||
two_ints_dom_col.if2[3] = 60;
|
||||
$$);
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA subscripting_op CASCADE;
|
Loading…
Reference in New Issue