diff --git a/CHANGELOG.md b/CHANGELOG.md
index c72e64cbf..de9bfeeb8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,7 +6,7 @@
* Adds support for `MERGE` command on co-located distributed tables joined on
distribution column (#6696, #6733)
-* Adds the view `citus_stats_tenants` that monitor statistics on tenant usages
+* Adds the view `citus_stat_tenants` that monitor statistics on tenant usages
(#6725)
* Adds the GUC `citus.max_background_task_executors_per_node` to control number
diff --git a/README.md b/README.md
index 7a15cd6d8..6b6dc5664 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-| **
The Citus database is 100% open source.
![]()
Learn what's new in the [Citus 11.2 release blog](https://www.citusdata.com/blog/2023/02/08/whats-new-in-citus-11-2-patroni-ha-support/) and the [Citus Updates page](https://www.citusdata.com/updates/).
**|
+| **
The Citus database is 100% open source.
![]()
Learn what's new in the [Citus 11.3 release blog](https://www.citusdata.com/blog/2023/05/05/whats-new-in-citus-11-3-multi-tenant-saas/) and the [Citus Updates page](https://www.citusdata.com/updates/).
**|
|---|
@@ -94,14 +94,14 @@ Install packages on Ubuntu / Debian:
```bash
curl https://install.citusdata.com/community/deb.sh > add-citus-repo.sh
sudo bash add-citus-repo.sh
-sudo apt-get -y install postgresql-15-citus-11.2
+sudo apt-get -y install postgresql-15-citus-11.3
```
Install packages on CentOS / Red Hat:
```bash
curl https://install.citusdata.com/community/rpm.sh > add-citus-repo.sh
sudo bash add-citus-repo.sh
-sudo yum install -y citus112_15
+sudo yum install -y citus113_15
```
To add Citus to your local PostgreSQL database, add the following to `postgresql.conf`:
@@ -349,7 +349,7 @@ To learn more about columnar storage, check out the [columnar storage README](ht
## Setting up with High Availability
-One of the most popular high availability solutions for PostgreSQL, [Patroni 3.0](https://github.com/zalando/patroni), has [first class support for Citus 10.0 and above](https://patroni.readthedocs.io/en/latest/citus.html#citus), additionally Citus 11.2 ships with improvements for smoother node switchover in Patroni.
+One of the most popular high availability solutions for PostgreSQL, [Patroni 3.0](https://github.com/zalando/patroni), has [first class support for Citus 10.0 and above](https://patroni.readthedocs.io/en/latest/citus.html#citus), additionally since Citus 11.2 ships with improvements for smoother node switchover in Patroni.
An example of patronictl list output for the Citus cluster:
diff --git a/src/backend/distributed/deparser/ruleutils_13.c b/src/backend/distributed/deparser/ruleutils_13.c
index 86eeb19a1..31ef67f97 100644
--- a/src/backend/distributed/deparser/ruleutils_13.c
+++ b/src/backend/distributed/deparser/ruleutils_13.c
@@ -5246,42 +5246,20 @@ get_rule_expr(Node *node, deparse_context *context,
case T_RelabelType:
{
RelabelType *relabel = (RelabelType *) node;
+ Node *arg = (Node *) relabel->arg;
- /*
- * This is a Citus specific modification
- * The planner converts CollateExpr to RelabelType
- * and here we convert back.
- */
- if (relabel->resultcollid != InvalidOid)
+ if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
+ !showimplicit)
{
- CollateExpr *collate = RelabelTypeToCollateExpr(relabel);
- Node *arg = (Node *) collate->arg;
-
- if (!PRETTY_PAREN(context))
- appendStringInfoChar(buf, '(');
- get_rule_expr_paren(arg, context, showimplicit, node);
- appendStringInfo(buf, " COLLATE %s",
- generate_collation_name(collate->collOid));
- if (!PRETTY_PAREN(context))
- appendStringInfoChar(buf, ')');
+ /* don't show the implicit cast */
+ get_rule_expr_paren(arg, context, false, node);
}
else
{
- Node *arg = (Node *) relabel->arg;
-
- if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
- !showimplicit)
- {
- /* don't show the implicit cast */
- get_rule_expr_paren(arg, context, false, node);
- }
- else
- {
- get_coercion_expr(arg, context,
- relabel->resulttype,
- relabel->resulttypmod,
- node);
- }
+ get_coercion_expr(arg, context,
+ relabel->resulttype,
+ relabel->resulttypmod,
+ node);
}
}
break;
diff --git a/src/backend/distributed/deparser/ruleutils_14.c b/src/backend/distributed/deparser/ruleutils_14.c
index 79fae9ac0..b364221d8 100644
--- a/src/backend/distributed/deparser/ruleutils_14.c
+++ b/src/backend/distributed/deparser/ruleutils_14.c
@@ -5470,42 +5470,20 @@ get_rule_expr(Node *node, deparse_context *context,
case T_RelabelType:
{
RelabelType *relabel = (RelabelType *) node;
+ Node *arg = (Node *) relabel->arg;
- /*
- * This is a Citus specific modification
- * The planner converts CollateExpr to RelabelType
- * and here we convert back.
- */
- if (relabel->resultcollid != InvalidOid)
+ if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
+ !showimplicit)
{
- CollateExpr *collate = RelabelTypeToCollateExpr(relabel);
- Node *arg = (Node *) collate->arg;
-
- if (!PRETTY_PAREN(context))
- appendStringInfoChar(buf, '(');
- get_rule_expr_paren(arg, context, showimplicit, node);
- appendStringInfo(buf, " COLLATE %s",
- generate_collation_name(collate->collOid));
- if (!PRETTY_PAREN(context))
- appendStringInfoChar(buf, ')');
+ /* don't show the implicit cast */
+ get_rule_expr_paren(arg, context, false, node);
}
else
{
- Node *arg = (Node *) relabel->arg;
-
- if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
- !showimplicit)
- {
- /* don't show the implicit cast */
- get_rule_expr_paren(arg, context, false, node);
- }
- else
- {
- get_coercion_expr(arg, context,
- relabel->resulttype,
- relabel->resulttypmod,
- node);
- }
+ get_coercion_expr(arg, context,
+ relabel->resulttype,
+ relabel->resulttypmod,
+ node);
}
}
break;
diff --git a/src/backend/distributed/deparser/ruleutils_15.c b/src/backend/distributed/deparser/ruleutils_15.c
index 827492d87..2dded9b01 100644
--- a/src/backend/distributed/deparser/ruleutils_15.c
+++ b/src/backend/distributed/deparser/ruleutils_15.c
@@ -5696,42 +5696,20 @@ get_rule_expr(Node *node, deparse_context *context,
case T_RelabelType:
{
RelabelType *relabel = (RelabelType *) node;
+ Node *arg = (Node *) relabel->arg;
- /*
- * This is a Citus specific modification
- * The planner converts CollateExpr to RelabelType
- * and here we convert back.
- */
- if (relabel->resultcollid != InvalidOid)
+ if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
+ !showimplicit)
{
- CollateExpr *collate = RelabelTypeToCollateExpr(relabel);
- Node *arg = (Node *) collate->arg;
-
- if (!PRETTY_PAREN(context))
- appendStringInfoChar(buf, '(');
- get_rule_expr_paren(arg, context, showimplicit, node);
- appendStringInfo(buf, " COLLATE %s",
- generate_collation_name(collate->collOid));
- if (!PRETTY_PAREN(context))
- appendStringInfoChar(buf, ')');
+ /* don't show the implicit cast */
+ get_rule_expr_paren(arg, context, false, node);
}
else
{
- Node *arg = (Node *) relabel->arg;
-
- if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
- !showimplicit)
- {
- /* don't show the implicit cast */
- get_rule_expr_paren(arg, context, false, node);
- }
- else
- {
- get_coercion_expr(arg, context,
- relabel->resulttype,
- relabel->resulttypmod,
- node);
- }
+ get_coercion_expr(arg, context,
+ relabel->resulttype,
+ relabel->resulttypmod,
+ node);
}
}
break;
diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c
index 930a44db8..86163e131 100644
--- a/src/backend/distributed/planner/merge_planner.c
+++ b/src/backend/distributed/planner/merge_planner.c
@@ -49,179 +49,8 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
Node *quals,
List *targetList,
CmdType commandType);
-#endif
-/*
- * CreateMergePlan attempts to create a plan for the given MERGE SQL
- * statement. If planning fails ->planningError is set to a description
- * of the failure.
- */
-DistributedPlan *
-CreateMergePlan(Query *originalQuery, Query *query,
- PlannerRestrictionContext *plannerRestrictionContext)
-{
- DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
- bool multiShardQuery = false;
- Oid targetRelationId = ModifyQueryResultRelationId(originalQuery);
-
- Assert(originalQuery->commandType == CMD_MERGE);
- Assert(OidIsValid(targetRelationId));
-
- distributedPlan->targetRelationId = targetRelationId;
- distributedPlan->modLevel = RowModifyLevelForQuery(query);
- distributedPlan->planningError = MergeQuerySupported(targetRelationId,
- originalQuery,
- multiShardQuery,
- plannerRestrictionContext);
-
- if (distributedPlan->planningError != NULL)
- {
- return distributedPlan;
- }
-
- Job *job = RouterJob(originalQuery, plannerRestrictionContext,
- &distributedPlan->planningError);
-
- if (distributedPlan->planningError != NULL)
- {
- return distributedPlan;
- }
-
- ereport(DEBUG1, (errmsg("Creating MERGE router plan")));
-
- distributedPlan->workerJob = job;
- distributedPlan->combineQuery = NULL;
-
- /* MERGE doesn't support RETURNING clause */
- distributedPlan->expectResults = false;
- distributedPlan->fastPathRouterPlan =
- plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
-
- return distributedPlan;
-}
-
-
-/*
- * MergeQuerySupported does check for a MERGE command in the query, if it finds
- * one, it will verify the below criteria
- * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables
- * - Distributed tables requirements in ErrorIfDistTablesNotColocated
- * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported
- */
-DeferredErrorMessage *
-MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery,
- PlannerRestrictionContext *plannerRestrictionContext)
-{
- /* function is void for pre-15 versions of Postgres */
- #if PG_VERSION_NUM < PG_VERSION_15
-
- return NULL;
-
- #else
-
- /*
- * TODO: For now, we are adding an exception where any volatile or stable
- * functions are not allowed in the MERGE query, but this will become too
- * restrictive as this will prevent many useful and simple cases, such as,
- * INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without
- * this restriction, we have a potential danger of some of the function(s)
- * getting executed at the worker which will result in incorrect behavior.
- */
- if (contain_mutable_functions((Node *) originalQuery))
- {
- return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
- "non-IMMUTABLE functions are not yet supported "
- "in MERGE sql with distributed tables ",
- NULL, NULL);
- }
-
- List *rangeTableList = ExtractRangeTableEntryList(originalQuery);
-
- /*
- * Fast path queries cannot have merge command, and we prevent the remaining here.
- * In Citus we have limited support for MERGE, it's allowed only if all
- * the tables(target, source or any CTE) tables are are local i.e. a
- * combination of Citus local and Non-Citus tables (regular Postgres tables)
- * or distributed tables with some restrictions, please see header of routine
- * ErrorIfDistTablesNotColocated for details.
- */
- DeferredErrorMessage *deferredError =
- ErrorIfMergeHasUnsupportedTables(resultRelationId,
- originalQuery,
- rangeTableList,
- plannerRestrictionContext);
- if (deferredError)
- {
- /* MERGE's unsupported combination, raise the exception */
- RaiseDeferredError(deferredError, ERROR);
- }
-
- deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
- originalQuery->jointree,
- originalQuery->jointree->
- quals,
- originalQuery->targetList,
- originalQuery->commandType);
- if (deferredError)
- {
- return deferredError;
- }
-
- /*
- * MERGE is a special case where we have multiple modify statements
- * within itself. Check each INSERT/UPDATE/DELETE individually.
- */
- MergeAction *action = NULL;
- foreach_ptr(action, originalQuery->mergeActionList)
- {
- Assert(originalQuery->returningList == NULL);
- deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
- originalQuery->jointree,
- action->qual,
- action->targetList,
- action->commandType);
- if (deferredError)
- {
- /* MERGE's unsupported scenario, raise the exception */
- RaiseDeferredError(deferredError, ERROR);
- }
- }
-
- deferredError =
- InsertDistributionColumnMatchesSource(resultRelationId, originalQuery);
- if (deferredError)
- {
- /* MERGE's unsupported scenario, raise the exception */
- RaiseDeferredError(deferredError, ERROR);
- }
-
- if (multiShardQuery)
- {
- deferredError =
- DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
- plannerRestrictionContext);
- if (deferredError)
- {
- return deferredError;
- }
- }
-
- if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
- {
- return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
- "a join with USING causes an internal naming "
- "conflict, use ON instead", NULL, NULL);
- }
-
- return NULL;
-
- #endif
-}
-
-
-#if PG_VERSION_NUM >= PG_VERSION_15
-
/*
* ErrorIfDistTablesNotColocated Checks to see if
*
@@ -728,6 +557,174 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTre
#endif
+/*
+ * MergeQuerySupported does check for a MERGE command in the query, if it finds
+ * one, it will verify the below criteria
+ * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables
+ * - Distributed tables requirements in ErrorIfDistTablesNotColocated
+ * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported
+ */
+DeferredErrorMessage *
+MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery,
+ PlannerRestrictionContext *plannerRestrictionContext)
+{
+ /* function is void for pre-15 versions of Postgres */
+ #if PG_VERSION_NUM < PG_VERSION_15
+
+ return NULL;
+
+ #else
+
+ /*
+ * TODO: For now, we are adding an exception where any volatile or stable
+ * functions are not allowed in the MERGE query, but this will become too
+ * restrictive as this will prevent many useful and simple cases, such as,
+ * INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without
+ * this restriction, we have a potential danger of some of the function(s)
+ * getting executed at the worker which will result in incorrect behavior.
+ */
+ if (contain_mutable_functions((Node *) originalQuery))
+ {
+ return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
+ "non-IMMUTABLE functions are not yet supported "
+ "in MERGE sql with distributed tables ",
+ NULL, NULL);
+ }
+
+ List *rangeTableList = ExtractRangeTableEntryList(originalQuery);
+
+ /*
+ * Fast path queries cannot have merge command, and we prevent the remaining here.
+ * In Citus we have limited support for MERGE, it's allowed only if all
+ * the tables(target, source or any CTE) tables are are local i.e. a
+ * combination of Citus local and Non-Citus tables (regular Postgres tables)
+ * or distributed tables with some restrictions, please see header of routine
+ * ErrorIfDistTablesNotColocated for details.
+ */
+ DeferredErrorMessage *deferredError =
+ ErrorIfMergeHasUnsupportedTables(resultRelationId,
+ originalQuery,
+ rangeTableList,
+ plannerRestrictionContext);
+ if (deferredError)
+ {
+ /* MERGE's unsupported combination, raise the exception */
+ RaiseDeferredError(deferredError, ERROR);
+ }
+
+ deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
+ originalQuery->jointree,
+ originalQuery->jointree->
+ quals,
+ originalQuery->targetList,
+ originalQuery->commandType);
+ if (deferredError)
+ {
+ return deferredError;
+ }
+
+ /*
+ * MERGE is a special case where we have multiple modify statements
+ * within itself. Check each INSERT/UPDATE/DELETE individually.
+ */
+ MergeAction *action = NULL;
+ foreach_ptr(action, originalQuery->mergeActionList)
+ {
+ Assert(originalQuery->returningList == NULL);
+ deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
+ originalQuery->jointree,
+ action->qual,
+ action->targetList,
+ action->commandType);
+ if (deferredError)
+ {
+ /* MERGE's unsupported scenario, raise the exception */
+ RaiseDeferredError(deferredError, ERROR);
+ }
+ }
+
+ deferredError =
+ InsertDistributionColumnMatchesSource(resultRelationId, originalQuery);
+ if (deferredError)
+ {
+ /* MERGE's unsupported scenario, raise the exception */
+ RaiseDeferredError(deferredError, ERROR);
+ }
+
+ if (multiShardQuery)
+ {
+ deferredError =
+ DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
+ plannerRestrictionContext);
+ if (deferredError)
+ {
+ return deferredError;
+ }
+ }
+
+ if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
+ {
+ return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
+ "a join with USING causes an internal naming "
+ "conflict, use ON instead", NULL, NULL);
+ }
+
+ return NULL;
+
+ #endif
+}
+
+
+/*
+ * CreateMergePlan attempts to create a plan for the given MERGE SQL
+ * statement. If planning fails ->planningError is set to a description
+ * of the failure.
+ */
+DistributedPlan *
+CreateMergePlan(Query *originalQuery, Query *query,
+ PlannerRestrictionContext *plannerRestrictionContext)
+{
+ DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
+ bool multiShardQuery = false;
+ Oid targetRelationId = ModifyQueryResultRelationId(originalQuery);
+
+ Assert(originalQuery->commandType == CMD_MERGE);
+ Assert(OidIsValid(targetRelationId));
+
+ distributedPlan->targetRelationId = targetRelationId;
+ distributedPlan->modLevel = RowModifyLevelForQuery(query);
+ distributedPlan->planningError = MergeQuerySupported(targetRelationId,
+ originalQuery,
+ multiShardQuery,
+ plannerRestrictionContext);
+
+ if (distributedPlan->planningError != NULL)
+ {
+ return distributedPlan;
+ }
+
+ Job *job = RouterJob(originalQuery, plannerRestrictionContext,
+ &distributedPlan->planningError);
+
+ if (distributedPlan->planningError != NULL)
+ {
+ return distributedPlan;
+ }
+
+ ereport(DEBUG1, (errmsg("Creating MERGE router plan")));
+
+ distributedPlan->workerJob = job;
+ distributedPlan->combineQuery = NULL;
+
+ /* MERGE doesn't support RETURNING clause */
+ distributedPlan->expectResults = false;
+ distributedPlan->fastPathRouterPlan =
+ plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
+
+ return distributedPlan;
+}
+
+
/*
* IsLocalTableModification returns true if the table modified is a Postgres table.
* We do not support recursive planning for MERGE yet, so we could have a join
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index b210da7d7..24fbd9935 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -28,6 +28,7 @@
#include "access/xlog.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_am.h"
+#include "catalog/pg_collation.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
@@ -69,6 +70,7 @@
#include "optimizer/restrictinfo.h"
#include "optimizer/tlist.h"
#include "parser/parse_relation.h"
+#include "parser/parse_type.h"
#include "parser/parsetree.h"
#include "rewrite/rewriteManip.h"
#include "utils/builtins.h"
@@ -79,6 +81,7 @@
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
+#include "utils/syscache.h"
#include "utils/typcache.h"
/* RepartitionJoinBucketCountPerNode determines bucket amount during repartitions */
@@ -231,6 +234,11 @@ static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr);
static List * FetchEqualityAttrNumsForList(List *nodeList);
static int PartitionColumnIndex(Var *targetVar, List *targetList);
static List * GetColumnOriginalIndexes(Oid relationId);
+static bool QueryTreeHasImproperForDeparseNodes(Node *inputNode);
+static Node * AdjustImproperForDeparseNodes(Node *inputNode);
+static bool IsImproperForDeparseRelabelTypeNode(Node *inputNode);
+static bool IsImproperForDeparseCoerceViaIONode(Node *inputNode);
+static CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType);
/*
@@ -2683,6 +2691,18 @@ SqlTaskList(Job *job)
List *fragmentCombinationList = FragmentCombinationList(rangeTableFragmentsList,
jobQuery, dependentJobList);
+ /*
+ * Adjust RelabelType and CoerceViaIO nodes that are improper for deparsing.
+ * We first check if there are any such nodes by using a query tree walker.
+ * The reason is that a query tree mutator will create a deep copy of all
+ * the query sublinks, and we don't want to do that unless necessary, as it
+ * would be inefficient.
+ */
+ if (QueryTreeHasImproperForDeparseNodes((Node *) jobQuery))
+ {
+ jobQuery = (Query *) AdjustImproperForDeparseNodes((Node *) jobQuery);
+ }
+
ListCell *fragmentCombinationCell = NULL;
foreach(fragmentCombinationCell, fragmentCombinationList)
{
@@ -2733,7 +2753,7 @@ SqlTaskList(Job *job)
* RelabelTypeToCollateExpr converts RelabelType's into CollationExpr's.
* With that, we will be able to pushdown COLLATE's.
*/
-CollateExpr *
+static CollateExpr *
RelabelTypeToCollateExpr(RelabelType *relabelType)
{
Assert(OidIsValid(relabelType->resultcollid));
@@ -5593,3 +5613,126 @@ TaskListHighestTaskId(List *taskList)
return highestTaskId;
}
+
+
+/*
+ * QueryTreeHasImproperForDeparseNodes walks over the node,
+ * and returns true if there are RelabelType or
+ * CoerceViaIONodes which are improper for deparse
+ */
+static bool
+QueryTreeHasImproperForDeparseNodes(Node *inputNode)
+{
+ if (inputNode == NULL)
+ {
+ return false;
+ }
+ else if (IsImproperForDeparseRelabelTypeNode(inputNode) ||
+ IsImproperForDeparseCoerceViaIONode(inputNode))
+ {
+ return true;
+ }
+ else if (IsA(inputNode, Query))
+ {
+ return query_tree_walker((Query *) inputNode,
+ QueryTreeHasImproperForDeparseNodes,
+ NULL, 0);
+ }
+
+ return expression_tree_walker(inputNode,
+ QueryTreeHasImproperForDeparseNodes,
+ NULL);
+}
+
+
+/*
+ * AdjustImproperForDeparseNodes takes an input rewritten query and modifies
+ * nodes which, after going through our planner, pose a problem when
+ * deparsing. So far we have two such type of Nodes that may pose problems:
+ * RelabelType and CoerceIO nodes.
+ * Details will be written in comments in the corresponding if conditions.
+ */
+static Node *
+AdjustImproperForDeparseNodes(Node *inputNode)
+{
+ if (inputNode == NULL)
+ {
+ return NULL;
+ }
+
+ if (IsImproperForDeparseRelabelTypeNode(inputNode))
+ {
+ /*
+ * The planner converts CollateExpr to RelabelType
+ * and here we convert back.
+ */
+ return (Node *) RelabelTypeToCollateExpr((RelabelType *) inputNode);
+ }
+ else if (IsImproperForDeparseCoerceViaIONode(inputNode))
+ {
+ /*
+ * The planner converts some ::text/::varchar casts to ::cstring
+ * and here we convert back to text because cstring is a pseudotype
+ * and it cannot be casted to most resulttypes
+ */
+
+ CoerceViaIO *iocoerce = (CoerceViaIO *) inputNode;
+ Node *arg = (Node *) iocoerce->arg;
+ Const *cstringToText = (Const *) arg;
+
+ cstringToText->consttype = TEXTOID;
+ cstringToText->constlen = -1;
+
+ Type textType = typeidType(TEXTOID);
+ char *constvalue = NULL;
+
+ if (!cstringToText->constisnull)
+ {
+ constvalue = DatumGetCString(cstringToText->constvalue);
+ }
+
+ cstringToText->constvalue = stringTypeDatum(textType,
+ constvalue,
+ cstringToText->consttypmod);
+ ReleaseSysCache(textType);
+ return inputNode;
+ }
+ else if (IsA(inputNode, Query))
+ {
+ return (Node *) query_tree_mutator((Query *) inputNode,
+ AdjustImproperForDeparseNodes,
+ NULL, QTW_DONT_COPY_QUERY);
+ }
+
+ return expression_tree_mutator(inputNode, AdjustImproperForDeparseNodes, NULL);
+}
+
+
+/*
+ * Checks if the given node is of Relabel type which is improper for deparsing
+ * The planner converts some CollateExpr to RelabelType nodes, and we need
+ * to find these nodes. They would be improperly deparsed without the
+ * "COLLATE" expression.
+ */
+static bool
+IsImproperForDeparseRelabelTypeNode(Node *inputNode)
+{
+ return (IsA(inputNode, RelabelType) &&
+ OidIsValid(((RelabelType *) inputNode)->resultcollid) &&
+ ((RelabelType *) inputNode)->resultcollid != DEFAULT_COLLATION_OID);
+}
+
+
+/*
+ * Checks if the given node is of CoerceViaIO type which is improper for deparsing
+ * The planner converts some ::text/::varchar casts to ::cstring, and we need
+ * to find these nodes. They would be improperly deparsed with "cstring" which cannot
+ * be casted to most resulttypes.
+ */
+static bool
+IsImproperForDeparseCoerceViaIONode(Node *inputNode)
+{
+ return (IsA(inputNode, CoerceViaIO) &&
+ IsA(((CoerceViaIO *) inputNode)->arg, Const) &&
+ ((Const *) ((CoerceViaIO *) inputNode)->arg)->consttype == CSTRINGOID);
+}
diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c
index 789732d21..84ef4229f 100644
--- a/src/backend/distributed/utils/background_jobs.c
+++ b/src/backend/distributed/utils/background_jobs.c
@@ -395,7 +395,7 @@ citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus)
/* sleep for a while, before rechecking the task status */
CHECK_FOR_INTERRUPTS();
- const long delay_ms = 1000;
+ const long delay_ms = 100;
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
delay_ms,
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index ea5d15c83..26d074053 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -543,7 +543,6 @@ extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression,
List *groupClauseList,
List *targetList,
bool checkExpressionEquality);
-extern CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType);
/*
* Function declarations for building, updating constraints and simple operator
diff --git a/src/test/regress/expected/background_rebalance_parallel.out b/src/test/regress/expected/background_rebalance_parallel.out
index 9c43fab9b..dbdd963a9 100644
--- a/src/test/regress/expected/background_rebalance_parallel.out
+++ b/src/test/regress/expected/background_rebalance_parallel.out
@@ -466,17 +466,27 @@ SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset
-- see dependent tasks to understand which tasks remain runnable because of
-- citus.max_background_task_executors_per_node
-- and which tasks are actually blocked from colocation group dependencies
-SELECT D.task_id,
- (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
- D.depends_on,
- (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
-FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
- task_id | command | depends_on | command
+SELECT (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
+ (SELECT T.command depends_on_command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
+FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY 1, 2 ASC;
+ command | depends_on_command
---------------------------------------------------------------------
- 1014 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | 1013 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto')
- 1016 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | 1015 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto')
- 1018 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | 1017 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto')
- 1020 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | 1019 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto')
+ SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto')
+ SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto')
+ SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto')
+ SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto')
+(4 rows)
+
+SELECT task_id, depends_on
+FROM pg_dist_background_task_depend
+WHERE job_id in (:job_id)
+ORDER BY 1, 2 ASC;
+ task_id | depends_on
+---------------------------------------------------------------------
+ 1014 | 1013
+ 1016 | 1015
+ 1018 | 1017
+ 1020 | 1019
(4 rows)
-- default citus.max_background_task_executors_per_node is 1
@@ -503,6 +513,12 @@ FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
(8 rows)
-- increase citus.max_background_task_executors_per_node
+SELECT citus_task_wait(1013, desired_status => 'done');
+ citus_task_wait
+---------------------------------------------------------------------
+
+(1 row)
+
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
pg_reload_conf
@@ -510,13 +526,13 @@ SELECT pg_reload_conf();
t
(1 row)
-SELECT citus_task_wait(1015, desired_status => 'running');
+SELECT citus_task_wait(1014, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
-SELECT citus_task_wait(1013, desired_status => 'done');
+SELECT citus_task_wait(1015, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
diff --git a/src/test/regress/expected/distributed_collations.out b/src/test/regress/expected/distributed_collations.out
index 0c03c8be7..5e7247b0a 100644
--- a/src/test/regress/expected/distributed_collations.out
+++ b/src/test/regress/expected/distributed_collations.out
@@ -75,9 +75,9 @@ NOTICE: renaming the new table to collation_tests.test_collate_pushed_down_aggr
SET citus.log_remote_commands TO true;
SELECT ALL MIN((lower(CAST(test_collate_pushed_down_aggregate.a AS VARCHAR)) COLLATE "C"))
FROM ONLY test_collate_pushed_down_aggregate;
-NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060004 test_collate_pushed_down_aggregate WHERE true
+NOTICE: issuing SELECT min((lower(((a)::character varying)::text) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060004 test_collate_pushed_down_aggregate WHERE true
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060005 test_collate_pushed_down_aggregate WHERE true
+NOTICE: issuing SELECT min((lower(((a)::character varying)::text) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060005 test_collate_pushed_down_aggregate WHERE true
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
min
---------------------------------------------------------------------
diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out
index 190e6b2b3..3cf776ded 100644
--- a/src/test/regress/expected/merge.out
+++ b/src/test/regress/expected/merge.out
@@ -236,7 +236,7 @@ MERGE INTO target t
last_order_id = s.order_id
WHEN NOT MATCHED THEN
DO NOTHING;
-NOTICE: issuing MERGE INTO merge_schema.target_xxxxxxx t USING merge_schema.source_xxxxxxx s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center COLLATE "default") OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN DO NOTHING
+NOTICE: issuing MERGE INTO merge_schema.target_xxxxxxx t USING merge_schema.source_xxxxxxx s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center)::text OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT * from target t WHERE t.customer_id = 30002;
@@ -1073,7 +1073,7 @@ UPDATE SET value = vl_source.value, id = vl_target.id + 1
WHEN NOT MATCHED THEN
INSERT VALUES(vl_source.ID, vl_source.value);
DEBUG: Creating MERGE router plan
-DEBUG:
+DEBUG:
RESET client_min_messages;
SELECT * INTO vl_local FROM vl_target ORDER BY 1 ;
-- Should be equal
@@ -1331,7 +1331,7 @@ MERGE INTO ft_target
WHEN NOT MATCHED THEN
INSERT (id, user_val) VALUES (foreign_table.id, foreign_table.user_val);
DEBUG: Creating MERGE router plan
-DEBUG:
+DEBUG:
RESET client_min_messages;
SELECT * FROM ft_target;
id | user_val
@@ -1667,13 +1667,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
@@ -1710,13 +1710,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
@@ -1753,13 +1753,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
@@ -1836,13 +1836,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
@@ -1871,7 +1871,7 @@ WHEN NOT MATCHED THEN
DO NOTHING;
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
@@ -1917,13 +1917,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
+NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
+NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
+NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
+NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
@@ -1958,13 +1958,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
@@ -2042,13 +2042,13 @@ SELECT count(*) FROM citus_target; -- before merge
SET citus.log_remote_commands to true;
EXECUTE citus_prep(500);
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SELECT * FROM citus_target WHERE id = 500; -- non-cached
NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500)
@@ -2059,49 +2059,49 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
(1 row)
EXECUTE citus_prep(500);
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
EXECUTE citus_prep(500);
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
EXECUTE citus_prep(500);
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
EXECUTE citus_prep(500);
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
EXECUTE citus_prep(500);
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
+NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT * FROM citus_target WHERE id = 500; -- cached
diff --git a/src/test/regress/expected/prepared_statements_4.out b/src/test/regress/expected/prepared_statements_4.out
index 0dba296e8..4b81ec260 100644
--- a/src/test/regress/expected/prepared_statements_4.out
+++ b/src/test/regress/expected/prepared_statements_4.out
@@ -50,3 +50,307 @@ SELECT count(distinct ingest_time) FROM http_request WHERE site_id = 1;
8
(1 row)
+-- Standard planner converted text and varchar casts to cstring in some cases
+-- We make sure we convert it back to text when parsing the expression
+INSERT INTO test VALUES ('2022-02-02', 0);
+INSERT INTO test VALUES ('2022-01-01', 1);
+INSERT INTO test VALUES ('2021-01-01', 2);
+-- try different planners
+PREPARE test_statement_regular(text) AS
+SELECT user_id FROM test WHERE t >= $1::timestamp ORDER BY user_id;
+EXECUTE test_statement_regular('2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 0
+ 1
+(2 rows)
+
+EXECUTE test_statement_regular('2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 0
+ 1
+(2 rows)
+
+EXECUTE test_statement_regular('2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 0
+ 1
+(2 rows)
+
+EXECUTE test_statement_regular('2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 0
+ 1
+(2 rows)
+
+EXECUTE test_statement_regular('2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 0
+ 1
+(2 rows)
+
+EXECUTE test_statement_regular('2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 0
+ 1
+(2 rows)
+
+EXECUTE test_statement_regular('2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 0
+ 1
+(2 rows)
+
+EXECUTE test_statement_regular('2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 0
+ 1
+(2 rows)
+
+PREPARE test_statement_router(int, text) AS
+SELECT user_id FROM test WHERE user_id = $1 AND t >= $2::timestamp ORDER BY user_id;
+EXECUTE test_statement_router(1, '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_router(1, '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_router(1, '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_router(1, '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_router(1, '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_router(1, '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_router(1, '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_router(1, '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+PREPARE test_statement_repartition(int, text) AS
+SELECT count(*) FROM test t1 JOIN test t2 USING (t) WHERE t1.user_id = $1 AND t >= $2::timestamp;
+EXECUTE test_statement_repartition(1, '2022-01-01');
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_repartition(1, '2022-01-01');
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_repartition(1, '2022-01-01');
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_repartition(1, '2022-01-01');
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_repartition(1, '2022-01-01');
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_repartition(1, '2022-01-01');
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_repartition(1, '2022-01-01');
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_repartition(1, '2022-01-01');
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+PREPARE test_statement_cte(text, text) AS
+WITH cte_1 AS MATERIALIZED (SELECT user_id, t FROM test WHERE t >= $1::timestamp ORDER BY user_id)
+SELECT user_id FROM cte_1 WHERE t <= $2::timestamp;
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+PREPARE test_statement_insert(int, text) AS
+INSERT INTO test VALUES ($2::timestamp, $1);
+EXECUTE test_statement_insert(3, '2022-03-03');
+EXECUTE test_statement_insert(4, '2022-04-04');
+EXECUTE test_statement_insert(5, '2022-05-05');
+EXECUTE test_statement_insert(6, '2022-06-06');
+EXECUTE test_statement_insert(7, '2022-07-07');
+EXECUTE test_statement_insert(8, '2022-08-08');
+EXECUTE test_statement_insert(9, '2022-09-09');
+EXECUTE test_statement_insert(10, '2022-10-10');
+SELECT count(*) FROM test;
+ count
+---------------------------------------------------------------------
+ 11
+(1 row)
+
+EXECUTE test_statement_regular('2022-01-01');
+ user_id
+---------------------------------------------------------------------
+ 0
+ 1
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+ 10
+(10 rows)
+
+PREPARE test_statement_null(text) AS
+SELECT user_id , $1::timestamp FROM test ORDER BY user_id LIMIT 2;
+EXECUTE test_statement_null(NULL);
+ user_id | timestamp
+---------------------------------------------------------------------
+ 0 |
+ 1 |
+(2 rows)
+
+EXECUTE test_statement_null(NULL);
+ user_id | timestamp
+---------------------------------------------------------------------
+ 0 |
+ 1 |
+(2 rows)
+
+EXECUTE test_statement_null(NULL);
+ user_id | timestamp
+---------------------------------------------------------------------
+ 0 |
+ 1 |
+(2 rows)
+
+EXECUTE test_statement_null(NULL);
+ user_id | timestamp
+---------------------------------------------------------------------
+ 0 |
+ 1 |
+(2 rows)
+
+EXECUTE test_statement_null(NULL);
+ user_id | timestamp
+---------------------------------------------------------------------
+ 0 |
+ 1 |
+(2 rows)
+
+EXECUTE test_statement_null(NULL);
+ user_id | timestamp
+---------------------------------------------------------------------
+ 0 |
+ 1 |
+(2 rows)
+
+EXECUTE test_statement_null(NULL);
+ user_id | timestamp
+---------------------------------------------------------------------
+ 0 |
+ 1 |
+(2 rows)
+
+EXECUTE test_statement_null(NULL);
+ user_id | timestamp
+---------------------------------------------------------------------
+ 0 |
+ 1 |
+(2 rows)
+
diff --git a/src/test/regress/expected/prepared_statements_create_load.out b/src/test/regress/expected/prepared_statements_create_load.out
index d1817f39f..5e3740abe 100644
--- a/src/test/regress/expected/prepared_statements_create_load.out
+++ b/src/test/regress/expected/prepared_statements_create_load.out
@@ -91,3 +91,15 @@ SELECT create_distributed_table('http_request', 'site_id');
(1 row)
+-- Standard planner converted text and varchar casts to cstring in some cases
+-- We make sure we convert it back to text when parsing the expression
+-- https://github.com/citusdata/citus/issues/6061
+-- https://github.com/citusdata/citus/issues/5646
+-- https://github.com/citusdata/citus/issues/5033
+CREATE TABLE test(t timestamp, user_id int);
+SELECT create_distributed_table('test', 'user_id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
diff --git a/src/test/regress/sql/background_rebalance_parallel.sql b/src/test/regress/sql/background_rebalance_parallel.sql
index 5229e7f88..2eb952b67 100644
--- a/src/test/regress/sql/background_rebalance_parallel.sql
+++ b/src/test/regress/sql/background_rebalance_parallel.sql
@@ -204,11 +204,14 @@ SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset
-- see dependent tasks to understand which tasks remain runnable because of
-- citus.max_background_task_executors_per_node
-- and which tasks are actually blocked from colocation group dependencies
-SELECT D.task_id,
- (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
- D.depends_on,
- (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
-FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
+SELECT (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
+ (SELECT T.command depends_on_command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
+FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY 1, 2 ASC;
+
+SELECT task_id, depends_on
+FROM pg_dist_background_task_depend
+WHERE job_id in (:job_id)
+ORDER BY 1, 2 ASC;
-- default citus.max_background_task_executors_per_node is 1
-- show that first exactly one task per node is running
@@ -218,10 +221,12 @@ SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
-- increase citus.max_background_task_executors_per_node
+SELECT citus_task_wait(1013, desired_status => 'done');
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
SELECT pg_reload_conf();
+
+SELECT citus_task_wait(1014, desired_status => 'running');
SELECT citus_task_wait(1015, desired_status => 'running');
-SELECT citus_task_wait(1013, desired_status => 'done');
-- show that at most 2 tasks per node are running
-- among the tasks that are not blocked
diff --git a/src/test/regress/sql/prepared_statements_4.sql b/src/test/regress/sql/prepared_statements_4.sql
index a8f124568..bdbc32b08 100644
--- a/src/test/regress/sql/prepared_statements_4.sql
+++ b/src/test/regress/sql/prepared_statements_4.sql
@@ -43,3 +43,85 @@ EXECUTE foo;
SELECT count(distinct ingest_time) FROM http_request WHERE site_id = 1;
+-- Standard planner converted text and varchar casts to cstring in some cases
+-- We make sure we convert it back to text when parsing the expression
+INSERT INTO test VALUES ('2022-02-02', 0);
+INSERT INTO test VALUES ('2022-01-01', 1);
+INSERT INTO test VALUES ('2021-01-01', 2);
+
+-- try different planners
+PREPARE test_statement_regular(text) AS
+SELECT user_id FROM test WHERE t >= $1::timestamp ORDER BY user_id;
+
+EXECUTE test_statement_regular('2022-01-01');
+EXECUTE test_statement_regular('2022-01-01');
+EXECUTE test_statement_regular('2022-01-01');
+EXECUTE test_statement_regular('2022-01-01');
+EXECUTE test_statement_regular('2022-01-01');
+EXECUTE test_statement_regular('2022-01-01');
+EXECUTE test_statement_regular('2022-01-01');
+EXECUTE test_statement_regular('2022-01-01');
+
+PREPARE test_statement_router(int, text) AS
+SELECT user_id FROM test WHERE user_id = $1 AND t >= $2::timestamp ORDER BY user_id;
+
+EXECUTE test_statement_router(1, '2022-01-01');
+EXECUTE test_statement_router(1, '2022-01-01');
+EXECUTE test_statement_router(1, '2022-01-01');
+EXECUTE test_statement_router(1, '2022-01-01');
+EXECUTE test_statement_router(1, '2022-01-01');
+EXECUTE test_statement_router(1, '2022-01-01');
+EXECUTE test_statement_router(1, '2022-01-01');
+EXECUTE test_statement_router(1, '2022-01-01');
+
+PREPARE test_statement_repartition(int, text) AS
+SELECT count(*) FROM test t1 JOIN test t2 USING (t) WHERE t1.user_id = $1 AND t >= $2::timestamp;
+
+EXECUTE test_statement_repartition(1, '2022-01-01');
+EXECUTE test_statement_repartition(1, '2022-01-01');
+EXECUTE test_statement_repartition(1, '2022-01-01');
+EXECUTE test_statement_repartition(1, '2022-01-01');
+EXECUTE test_statement_repartition(1, '2022-01-01');
+EXECUTE test_statement_repartition(1, '2022-01-01');
+EXECUTE test_statement_repartition(1, '2022-01-01');
+EXECUTE test_statement_repartition(1, '2022-01-01');
+
+PREPARE test_statement_cte(text, text) AS
+WITH cte_1 AS MATERIALIZED (SELECT user_id, t FROM test WHERE t >= $1::timestamp ORDER BY user_id)
+SELECT user_id FROM cte_1 WHERE t <= $2::timestamp;
+
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
+
+PREPARE test_statement_insert(int, text) AS
+INSERT INTO test VALUES ($2::timestamp, $1);
+
+EXECUTE test_statement_insert(3, '2022-03-03');
+EXECUTE test_statement_insert(4, '2022-04-04');
+EXECUTE test_statement_insert(5, '2022-05-05');
+EXECUTE test_statement_insert(6, '2022-06-06');
+EXECUTE test_statement_insert(7, '2022-07-07');
+EXECUTE test_statement_insert(8, '2022-08-08');
+EXECUTE test_statement_insert(9, '2022-09-09');
+EXECUTE test_statement_insert(10, '2022-10-10');
+
+SELECT count(*) FROM test;
+EXECUTE test_statement_regular('2022-01-01');
+
+PREPARE test_statement_null(text) AS
+SELECT user_id , $1::timestamp FROM test ORDER BY user_id LIMIT 2;
+
+EXECUTE test_statement_null(NULL);
+EXECUTE test_statement_null(NULL);
+EXECUTE test_statement_null(NULL);
+EXECUTE test_statement_null(NULL);
+EXECUTE test_statement_null(NULL);
+EXECUTE test_statement_null(NULL);
+EXECUTE test_statement_null(NULL);
+EXECUTE test_statement_null(NULL);
diff --git a/src/test/regress/sql/prepared_statements_create_load.sql b/src/test/regress/sql/prepared_statements_create_load.sql
index 6afb1a12f..025ff1f6a 100644
--- a/src/test/regress/sql/prepared_statements_create_load.sql
+++ b/src/test/regress/sql/prepared_statements_create_load.sql
@@ -73,3 +73,12 @@ CREATE TABLE http_request (
);
SELECT create_distributed_table('http_request', 'site_id');
+
+-- Standard planner converted text and varchar casts to cstring in some cases
+-- We make sure we convert it back to text when parsing the expression
+-- https://github.com/citusdata/citus/issues/6061
+-- https://github.com/citusdata/citus/issues/5646
+-- https://github.com/citusdata/citus/issues/5033
+
+CREATE TABLE test(t timestamp, user_id int);
+SELECT create_distributed_table('test', 'user_id');