From 1662694471e8b04906d959b83bb4a19311da7d63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?= Date: Thu, 4 May 2023 11:45:02 +0300 Subject: [PATCH 1/6] Update CHANGELOG.md (#6907) Change `citus_stats_tenants` to `citus_stat_tenants` Thanks @clairegiordano for noticing --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c72e64cbf..de9bfeeb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ * Adds support for `MERGE` command on co-located distributed tables joined on distribution column (#6696, #6733) -* Adds the view `citus_stats_tenants` that monitor statistics on tenant usages +* Adds the view `citus_stat_tenants` that monitor statistics on tenant usages (#6725) * Adds the GUC `citus.max_background_task_executors_per_node` to control number From 072ae447423a8314a199ad4b36097ba82bc3a07d Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Thu, 4 May 2023 16:46:02 +0300 Subject: [PATCH 2/6] Adjusts query's CoerceViaIO & RelabelType nodes that are improper for deparsing (#6391) Adjusts query's CoerceViaIO & RelabelType nodes that are improper for deparsing The standard planner converts some `::text` casts to `::cstring` and here we convert back because `cstring` is a pseudotype and it cannot be casted to most types. This problem occurs in CoerceViaIO nodes. There was another problem with RelabelType nodes fixed in the following PR: https://github.com/citusdata/citus/pull/4580 We undo the changes in that PR, and fix both CoerceViaIO and RelabelType nodes in the planning phase (not in the deparsing phase in ruleutils) Fixes https://github.com/citusdata/citus/issues/5646 Fixes https://github.com/citusdata/citus/issues/5033 Fixes https://github.com/citusdata/citus/issues/6061 --- .../distributed/deparser/ruleutils_13.c | 40 +-- .../distributed/deparser/ruleutils_14.c | 40 +-- .../distributed/deparser/ruleutils_15.c | 40 +-- .../planner/multi_physical_planner.c | 145 ++++++++- .../distributed/multi_physical_planner.h | 1 - .../expected/distributed_collations.out | 4 +- src/test/regress/expected/merge.out | 104 +++--- .../expected/prepared_statements_4.out | 304 ++++++++++++++++++ .../prepared_statements_create_load.out | 12 + .../regress/sql/prepared_statements_4.sql | 82 +++++ .../sql/prepared_statements_create_load.sql | 9 + 11 files changed, 632 insertions(+), 149 deletions(-) diff --git a/src/backend/distributed/deparser/ruleutils_13.c b/src/backend/distributed/deparser/ruleutils_13.c index 86eeb19a1..31ef67f97 100644 --- a/src/backend/distributed/deparser/ruleutils_13.c +++ b/src/backend/distributed/deparser/ruleutils_13.c @@ -5246,42 +5246,20 @@ get_rule_expr(Node *node, deparse_context *context, case T_RelabelType: { RelabelType *relabel = (RelabelType *) node; + Node *arg = (Node *) relabel->arg; - /* - * This is a Citus specific modification - * The planner converts CollateExpr to RelabelType - * and here we convert back. - */ - if (relabel->resultcollid != InvalidOid) + if (relabel->relabelformat == COERCE_IMPLICIT_CAST && + !showimplicit) { - CollateExpr *collate = RelabelTypeToCollateExpr(relabel); - Node *arg = (Node *) collate->arg; - - if (!PRETTY_PAREN(context)) - appendStringInfoChar(buf, '('); - get_rule_expr_paren(arg, context, showimplicit, node); - appendStringInfo(buf, " COLLATE %s", - generate_collation_name(collate->collOid)); - if (!PRETTY_PAREN(context)) - appendStringInfoChar(buf, ')'); + /* don't show the implicit cast */ + get_rule_expr_paren(arg, context, false, node); } else { - Node *arg = (Node *) relabel->arg; - - if (relabel->relabelformat == COERCE_IMPLICIT_CAST && - !showimplicit) - { - /* don't show the implicit cast */ - get_rule_expr_paren(arg, context, false, node); - } - else - { - get_coercion_expr(arg, context, - relabel->resulttype, - relabel->resulttypmod, - node); - } + get_coercion_expr(arg, context, + relabel->resulttype, + relabel->resulttypmod, + node); } } break; diff --git a/src/backend/distributed/deparser/ruleutils_14.c b/src/backend/distributed/deparser/ruleutils_14.c index 79fae9ac0..b364221d8 100644 --- a/src/backend/distributed/deparser/ruleutils_14.c +++ b/src/backend/distributed/deparser/ruleutils_14.c @@ -5470,42 +5470,20 @@ get_rule_expr(Node *node, deparse_context *context, case T_RelabelType: { RelabelType *relabel = (RelabelType *) node; + Node *arg = (Node *) relabel->arg; - /* - * This is a Citus specific modification - * The planner converts CollateExpr to RelabelType - * and here we convert back. - */ - if (relabel->resultcollid != InvalidOid) + if (relabel->relabelformat == COERCE_IMPLICIT_CAST && + !showimplicit) { - CollateExpr *collate = RelabelTypeToCollateExpr(relabel); - Node *arg = (Node *) collate->arg; - - if (!PRETTY_PAREN(context)) - appendStringInfoChar(buf, '('); - get_rule_expr_paren(arg, context, showimplicit, node); - appendStringInfo(buf, " COLLATE %s", - generate_collation_name(collate->collOid)); - if (!PRETTY_PAREN(context)) - appendStringInfoChar(buf, ')'); + /* don't show the implicit cast */ + get_rule_expr_paren(arg, context, false, node); } else { - Node *arg = (Node *) relabel->arg; - - if (relabel->relabelformat == COERCE_IMPLICIT_CAST && - !showimplicit) - { - /* don't show the implicit cast */ - get_rule_expr_paren(arg, context, false, node); - } - else - { - get_coercion_expr(arg, context, - relabel->resulttype, - relabel->resulttypmod, - node); - } + get_coercion_expr(arg, context, + relabel->resulttype, + relabel->resulttypmod, + node); } } break; diff --git a/src/backend/distributed/deparser/ruleutils_15.c b/src/backend/distributed/deparser/ruleutils_15.c index 827492d87..2dded9b01 100644 --- a/src/backend/distributed/deparser/ruleutils_15.c +++ b/src/backend/distributed/deparser/ruleutils_15.c @@ -5696,42 +5696,20 @@ get_rule_expr(Node *node, deparse_context *context, case T_RelabelType: { RelabelType *relabel = (RelabelType *) node; + Node *arg = (Node *) relabel->arg; - /* - * This is a Citus specific modification - * The planner converts CollateExpr to RelabelType - * and here we convert back. - */ - if (relabel->resultcollid != InvalidOid) + if (relabel->relabelformat == COERCE_IMPLICIT_CAST && + !showimplicit) { - CollateExpr *collate = RelabelTypeToCollateExpr(relabel); - Node *arg = (Node *) collate->arg; - - if (!PRETTY_PAREN(context)) - appendStringInfoChar(buf, '('); - get_rule_expr_paren(arg, context, showimplicit, node); - appendStringInfo(buf, " COLLATE %s", - generate_collation_name(collate->collOid)); - if (!PRETTY_PAREN(context)) - appendStringInfoChar(buf, ')'); + /* don't show the implicit cast */ + get_rule_expr_paren(arg, context, false, node); } else { - Node *arg = (Node *) relabel->arg; - - if (relabel->relabelformat == COERCE_IMPLICIT_CAST && - !showimplicit) - { - /* don't show the implicit cast */ - get_rule_expr_paren(arg, context, false, node); - } - else - { - get_coercion_expr(arg, context, - relabel->resulttype, - relabel->resulttypmod, - node); - } + get_coercion_expr(arg, context, + relabel->resulttype, + relabel->resulttypmod, + node); } } break; diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index b210da7d7..24fbd9935 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -28,6 +28,7 @@ #include "access/xlog.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_am.h" +#include "catalog/pg_collation.h" #include "catalog/pg_operator.h" #include "catalog/pg_type.h" #include "commands/defrem.h" @@ -69,6 +70,7 @@ #include "optimizer/restrictinfo.h" #include "optimizer/tlist.h" #include "parser/parse_relation.h" +#include "parser/parse_type.h" #include "parser/parsetree.h" #include "rewrite/rewriteManip.h" #include "utils/builtins.h" @@ -79,6 +81,7 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/syscache.h" #include "utils/typcache.h" /* RepartitionJoinBucketCountPerNode determines bucket amount during repartitions */ @@ -231,6 +234,11 @@ static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr); static List * FetchEqualityAttrNumsForList(List *nodeList); static int PartitionColumnIndex(Var *targetVar, List *targetList); static List * GetColumnOriginalIndexes(Oid relationId); +static bool QueryTreeHasImproperForDeparseNodes(Node *inputNode); +static Node * AdjustImproperForDeparseNodes(Node *inputNode); +static bool IsImproperForDeparseRelabelTypeNode(Node *inputNode); +static bool IsImproperForDeparseCoerceViaIONode(Node *inputNode); +static CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType); /* @@ -2683,6 +2691,18 @@ SqlTaskList(Job *job) List *fragmentCombinationList = FragmentCombinationList(rangeTableFragmentsList, jobQuery, dependentJobList); + /* + * Adjust RelabelType and CoerceViaIO nodes that are improper for deparsing. + * We first check if there are any such nodes by using a query tree walker. + * The reason is that a query tree mutator will create a deep copy of all + * the query sublinks, and we don't want to do that unless necessary, as it + * would be inefficient. + */ + if (QueryTreeHasImproperForDeparseNodes((Node *) jobQuery)) + { + jobQuery = (Query *) AdjustImproperForDeparseNodes((Node *) jobQuery); + } + ListCell *fragmentCombinationCell = NULL; foreach(fragmentCombinationCell, fragmentCombinationList) { @@ -2733,7 +2753,7 @@ SqlTaskList(Job *job) * RelabelTypeToCollateExpr converts RelabelType's into CollationExpr's. * With that, we will be able to pushdown COLLATE's. */ -CollateExpr * +static CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType) { Assert(OidIsValid(relabelType->resultcollid)); @@ -5593,3 +5613,126 @@ TaskListHighestTaskId(List *taskList) return highestTaskId; } + + +/* + * QueryTreeHasImproperForDeparseNodes walks over the node, + * and returns true if there are RelabelType or + * CoerceViaIONodes which are improper for deparse + */ +static bool +QueryTreeHasImproperForDeparseNodes(Node *inputNode) +{ + if (inputNode == NULL) + { + return false; + } + else if (IsImproperForDeparseRelabelTypeNode(inputNode) || + IsImproperForDeparseCoerceViaIONode(inputNode)) + { + return true; + } + else if (IsA(inputNode, Query)) + { + return query_tree_walker((Query *) inputNode, + QueryTreeHasImproperForDeparseNodes, + NULL, 0); + } + + return expression_tree_walker(inputNode, + QueryTreeHasImproperForDeparseNodes, + NULL); +} + + +/* + * AdjustImproperForDeparseNodes takes an input rewritten query and modifies + * nodes which, after going through our planner, pose a problem when + * deparsing. So far we have two such type of Nodes that may pose problems: + * RelabelType and CoerceIO nodes. + * Details will be written in comments in the corresponding if conditions. + */ +static Node * +AdjustImproperForDeparseNodes(Node *inputNode) +{ + if (inputNode == NULL) + { + return NULL; + } + + if (IsImproperForDeparseRelabelTypeNode(inputNode)) + { + /* + * The planner converts CollateExpr to RelabelType + * and here we convert back. + */ + return (Node *) RelabelTypeToCollateExpr((RelabelType *) inputNode); + } + else if (IsImproperForDeparseCoerceViaIONode(inputNode)) + { + /* + * The planner converts some ::text/::varchar casts to ::cstring + * and here we convert back to text because cstring is a pseudotype + * and it cannot be casted to most resulttypes + */ + + CoerceViaIO *iocoerce = (CoerceViaIO *) inputNode; + Node *arg = (Node *) iocoerce->arg; + Const *cstringToText = (Const *) arg; + + cstringToText->consttype = TEXTOID; + cstringToText->constlen = -1; + + Type textType = typeidType(TEXTOID); + char *constvalue = NULL; + + if (!cstringToText->constisnull) + { + constvalue = DatumGetCString(cstringToText->constvalue); + } + + cstringToText->constvalue = stringTypeDatum(textType, + constvalue, + cstringToText->consttypmod); + ReleaseSysCache(textType); + return inputNode; + } + else if (IsA(inputNode, Query)) + { + return (Node *) query_tree_mutator((Query *) inputNode, + AdjustImproperForDeparseNodes, + NULL, QTW_DONT_COPY_QUERY); + } + + return expression_tree_mutator(inputNode, AdjustImproperForDeparseNodes, NULL); +} + + +/* + * Checks if the given node is of Relabel type which is improper for deparsing + * The planner converts some CollateExpr to RelabelType nodes, and we need + * to find these nodes. They would be improperly deparsed without the + * "COLLATE" expression. + */ +static bool +IsImproperForDeparseRelabelTypeNode(Node *inputNode) +{ + return (IsA(inputNode, RelabelType) && + OidIsValid(((RelabelType *) inputNode)->resultcollid) && + ((RelabelType *) inputNode)->resultcollid != DEFAULT_COLLATION_OID); +} + + +/* + * Checks if the given node is of CoerceViaIO type which is improper for deparsing + * The planner converts some ::text/::varchar casts to ::cstring, and we need + * to find these nodes. They would be improperly deparsed with "cstring" which cannot + * be casted to most resulttypes. + */ +static bool +IsImproperForDeparseCoerceViaIONode(Node *inputNode) +{ + return (IsA(inputNode, CoerceViaIO) && + IsA(((CoerceViaIO *) inputNode)->arg, Const) && + ((Const *) ((CoerceViaIO *) inputNode)->arg)->consttype == CSTRINGOID); +} diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index ea5d15c83..26d074053 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -543,7 +543,6 @@ extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression, List *groupClauseList, List *targetList, bool checkExpressionEquality); -extern CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType); /* * Function declarations for building, updating constraints and simple operator diff --git a/src/test/regress/expected/distributed_collations.out b/src/test/regress/expected/distributed_collations.out index 0c03c8be7..5e7247b0a 100644 --- a/src/test/regress/expected/distributed_collations.out +++ b/src/test/regress/expected/distributed_collations.out @@ -75,9 +75,9 @@ NOTICE: renaming the new table to collation_tests.test_collate_pushed_down_aggr SET citus.log_remote_commands TO true; SELECT ALL MIN((lower(CAST(test_collate_pushed_down_aggregate.a AS VARCHAR)) COLLATE "C")) FROM ONLY test_collate_pushed_down_aggregate; -NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060004 test_collate_pushed_down_aggregate WHERE true +NOTICE: issuing SELECT min((lower(((a)::character varying)::text) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060004 test_collate_pushed_down_aggregate WHERE true DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060005 test_collate_pushed_down_aggregate WHERE true +NOTICE: issuing SELECT min((lower(((a)::character varying)::text) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060005 test_collate_pushed_down_aggregate WHERE true DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx min --------------------------------------------------------------------- diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 190e6b2b3..3cf776ded 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -236,7 +236,7 @@ MERGE INTO target t last_order_id = s.order_id WHEN NOT MATCHED THEN DO NOTHING; -NOTICE: issuing MERGE INTO merge_schema.target_xxxxxxx t USING merge_schema.source_xxxxxxx s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center COLLATE "default") OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.target_xxxxxxx t USING merge_schema.source_xxxxxxx s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center)::text OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN DO NOTHING DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT * from target t WHERE t.customer_id = 30002; @@ -1073,7 +1073,7 @@ UPDATE SET value = vl_source.value, id = vl_target.id + 1 WHEN NOT MATCHED THEN INSERT VALUES(vl_source.ID, vl_source.value); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO vl_local FROM vl_target ORDER BY 1 ; -- Should be equal @@ -1331,7 +1331,7 @@ MERGE INTO ft_target WHEN NOT MATCHED THEN INSERT (id, user_val) VALUES (foreign_table.id, foreign_table.user_val); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * FROM ft_target; id | user_val @@ -1667,13 +1667,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1710,13 +1710,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1753,13 +1753,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1836,13 +1836,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1871,7 +1871,7 @@ WHEN NOT MATCHED THEN DO NOTHING; NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN DO NOTHING DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1917,13 +1917,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1958,13 +1958,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -2042,13 +2042,13 @@ SELECT count(*) FROM citus_target; -- before merge SET citus.log_remote_commands to true; EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SELECT * FROM citus_target WHERE id = 500; -- non-cached NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500) @@ -2059,49 +2059,49 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx (1 row) EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT * FROM citus_target WHERE id = 500; -- cached diff --git a/src/test/regress/expected/prepared_statements_4.out b/src/test/regress/expected/prepared_statements_4.out index 0dba296e8..4b81ec260 100644 --- a/src/test/regress/expected/prepared_statements_4.out +++ b/src/test/regress/expected/prepared_statements_4.out @@ -50,3 +50,307 @@ SELECT count(distinct ingest_time) FROM http_request WHERE site_id = 1; 8 (1 row) +-- Standard planner converted text and varchar casts to cstring in some cases +-- We make sure we convert it back to text when parsing the expression +INSERT INTO test VALUES ('2022-02-02', 0); +INSERT INTO test VALUES ('2022-01-01', 1); +INSERT INTO test VALUES ('2021-01-01', 2); +-- try different planners +PREPARE test_statement_regular(text) AS +SELECT user_id FROM test WHERE t >= $1::timestamp ORDER BY user_id; +EXECUTE test_statement_regular('2022-01-01'); + user_id +--------------------------------------------------------------------- + 0 + 1 +(2 rows) + +EXECUTE test_statement_regular('2022-01-01'); + user_id +--------------------------------------------------------------------- + 0 + 1 +(2 rows) + +EXECUTE test_statement_regular('2022-01-01'); + user_id +--------------------------------------------------------------------- + 0 + 1 +(2 rows) + +EXECUTE test_statement_regular('2022-01-01'); + user_id +--------------------------------------------------------------------- + 0 + 1 +(2 rows) + +EXECUTE test_statement_regular('2022-01-01'); + user_id +--------------------------------------------------------------------- + 0 + 1 +(2 rows) + +EXECUTE test_statement_regular('2022-01-01'); + user_id +--------------------------------------------------------------------- + 0 + 1 +(2 rows) + +EXECUTE test_statement_regular('2022-01-01'); + user_id +--------------------------------------------------------------------- + 0 + 1 +(2 rows) + +EXECUTE test_statement_regular('2022-01-01'); + user_id +--------------------------------------------------------------------- + 0 + 1 +(2 rows) + +PREPARE test_statement_router(int, text) AS +SELECT user_id FROM test WHERE user_id = $1 AND t >= $2::timestamp ORDER BY user_id; +EXECUTE test_statement_router(1, '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_router(1, '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_router(1, '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_router(1, '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_router(1, '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_router(1, '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_router(1, '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_router(1, '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +PREPARE test_statement_repartition(int, text) AS +SELECT count(*) FROM test t1 JOIN test t2 USING (t) WHERE t1.user_id = $1 AND t >= $2::timestamp; +EXECUTE test_statement_repartition(1, '2022-01-01'); + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_repartition(1, '2022-01-01'); + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_repartition(1, '2022-01-01'); + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_repartition(1, '2022-01-01'); + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_repartition(1, '2022-01-01'); + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_repartition(1, '2022-01-01'); + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_repartition(1, '2022-01-01'); + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_repartition(1, '2022-01-01'); + count +--------------------------------------------------------------------- + 1 +(1 row) + +PREPARE test_statement_cte(text, text) AS +WITH cte_1 AS MATERIALIZED (SELECT user_id, t FROM test WHERE t >= $1::timestamp ORDER BY user_id) +SELECT user_id FROM cte_1 WHERE t <= $2::timestamp; +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +PREPARE test_statement_insert(int, text) AS +INSERT INTO test VALUES ($2::timestamp, $1); +EXECUTE test_statement_insert(3, '2022-03-03'); +EXECUTE test_statement_insert(4, '2022-04-04'); +EXECUTE test_statement_insert(5, '2022-05-05'); +EXECUTE test_statement_insert(6, '2022-06-06'); +EXECUTE test_statement_insert(7, '2022-07-07'); +EXECUTE test_statement_insert(8, '2022-08-08'); +EXECUTE test_statement_insert(9, '2022-09-09'); +EXECUTE test_statement_insert(10, '2022-10-10'); +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 11 +(1 row) + +EXECUTE test_statement_regular('2022-01-01'); + user_id +--------------------------------------------------------------------- + 0 + 1 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 +(10 rows) + +PREPARE test_statement_null(text) AS +SELECT user_id , $1::timestamp FROM test ORDER BY user_id LIMIT 2; +EXECUTE test_statement_null(NULL); + user_id | timestamp +--------------------------------------------------------------------- + 0 | + 1 | +(2 rows) + +EXECUTE test_statement_null(NULL); + user_id | timestamp +--------------------------------------------------------------------- + 0 | + 1 | +(2 rows) + +EXECUTE test_statement_null(NULL); + user_id | timestamp +--------------------------------------------------------------------- + 0 | + 1 | +(2 rows) + +EXECUTE test_statement_null(NULL); + user_id | timestamp +--------------------------------------------------------------------- + 0 | + 1 | +(2 rows) + +EXECUTE test_statement_null(NULL); + user_id | timestamp +--------------------------------------------------------------------- + 0 | + 1 | +(2 rows) + +EXECUTE test_statement_null(NULL); + user_id | timestamp +--------------------------------------------------------------------- + 0 | + 1 | +(2 rows) + +EXECUTE test_statement_null(NULL); + user_id | timestamp +--------------------------------------------------------------------- + 0 | + 1 | +(2 rows) + +EXECUTE test_statement_null(NULL); + user_id | timestamp +--------------------------------------------------------------------- + 0 | + 1 | +(2 rows) + diff --git a/src/test/regress/expected/prepared_statements_create_load.out b/src/test/regress/expected/prepared_statements_create_load.out index d1817f39f..5e3740abe 100644 --- a/src/test/regress/expected/prepared_statements_create_load.out +++ b/src/test/regress/expected/prepared_statements_create_load.out @@ -91,3 +91,15 @@ SELECT create_distributed_table('http_request', 'site_id'); (1 row) +-- Standard planner converted text and varchar casts to cstring in some cases +-- We make sure we convert it back to text when parsing the expression +-- https://github.com/citusdata/citus/issues/6061 +-- https://github.com/citusdata/citus/issues/5646 +-- https://github.com/citusdata/citus/issues/5033 +CREATE TABLE test(t timestamp, user_id int); +SELECT create_distributed_table('test', 'user_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/sql/prepared_statements_4.sql b/src/test/regress/sql/prepared_statements_4.sql index a8f124568..bdbc32b08 100644 --- a/src/test/regress/sql/prepared_statements_4.sql +++ b/src/test/regress/sql/prepared_statements_4.sql @@ -43,3 +43,85 @@ EXECUTE foo; SELECT count(distinct ingest_time) FROM http_request WHERE site_id = 1; +-- Standard planner converted text and varchar casts to cstring in some cases +-- We make sure we convert it back to text when parsing the expression +INSERT INTO test VALUES ('2022-02-02', 0); +INSERT INTO test VALUES ('2022-01-01', 1); +INSERT INTO test VALUES ('2021-01-01', 2); + +-- try different planners +PREPARE test_statement_regular(text) AS +SELECT user_id FROM test WHERE t >= $1::timestamp ORDER BY user_id; + +EXECUTE test_statement_regular('2022-01-01'); +EXECUTE test_statement_regular('2022-01-01'); +EXECUTE test_statement_regular('2022-01-01'); +EXECUTE test_statement_regular('2022-01-01'); +EXECUTE test_statement_regular('2022-01-01'); +EXECUTE test_statement_regular('2022-01-01'); +EXECUTE test_statement_regular('2022-01-01'); +EXECUTE test_statement_regular('2022-01-01'); + +PREPARE test_statement_router(int, text) AS +SELECT user_id FROM test WHERE user_id = $1 AND t >= $2::timestamp ORDER BY user_id; + +EXECUTE test_statement_router(1, '2022-01-01'); +EXECUTE test_statement_router(1, '2022-01-01'); +EXECUTE test_statement_router(1, '2022-01-01'); +EXECUTE test_statement_router(1, '2022-01-01'); +EXECUTE test_statement_router(1, '2022-01-01'); +EXECUTE test_statement_router(1, '2022-01-01'); +EXECUTE test_statement_router(1, '2022-01-01'); +EXECUTE test_statement_router(1, '2022-01-01'); + +PREPARE test_statement_repartition(int, text) AS +SELECT count(*) FROM test t1 JOIN test t2 USING (t) WHERE t1.user_id = $1 AND t >= $2::timestamp; + +EXECUTE test_statement_repartition(1, '2022-01-01'); +EXECUTE test_statement_repartition(1, '2022-01-01'); +EXECUTE test_statement_repartition(1, '2022-01-01'); +EXECUTE test_statement_repartition(1, '2022-01-01'); +EXECUTE test_statement_repartition(1, '2022-01-01'); +EXECUTE test_statement_repartition(1, '2022-01-01'); +EXECUTE test_statement_repartition(1, '2022-01-01'); +EXECUTE test_statement_repartition(1, '2022-01-01'); + +PREPARE test_statement_cte(text, text) AS +WITH cte_1 AS MATERIALIZED (SELECT user_id, t FROM test WHERE t >= $1::timestamp ORDER BY user_id) +SELECT user_id FROM cte_1 WHERE t <= $2::timestamp; + +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); +EXECUTE test_statement_cte('2022-01-01', '2022-01-01'); + +PREPARE test_statement_insert(int, text) AS +INSERT INTO test VALUES ($2::timestamp, $1); + +EXECUTE test_statement_insert(3, '2022-03-03'); +EXECUTE test_statement_insert(4, '2022-04-04'); +EXECUTE test_statement_insert(5, '2022-05-05'); +EXECUTE test_statement_insert(6, '2022-06-06'); +EXECUTE test_statement_insert(7, '2022-07-07'); +EXECUTE test_statement_insert(8, '2022-08-08'); +EXECUTE test_statement_insert(9, '2022-09-09'); +EXECUTE test_statement_insert(10, '2022-10-10'); + +SELECT count(*) FROM test; +EXECUTE test_statement_regular('2022-01-01'); + +PREPARE test_statement_null(text) AS +SELECT user_id , $1::timestamp FROM test ORDER BY user_id LIMIT 2; + +EXECUTE test_statement_null(NULL); +EXECUTE test_statement_null(NULL); +EXECUTE test_statement_null(NULL); +EXECUTE test_statement_null(NULL); +EXECUTE test_statement_null(NULL); +EXECUTE test_statement_null(NULL); +EXECUTE test_statement_null(NULL); +EXECUTE test_statement_null(NULL); diff --git a/src/test/regress/sql/prepared_statements_create_load.sql b/src/test/regress/sql/prepared_statements_create_load.sql index 6afb1a12f..025ff1f6a 100644 --- a/src/test/regress/sql/prepared_statements_create_load.sql +++ b/src/test/regress/sql/prepared_statements_create_load.sql @@ -73,3 +73,12 @@ CREATE TABLE http_request ( ); SELECT create_distributed_table('http_request', 'site_id'); + +-- Standard planner converted text and varchar casts to cstring in some cases +-- We make sure we convert it back to text when parsing the expression +-- https://github.com/citusdata/citus/issues/6061 +-- https://github.com/citusdata/citus/issues/5646 +-- https://github.com/citusdata/citus/issues/5033 + +CREATE TABLE test(t timestamp, user_id int); +SELECT create_distributed_table('test', 'user_id'); From b58665773ba22a563ada3071ea92d1ec70a701d2 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Wed, 3 May 2023 19:08:08 -0700 Subject: [PATCH 3/6] Move all pre-15-defined routines to the bottom of the file --- .../distributed/planner/merge_planner.c | 339 +++++++++--------- 1 file changed, 168 insertions(+), 171 deletions(-) diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 930a44db8..86163e131 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -49,179 +49,8 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid Node *quals, List *targetList, CmdType commandType); -#endif -/* - * CreateMergePlan attempts to create a plan for the given MERGE SQL - * statement. If planning fails ->planningError is set to a description - * of the failure. - */ -DistributedPlan * -CreateMergePlan(Query *originalQuery, Query *query, - PlannerRestrictionContext *plannerRestrictionContext) -{ - DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); - bool multiShardQuery = false; - Oid targetRelationId = ModifyQueryResultRelationId(originalQuery); - - Assert(originalQuery->commandType == CMD_MERGE); - Assert(OidIsValid(targetRelationId)); - - distributedPlan->targetRelationId = targetRelationId; - distributedPlan->modLevel = RowModifyLevelForQuery(query); - distributedPlan->planningError = MergeQuerySupported(targetRelationId, - originalQuery, - multiShardQuery, - plannerRestrictionContext); - - if (distributedPlan->planningError != NULL) - { - return distributedPlan; - } - - Job *job = RouterJob(originalQuery, plannerRestrictionContext, - &distributedPlan->planningError); - - if (distributedPlan->planningError != NULL) - { - return distributedPlan; - } - - ereport(DEBUG1, (errmsg("Creating MERGE router plan"))); - - distributedPlan->workerJob = job; - distributedPlan->combineQuery = NULL; - - /* MERGE doesn't support RETURNING clause */ - distributedPlan->expectResults = false; - distributedPlan->fastPathRouterPlan = - plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; - - return distributedPlan; -} - - -/* - * MergeQuerySupported does check for a MERGE command in the query, if it finds - * one, it will verify the below criteria - * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables - * - Distributed tables requirements in ErrorIfDistTablesNotColocated - * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported - */ -DeferredErrorMessage * -MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery, - PlannerRestrictionContext *plannerRestrictionContext) -{ - /* function is void for pre-15 versions of Postgres */ - #if PG_VERSION_NUM < PG_VERSION_15 - - return NULL; - - #else - - /* - * TODO: For now, we are adding an exception where any volatile or stable - * functions are not allowed in the MERGE query, but this will become too - * restrictive as this will prevent many useful and simple cases, such as, - * INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without - * this restriction, we have a potential danger of some of the function(s) - * getting executed at the worker which will result in incorrect behavior. - */ - if (contain_mutable_functions((Node *) originalQuery)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "non-IMMUTABLE functions are not yet supported " - "in MERGE sql with distributed tables ", - NULL, NULL); - } - - List *rangeTableList = ExtractRangeTableEntryList(originalQuery); - - /* - * Fast path queries cannot have merge command, and we prevent the remaining here. - * In Citus we have limited support for MERGE, it's allowed only if all - * the tables(target, source or any CTE) tables are are local i.e. a - * combination of Citus local and Non-Citus tables (regular Postgres tables) - * or distributed tables with some restrictions, please see header of routine - * ErrorIfDistTablesNotColocated for details. - */ - DeferredErrorMessage *deferredError = - ErrorIfMergeHasUnsupportedTables(resultRelationId, - originalQuery, - rangeTableList, - plannerRestrictionContext); - if (deferredError) - { - /* MERGE's unsupported combination, raise the exception */ - RaiseDeferredError(deferredError, ERROR); - } - - deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, - originalQuery->jointree, - originalQuery->jointree-> - quals, - originalQuery->targetList, - originalQuery->commandType); - if (deferredError) - { - return deferredError; - } - - /* - * MERGE is a special case where we have multiple modify statements - * within itself. Check each INSERT/UPDATE/DELETE individually. - */ - MergeAction *action = NULL; - foreach_ptr(action, originalQuery->mergeActionList) - { - Assert(originalQuery->returningList == NULL); - deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, - originalQuery->jointree, - action->qual, - action->targetList, - action->commandType); - if (deferredError) - { - /* MERGE's unsupported scenario, raise the exception */ - RaiseDeferredError(deferredError, ERROR); - } - } - - deferredError = - InsertDistributionColumnMatchesSource(resultRelationId, originalQuery); - if (deferredError) - { - /* MERGE's unsupported scenario, raise the exception */ - RaiseDeferredError(deferredError, ERROR); - } - - if (multiShardQuery) - { - deferredError = - DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, - plannerRestrictionContext); - if (deferredError) - { - return deferredError; - } - } - - if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "a join with USING causes an internal naming " - "conflict, use ON instead", NULL, NULL); - } - - return NULL; - - #endif -} - - -#if PG_VERSION_NUM >= PG_VERSION_15 - /* * ErrorIfDistTablesNotColocated Checks to see if * @@ -728,6 +557,174 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTre #endif +/* + * MergeQuerySupported does check for a MERGE command in the query, if it finds + * one, it will verify the below criteria + * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables + * - Distributed tables requirements in ErrorIfDistTablesNotColocated + * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported + */ +DeferredErrorMessage * +MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery, + PlannerRestrictionContext *plannerRestrictionContext) +{ + /* function is void for pre-15 versions of Postgres */ + #if PG_VERSION_NUM < PG_VERSION_15 + + return NULL; + + #else + + /* + * TODO: For now, we are adding an exception where any volatile or stable + * functions are not allowed in the MERGE query, but this will become too + * restrictive as this will prevent many useful and simple cases, such as, + * INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without + * this restriction, we have a potential danger of some of the function(s) + * getting executed at the worker which will result in incorrect behavior. + */ + if (contain_mutable_functions((Node *) originalQuery)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not yet supported " + "in MERGE sql with distributed tables ", + NULL, NULL); + } + + List *rangeTableList = ExtractRangeTableEntryList(originalQuery); + + /* + * Fast path queries cannot have merge command, and we prevent the remaining here. + * In Citus we have limited support for MERGE, it's allowed only if all + * the tables(target, source or any CTE) tables are are local i.e. a + * combination of Citus local and Non-Citus tables (regular Postgres tables) + * or distributed tables with some restrictions, please see header of routine + * ErrorIfDistTablesNotColocated for details. + */ + DeferredErrorMessage *deferredError = + ErrorIfMergeHasUnsupportedTables(resultRelationId, + originalQuery, + rangeTableList, + plannerRestrictionContext); + if (deferredError) + { + /* MERGE's unsupported combination, raise the exception */ + RaiseDeferredError(deferredError, ERROR); + } + + deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, + originalQuery->jointree, + originalQuery->jointree-> + quals, + originalQuery->targetList, + originalQuery->commandType); + if (deferredError) + { + return deferredError; + } + + /* + * MERGE is a special case where we have multiple modify statements + * within itself. Check each INSERT/UPDATE/DELETE individually. + */ + MergeAction *action = NULL; + foreach_ptr(action, originalQuery->mergeActionList) + { + Assert(originalQuery->returningList == NULL); + deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, + originalQuery->jointree, + action->qual, + action->targetList, + action->commandType); + if (deferredError) + { + /* MERGE's unsupported scenario, raise the exception */ + RaiseDeferredError(deferredError, ERROR); + } + } + + deferredError = + InsertDistributionColumnMatchesSource(resultRelationId, originalQuery); + if (deferredError) + { + /* MERGE's unsupported scenario, raise the exception */ + RaiseDeferredError(deferredError, ERROR); + } + + if (multiShardQuery) + { + deferredError = + DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, + plannerRestrictionContext); + if (deferredError) + { + return deferredError; + } + } + + if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "a join with USING causes an internal naming " + "conflict, use ON instead", NULL, NULL); + } + + return NULL; + + #endif +} + + +/* + * CreateMergePlan attempts to create a plan for the given MERGE SQL + * statement. If planning fails ->planningError is set to a description + * of the failure. + */ +DistributedPlan * +CreateMergePlan(Query *originalQuery, Query *query, + PlannerRestrictionContext *plannerRestrictionContext) +{ + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); + bool multiShardQuery = false; + Oid targetRelationId = ModifyQueryResultRelationId(originalQuery); + + Assert(originalQuery->commandType == CMD_MERGE); + Assert(OidIsValid(targetRelationId)); + + distributedPlan->targetRelationId = targetRelationId; + distributedPlan->modLevel = RowModifyLevelForQuery(query); + distributedPlan->planningError = MergeQuerySupported(targetRelationId, + originalQuery, + multiShardQuery, + plannerRestrictionContext); + + if (distributedPlan->planningError != NULL) + { + return distributedPlan; + } + + Job *job = RouterJob(originalQuery, plannerRestrictionContext, + &distributedPlan->planningError); + + if (distributedPlan->planningError != NULL) + { + return distributedPlan; + } + + ereport(DEBUG1, (errmsg("Creating MERGE router plan"))); + + distributedPlan->workerJob = job; + distributedPlan->combineQuery = NULL; + + /* MERGE doesn't support RETURNING clause */ + distributedPlan->expectResults = false; + distributedPlan->fastPathRouterPlan = + plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; + + return distributedPlan; +} + + /* * IsLocalTableModification returns true if the table modified is a Postgres table. * We do not support recursive planning for MERGE yet, so we could have a join From 3217e3f1817b63df68ae79f0bf80448b4074e9fc Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Fri, 5 May 2023 12:07:46 +0300 Subject: [PATCH 4/6] Fix flaky background rebalance parallel test (#6893) A test in background_rebalance_parallel.sql was failing intermittently where the order of tasks in the output was not deterministic. This commit fixes the test by removing id columns for the background tasks in the output. A sample failing diff before this patch is below: ```diff SELECT D.task_id, (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), D.depends_on, (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; task_id | command | depends_on | command ---------+---------------------------------------------------------------------+------------+--------------------------------------------------------------------- - 1014 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | 1013 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto') - 1016 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | 1015 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto') - 1018 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | 1017 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto') - 1020 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | 1019 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto') + 1014 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | 1013 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto') + 1016 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | 1015 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto') + 1018 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | 1017 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto') + 1020 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | 1019 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto') (4 rows) ``` Notice that the dependent and dependee tasks have some commands, but they have different task ids. --- .../background_rebalance_parallel.out | 30 ++++++++++++------- .../sql/background_rebalance_parallel.sql | 13 ++++---- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/test/regress/expected/background_rebalance_parallel.out b/src/test/regress/expected/background_rebalance_parallel.out index 9c43fab9b..187f709e4 100644 --- a/src/test/regress/expected/background_rebalance_parallel.out +++ b/src/test/regress/expected/background_rebalance_parallel.out @@ -466,17 +466,27 @@ SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset -- see dependent tasks to understand which tasks remain runnable because of -- citus.max_background_task_executors_per_node -- and which tasks are actually blocked from colocation group dependencies -SELECT D.task_id, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), - D.depends_on, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) -FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; - task_id | command | depends_on | command +SELECT (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + (SELECT T.command depends_on_command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY 1, 2 ASC; + command | depends_on_command --------------------------------------------------------------------- - 1014 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | 1013 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto') - 1016 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | 1015 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto') - 1018 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | 1017 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto') - 1020 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | 1019 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto') + SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto') + SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto') + SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto') + SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto') +(4 rows) + +SELECT task_id, depends_on +FROM pg_dist_background_task_depend +WHERE job_id in (:job_id) +ORDER BY 1, 2 ASC; + task_id | depends_on +--------------------------------------------------------------------- + 1014 | 1013 + 1016 | 1015 + 1018 | 1017 + 1020 | 1019 (4 rows) -- default citus.max_background_task_executors_per_node is 1 diff --git a/src/test/regress/sql/background_rebalance_parallel.sql b/src/test/regress/sql/background_rebalance_parallel.sql index 5229e7f88..e55fd93bb 100644 --- a/src/test/regress/sql/background_rebalance_parallel.sql +++ b/src/test/regress/sql/background_rebalance_parallel.sql @@ -204,11 +204,14 @@ SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset -- see dependent tasks to understand which tasks remain runnable because of -- citus.max_background_task_executors_per_node -- and which tasks are actually blocked from colocation group dependencies -SELECT D.task_id, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), - D.depends_on, - (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) -FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; +SELECT (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + (SELECT T.command depends_on_command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY 1, 2 ASC; + +SELECT task_id, depends_on +FROM pg_dist_background_task_depend +WHERE job_id in (:job_id) +ORDER BY 1, 2 ASC; -- default citus.max_background_task_executors_per_node is 1 -- show that first exactly one task per node is running From 905fd46410544c98aac2be92d188c52cc0bb2802 Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Fri, 5 May 2023 16:47:01 +0300 Subject: [PATCH 5/6] Fixes flakiness in background_rebalance_parallel test (#6910) Fixes the following flaky outputs by decreasing citus_task_wait loop interval, and changing the order of wait commands. https://app.circleci.com/pipelines/github/citusdata/citus/32102/workflows/19958297-6c7e-49ef-9bc2-8efe8aacb96f/jobs/1089589 ``` diff SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; job_id | task_id | status | nodes_involved --------+---------+----------+---------------- 17779 | 1013 | done | {50,56} 17779 | 1014 | running | {50,57} - 17779 | 1015 | running | {50,56} - 17779 | 1016 | blocked | {50,57} + 17779 | 1015 | done | {50,56} + 17779 | 1016 | running | {50,57} 17779 | 1017 | runnable | {50,56} 17779 | 1018 | blocked | {50,57} 17779 | 1019 | runnable | {50,56} 17779 | 1020 | blocked | {50,57} (8 rows) ``` https://github.com/citusdata/citus/pull/6893#issuecomment-1525661408 ```diff SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; job_id | task_id | status | nodes_involved --------+---------+----------+---------------- 17779 | 1013 | done | {50,56} - 17779 | 1014 | running | {50,57} + 17779 | 1014 | runnable | {50,57} 17779 | 1015 | running | {50,56} 17779 | 1016 | blocked | {50,57} 17779 | 1017 | runnable | {50,56} 17779 | 1018 | blocked | {50,57} 17779 | 1019 | runnable | {50,56} 17779 | 1020 | blocked | {50,57} (8 rows) ``` --- src/backend/distributed/utils/background_jobs.c | 2 +- .../regress/expected/background_rebalance_parallel.out | 10 ++++++++-- src/test/regress/sql/background_rebalance_parallel.sql | 4 +++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 789732d21..84ef4229f 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -395,7 +395,7 @@ citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus) /* sleep for a while, before rechecking the task status */ CHECK_FOR_INTERRUPTS(); - const long delay_ms = 1000; + const long delay_ms = 100; (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, delay_ms, diff --git a/src/test/regress/expected/background_rebalance_parallel.out b/src/test/regress/expected/background_rebalance_parallel.out index 187f709e4..dbdd963a9 100644 --- a/src/test/regress/expected/background_rebalance_parallel.out +++ b/src/test/regress/expected/background_rebalance_parallel.out @@ -513,6 +513,12 @@ FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; (8 rows) -- increase citus.max_background_task_executors_per_node +SELECT citus_task_wait(1013, desired_status => 'done'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2; SELECT pg_reload_conf(); pg_reload_conf @@ -520,13 +526,13 @@ SELECT pg_reload_conf(); t (1 row) -SELECT citus_task_wait(1015, desired_status => 'running'); +SELECT citus_task_wait(1014, desired_status => 'running'); citus_task_wait --------------------------------------------------------------------- (1 row) -SELECT citus_task_wait(1013, desired_status => 'done'); +SELECT citus_task_wait(1015, desired_status => 'running'); citus_task_wait --------------------------------------------------------------------- diff --git a/src/test/regress/sql/background_rebalance_parallel.sql b/src/test/regress/sql/background_rebalance_parallel.sql index e55fd93bb..2eb952b67 100644 --- a/src/test/regress/sql/background_rebalance_parallel.sql +++ b/src/test/regress/sql/background_rebalance_parallel.sql @@ -221,10 +221,12 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; -- increase citus.max_background_task_executors_per_node +SELECT citus_task_wait(1013, desired_status => 'done'); ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2; SELECT pg_reload_conf(); + +SELECT citus_task_wait(1014, desired_status => 'running'); SELECT citus_task_wait(1015, desired_status => 'running'); -SELECT citus_task_wait(1013, desired_status => 'done'); -- show that at most 2 tasks per node are running -- among the tasks that are not blocked From 73c771d6edf307412ec7e55ee409ada5dc2a345a Mon Sep 17 00:00:00 2001 From: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com> Date: Fri, 5 May 2023 19:08:35 +0300 Subject: [PATCH 6/6] Update readme for 11.3 (#6903) Co-authored-by: Hanefi Onaldi Co-authored-by: Jelte Fennema --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7a15cd6d8..6b6dc5664 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -| **
The Citus database is 100% open source.

Learn what's new in the [Citus 11.2 release blog](https://www.citusdata.com/blog/2023/02/08/whats-new-in-citus-11-2-patroni-ha-support/) and the [Citus Updates page](https://www.citusdata.com/updates/).

**| +| **
The Citus database is 100% open source.

Learn what's new in the [Citus 11.3 release blog](https://www.citusdata.com/blog/2023/05/05/whats-new-in-citus-11-3-multi-tenant-saas/) and the [Citus Updates page](https://www.citusdata.com/updates/).

**| |---|
@@ -94,14 +94,14 @@ Install packages on Ubuntu / Debian: ```bash curl https://install.citusdata.com/community/deb.sh > add-citus-repo.sh sudo bash add-citus-repo.sh -sudo apt-get -y install postgresql-15-citus-11.2 +sudo apt-get -y install postgresql-15-citus-11.3 ``` Install packages on CentOS / Red Hat: ```bash curl https://install.citusdata.com/community/rpm.sh > add-citus-repo.sh sudo bash add-citus-repo.sh -sudo yum install -y citus112_15 +sudo yum install -y citus113_15 ``` To add Citus to your local PostgreSQL database, add the following to `postgresql.conf`: @@ -349,7 +349,7 @@ To learn more about columnar storage, check out the [columnar storage README](ht ## Setting up with High Availability -One of the most popular high availability solutions for PostgreSQL, [Patroni 3.0](https://github.com/zalando/patroni), has [first class support for Citus 10.0 and above](https://patroni.readthedocs.io/en/latest/citus.html#citus), additionally Citus 11.2 ships with improvements for smoother node switchover in Patroni. +One of the most popular high availability solutions for PostgreSQL, [Patroni 3.0](https://github.com/zalando/patroni), has [first class support for Citus 10.0 and above](https://patroni.readthedocs.io/en/latest/citus.html#citus), additionally since Citus 11.2 ships with improvements for smoother node switchover in Patroni. An example of patronictl list output for the Citus cluster: