mirror of https://github.com/citusdata/citus.git
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
commit
7649b9dfff
|
@ -6,7 +6,7 @@
|
|||
* Adds support for `MERGE` command on co-located distributed tables joined on
|
||||
distribution column (#6696, #6733)
|
||||
|
||||
* Adds the view `citus_stats_tenants` that monitor statistics on tenant usages
|
||||
* Adds the view `citus_stat_tenants` that monitor statistics on tenant usages
|
||||
(#6725)
|
||||
|
||||
* Adds the GUC `citus.max_background_task_executors_per_node` to control number
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
| **<br/>The Citus database is 100% open source.<br/><img width=1000/><br/>Learn what's new in the [Citus 11.2 release blog](https://www.citusdata.com/blog/2023/02/08/whats-new-in-citus-11-2-patroni-ha-support/) and the [Citus Updates page](https://www.citusdata.com/updates/).<br/><br/>**|
|
||||
| **<br/>The Citus database is 100% open source.<br/><img width=1000/><br/>Learn what's new in the [Citus 11.3 release blog](https://www.citusdata.com/blog/2023/05/05/whats-new-in-citus-11-3-multi-tenant-saas/) and the [Citus Updates page](https://www.citusdata.com/updates/).<br/><br/>**|
|
||||
|---|
|
||||
<br/>
|
||||
|
||||
|
@ -94,14 +94,14 @@ Install packages on Ubuntu / Debian:
|
|||
```bash
|
||||
curl https://install.citusdata.com/community/deb.sh > add-citus-repo.sh
|
||||
sudo bash add-citus-repo.sh
|
||||
sudo apt-get -y install postgresql-15-citus-11.2
|
||||
sudo apt-get -y install postgresql-15-citus-11.3
|
||||
```
|
||||
|
||||
Install packages on CentOS / Red Hat:
|
||||
```bash
|
||||
curl https://install.citusdata.com/community/rpm.sh > add-citus-repo.sh
|
||||
sudo bash add-citus-repo.sh
|
||||
sudo yum install -y citus112_15
|
||||
sudo yum install -y citus113_15
|
||||
```
|
||||
|
||||
To add Citus to your local PostgreSQL database, add the following to `postgresql.conf`:
|
||||
|
@ -349,7 +349,7 @@ To learn more about columnar storage, check out the [columnar storage README](ht
|
|||
|
||||
## Setting up with High Availability
|
||||
|
||||
One of the most popular high availability solutions for PostgreSQL, [Patroni 3.0](https://github.com/zalando/patroni), has [first class support for Citus 10.0 and above](https://patroni.readthedocs.io/en/latest/citus.html#citus), additionally Citus 11.2 ships with improvements for smoother node switchover in Patroni.
|
||||
One of the most popular high availability solutions for PostgreSQL, [Patroni 3.0](https://github.com/zalando/patroni), has [first class support for Citus 10.0 and above](https://patroni.readthedocs.io/en/latest/citus.html#citus), additionally since Citus 11.2 ships with improvements for smoother node switchover in Patroni.
|
||||
|
||||
An example of patronictl list output for the Citus cluster:
|
||||
|
||||
|
|
|
@ -5246,42 +5246,20 @@ get_rule_expr(Node *node, deparse_context *context,
|
|||
case T_RelabelType:
|
||||
{
|
||||
RelabelType *relabel = (RelabelType *) node;
|
||||
Node *arg = (Node *) relabel->arg;
|
||||
|
||||
/*
|
||||
* This is a Citus specific modification
|
||||
* The planner converts CollateExpr to RelabelType
|
||||
* and here we convert back.
|
||||
*/
|
||||
if (relabel->resultcollid != InvalidOid)
|
||||
if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
|
||||
!showimplicit)
|
||||
{
|
||||
CollateExpr *collate = RelabelTypeToCollateExpr(relabel);
|
||||
Node *arg = (Node *) collate->arg;
|
||||
|
||||
if (!PRETTY_PAREN(context))
|
||||
appendStringInfoChar(buf, '(');
|
||||
get_rule_expr_paren(arg, context, showimplicit, node);
|
||||
appendStringInfo(buf, " COLLATE %s",
|
||||
generate_collation_name(collate->collOid));
|
||||
if (!PRETTY_PAREN(context))
|
||||
appendStringInfoChar(buf, ')');
|
||||
/* don't show the implicit cast */
|
||||
get_rule_expr_paren(arg, context, false, node);
|
||||
}
|
||||
else
|
||||
{
|
||||
Node *arg = (Node *) relabel->arg;
|
||||
|
||||
if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
|
||||
!showimplicit)
|
||||
{
|
||||
/* don't show the implicit cast */
|
||||
get_rule_expr_paren(arg, context, false, node);
|
||||
}
|
||||
else
|
||||
{
|
||||
get_coercion_expr(arg, context,
|
||||
relabel->resulttype,
|
||||
relabel->resulttypmod,
|
||||
node);
|
||||
}
|
||||
get_coercion_expr(arg, context,
|
||||
relabel->resulttype,
|
||||
relabel->resulttypmod,
|
||||
node);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -5470,42 +5470,20 @@ get_rule_expr(Node *node, deparse_context *context,
|
|||
case T_RelabelType:
|
||||
{
|
||||
RelabelType *relabel = (RelabelType *) node;
|
||||
Node *arg = (Node *) relabel->arg;
|
||||
|
||||
/*
|
||||
* This is a Citus specific modification
|
||||
* The planner converts CollateExpr to RelabelType
|
||||
* and here we convert back.
|
||||
*/
|
||||
if (relabel->resultcollid != InvalidOid)
|
||||
if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
|
||||
!showimplicit)
|
||||
{
|
||||
CollateExpr *collate = RelabelTypeToCollateExpr(relabel);
|
||||
Node *arg = (Node *) collate->arg;
|
||||
|
||||
if (!PRETTY_PAREN(context))
|
||||
appendStringInfoChar(buf, '(');
|
||||
get_rule_expr_paren(arg, context, showimplicit, node);
|
||||
appendStringInfo(buf, " COLLATE %s",
|
||||
generate_collation_name(collate->collOid));
|
||||
if (!PRETTY_PAREN(context))
|
||||
appendStringInfoChar(buf, ')');
|
||||
/* don't show the implicit cast */
|
||||
get_rule_expr_paren(arg, context, false, node);
|
||||
}
|
||||
else
|
||||
{
|
||||
Node *arg = (Node *) relabel->arg;
|
||||
|
||||
if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
|
||||
!showimplicit)
|
||||
{
|
||||
/* don't show the implicit cast */
|
||||
get_rule_expr_paren(arg, context, false, node);
|
||||
}
|
||||
else
|
||||
{
|
||||
get_coercion_expr(arg, context,
|
||||
relabel->resulttype,
|
||||
relabel->resulttypmod,
|
||||
node);
|
||||
}
|
||||
get_coercion_expr(arg, context,
|
||||
relabel->resulttype,
|
||||
relabel->resulttypmod,
|
||||
node);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -5696,42 +5696,20 @@ get_rule_expr(Node *node, deparse_context *context,
|
|||
case T_RelabelType:
|
||||
{
|
||||
RelabelType *relabel = (RelabelType *) node;
|
||||
Node *arg = (Node *) relabel->arg;
|
||||
|
||||
/*
|
||||
* This is a Citus specific modification
|
||||
* The planner converts CollateExpr to RelabelType
|
||||
* and here we convert back.
|
||||
*/
|
||||
if (relabel->resultcollid != InvalidOid)
|
||||
if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
|
||||
!showimplicit)
|
||||
{
|
||||
CollateExpr *collate = RelabelTypeToCollateExpr(relabel);
|
||||
Node *arg = (Node *) collate->arg;
|
||||
|
||||
if (!PRETTY_PAREN(context))
|
||||
appendStringInfoChar(buf, '(');
|
||||
get_rule_expr_paren(arg, context, showimplicit, node);
|
||||
appendStringInfo(buf, " COLLATE %s",
|
||||
generate_collation_name(collate->collOid));
|
||||
if (!PRETTY_PAREN(context))
|
||||
appendStringInfoChar(buf, ')');
|
||||
/* don't show the implicit cast */
|
||||
get_rule_expr_paren(arg, context, false, node);
|
||||
}
|
||||
else
|
||||
{
|
||||
Node *arg = (Node *) relabel->arg;
|
||||
|
||||
if (relabel->relabelformat == COERCE_IMPLICIT_CAST &&
|
||||
!showimplicit)
|
||||
{
|
||||
/* don't show the implicit cast */
|
||||
get_rule_expr_paren(arg, context, false, node);
|
||||
}
|
||||
else
|
||||
{
|
||||
get_coercion_expr(arg, context,
|
||||
relabel->resulttype,
|
||||
relabel->resulttypmod,
|
||||
node);
|
||||
}
|
||||
get_coercion_expr(arg, context,
|
||||
relabel->resulttype,
|
||||
relabel->resulttypmod,
|
||||
node);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -49,179 +49,8 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
|
|||
Node *quals,
|
||||
List *targetList,
|
||||
CmdType commandType);
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
* CreateMergePlan attempts to create a plan for the given MERGE SQL
|
||||
* statement. If planning fails ->planningError is set to a description
|
||||
* of the failure.
|
||||
*/
|
||||
DistributedPlan *
|
||||
CreateMergePlan(Query *originalQuery, Query *query,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
{
|
||||
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
|
||||
bool multiShardQuery = false;
|
||||
Oid targetRelationId = ModifyQueryResultRelationId(originalQuery);
|
||||
|
||||
Assert(originalQuery->commandType == CMD_MERGE);
|
||||
Assert(OidIsValid(targetRelationId));
|
||||
|
||||
distributedPlan->targetRelationId = targetRelationId;
|
||||
distributedPlan->modLevel = RowModifyLevelForQuery(query);
|
||||
distributedPlan->planningError = MergeQuerySupported(targetRelationId,
|
||||
originalQuery,
|
||||
multiShardQuery,
|
||||
plannerRestrictionContext);
|
||||
|
||||
if (distributedPlan->planningError != NULL)
|
||||
{
|
||||
return distributedPlan;
|
||||
}
|
||||
|
||||
Job *job = RouterJob(originalQuery, plannerRestrictionContext,
|
||||
&distributedPlan->planningError);
|
||||
|
||||
if (distributedPlan->planningError != NULL)
|
||||
{
|
||||
return distributedPlan;
|
||||
}
|
||||
|
||||
ereport(DEBUG1, (errmsg("Creating MERGE router plan")));
|
||||
|
||||
distributedPlan->workerJob = job;
|
||||
distributedPlan->combineQuery = NULL;
|
||||
|
||||
/* MERGE doesn't support RETURNING clause */
|
||||
distributedPlan->expectResults = false;
|
||||
distributedPlan->fastPathRouterPlan =
|
||||
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
|
||||
|
||||
return distributedPlan;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MergeQuerySupported does check for a MERGE command in the query, if it finds
|
||||
* one, it will verify the below criteria
|
||||
* - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables
|
||||
* - Distributed tables requirements in ErrorIfDistTablesNotColocated
|
||||
* - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported
|
||||
*/
|
||||
DeferredErrorMessage *
|
||||
MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
{
|
||||
/* function is void for pre-15 versions of Postgres */
|
||||
#if PG_VERSION_NUM < PG_VERSION_15
|
||||
|
||||
return NULL;
|
||||
|
||||
#else
|
||||
|
||||
/*
|
||||
* TODO: For now, we are adding an exception where any volatile or stable
|
||||
* functions are not allowed in the MERGE query, but this will become too
|
||||
* restrictive as this will prevent many useful and simple cases, such as,
|
||||
* INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without
|
||||
* this restriction, we have a potential danger of some of the function(s)
|
||||
* getting executed at the worker which will result in incorrect behavior.
|
||||
*/
|
||||
if (contain_mutable_functions((Node *) originalQuery))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"non-IMMUTABLE functions are not yet supported "
|
||||
"in MERGE sql with distributed tables ",
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
List *rangeTableList = ExtractRangeTableEntryList(originalQuery);
|
||||
|
||||
/*
|
||||
* Fast path queries cannot have merge command, and we prevent the remaining here.
|
||||
* In Citus we have limited support for MERGE, it's allowed only if all
|
||||
* the tables(target, source or any CTE) tables are are local i.e. a
|
||||
* combination of Citus local and Non-Citus tables (regular Postgres tables)
|
||||
* or distributed tables with some restrictions, please see header of routine
|
||||
* ErrorIfDistTablesNotColocated for details.
|
||||
*/
|
||||
DeferredErrorMessage *deferredError =
|
||||
ErrorIfMergeHasUnsupportedTables(resultRelationId,
|
||||
originalQuery,
|
||||
rangeTableList,
|
||||
plannerRestrictionContext);
|
||||
if (deferredError)
|
||||
{
|
||||
/* MERGE's unsupported combination, raise the exception */
|
||||
RaiseDeferredError(deferredError, ERROR);
|
||||
}
|
||||
|
||||
deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
|
||||
originalQuery->jointree,
|
||||
originalQuery->jointree->
|
||||
quals,
|
||||
originalQuery->targetList,
|
||||
originalQuery->commandType);
|
||||
if (deferredError)
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
|
||||
/*
|
||||
* MERGE is a special case where we have multiple modify statements
|
||||
* within itself. Check each INSERT/UPDATE/DELETE individually.
|
||||
*/
|
||||
MergeAction *action = NULL;
|
||||
foreach_ptr(action, originalQuery->mergeActionList)
|
||||
{
|
||||
Assert(originalQuery->returningList == NULL);
|
||||
deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
|
||||
originalQuery->jointree,
|
||||
action->qual,
|
||||
action->targetList,
|
||||
action->commandType);
|
||||
if (deferredError)
|
||||
{
|
||||
/* MERGE's unsupported scenario, raise the exception */
|
||||
RaiseDeferredError(deferredError, ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
deferredError =
|
||||
InsertDistributionColumnMatchesSource(resultRelationId, originalQuery);
|
||||
if (deferredError)
|
||||
{
|
||||
/* MERGE's unsupported scenario, raise the exception */
|
||||
RaiseDeferredError(deferredError, ERROR);
|
||||
}
|
||||
|
||||
if (multiShardQuery)
|
||||
{
|
||||
deferredError =
|
||||
DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
|
||||
plannerRestrictionContext);
|
||||
if (deferredError)
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
}
|
||||
|
||||
if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"a join with USING causes an internal naming "
|
||||
"conflict, use ON instead", NULL, NULL);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||
|
||||
/*
|
||||
* ErrorIfDistTablesNotColocated Checks to see if
|
||||
*
|
||||
|
@ -728,6 +557,174 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTre
|
|||
#endif
|
||||
|
||||
|
||||
/*
|
||||
* MergeQuerySupported does check for a MERGE command in the query, if it finds
|
||||
* one, it will verify the below criteria
|
||||
* - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables
|
||||
* - Distributed tables requirements in ErrorIfDistTablesNotColocated
|
||||
* - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported
|
||||
*/
|
||||
DeferredErrorMessage *
|
||||
MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
{
|
||||
/* function is void for pre-15 versions of Postgres */
|
||||
#if PG_VERSION_NUM < PG_VERSION_15
|
||||
|
||||
return NULL;
|
||||
|
||||
#else
|
||||
|
||||
/*
|
||||
* TODO: For now, we are adding an exception where any volatile or stable
|
||||
* functions are not allowed in the MERGE query, but this will become too
|
||||
* restrictive as this will prevent many useful and simple cases, such as,
|
||||
* INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without
|
||||
* this restriction, we have a potential danger of some of the function(s)
|
||||
* getting executed at the worker which will result in incorrect behavior.
|
||||
*/
|
||||
if (contain_mutable_functions((Node *) originalQuery))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"non-IMMUTABLE functions are not yet supported "
|
||||
"in MERGE sql with distributed tables ",
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
List *rangeTableList = ExtractRangeTableEntryList(originalQuery);
|
||||
|
||||
/*
|
||||
* Fast path queries cannot have merge command, and we prevent the remaining here.
|
||||
* In Citus we have limited support for MERGE, it's allowed only if all
|
||||
* the tables(target, source or any CTE) tables are are local i.e. a
|
||||
* combination of Citus local and Non-Citus tables (regular Postgres tables)
|
||||
* or distributed tables with some restrictions, please see header of routine
|
||||
* ErrorIfDistTablesNotColocated for details.
|
||||
*/
|
||||
DeferredErrorMessage *deferredError =
|
||||
ErrorIfMergeHasUnsupportedTables(resultRelationId,
|
||||
originalQuery,
|
||||
rangeTableList,
|
||||
plannerRestrictionContext);
|
||||
if (deferredError)
|
||||
{
|
||||
/* MERGE's unsupported combination, raise the exception */
|
||||
RaiseDeferredError(deferredError, ERROR);
|
||||
}
|
||||
|
||||
deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
|
||||
originalQuery->jointree,
|
||||
originalQuery->jointree->
|
||||
quals,
|
||||
originalQuery->targetList,
|
||||
originalQuery->commandType);
|
||||
if (deferredError)
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
|
||||
/*
|
||||
* MERGE is a special case where we have multiple modify statements
|
||||
* within itself. Check each INSERT/UPDATE/DELETE individually.
|
||||
*/
|
||||
MergeAction *action = NULL;
|
||||
foreach_ptr(action, originalQuery->mergeActionList)
|
||||
{
|
||||
Assert(originalQuery->returningList == NULL);
|
||||
deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
|
||||
originalQuery->jointree,
|
||||
action->qual,
|
||||
action->targetList,
|
||||
action->commandType);
|
||||
if (deferredError)
|
||||
{
|
||||
/* MERGE's unsupported scenario, raise the exception */
|
||||
RaiseDeferredError(deferredError, ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
deferredError =
|
||||
InsertDistributionColumnMatchesSource(resultRelationId, originalQuery);
|
||||
if (deferredError)
|
||||
{
|
||||
/* MERGE's unsupported scenario, raise the exception */
|
||||
RaiseDeferredError(deferredError, ERROR);
|
||||
}
|
||||
|
||||
if (multiShardQuery)
|
||||
{
|
||||
deferredError =
|
||||
DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
|
||||
plannerRestrictionContext);
|
||||
if (deferredError)
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
}
|
||||
|
||||
if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"a join with USING causes an internal naming "
|
||||
"conflict, use ON instead", NULL, NULL);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateMergePlan attempts to create a plan for the given MERGE SQL
|
||||
* statement. If planning fails ->planningError is set to a description
|
||||
* of the failure.
|
||||
*/
|
||||
DistributedPlan *
|
||||
CreateMergePlan(Query *originalQuery, Query *query,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
{
|
||||
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
|
||||
bool multiShardQuery = false;
|
||||
Oid targetRelationId = ModifyQueryResultRelationId(originalQuery);
|
||||
|
||||
Assert(originalQuery->commandType == CMD_MERGE);
|
||||
Assert(OidIsValid(targetRelationId));
|
||||
|
||||
distributedPlan->targetRelationId = targetRelationId;
|
||||
distributedPlan->modLevel = RowModifyLevelForQuery(query);
|
||||
distributedPlan->planningError = MergeQuerySupported(targetRelationId,
|
||||
originalQuery,
|
||||
multiShardQuery,
|
||||
plannerRestrictionContext);
|
||||
|
||||
if (distributedPlan->planningError != NULL)
|
||||
{
|
||||
return distributedPlan;
|
||||
}
|
||||
|
||||
Job *job = RouterJob(originalQuery, plannerRestrictionContext,
|
||||
&distributedPlan->planningError);
|
||||
|
||||
if (distributedPlan->planningError != NULL)
|
||||
{
|
||||
return distributedPlan;
|
||||
}
|
||||
|
||||
ereport(DEBUG1, (errmsg("Creating MERGE router plan")));
|
||||
|
||||
distributedPlan->workerJob = job;
|
||||
distributedPlan->combineQuery = NULL;
|
||||
|
||||
/* MERGE doesn't support RETURNING clause */
|
||||
distributedPlan->expectResults = false;
|
||||
distributedPlan->fastPathRouterPlan =
|
||||
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
|
||||
|
||||
return distributedPlan;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsLocalTableModification returns true if the table modified is a Postgres table.
|
||||
* We do not support recursive planning for MERGE yet, so we could have a join
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "access/xlog.h"
|
||||
#include "catalog/pg_aggregate.h"
|
||||
#include "catalog/pg_am.h"
|
||||
#include "catalog/pg_collation.h"
|
||||
#include "catalog/pg_operator.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "commands/defrem.h"
|
||||
|
@ -69,6 +70,7 @@
|
|||
#include "optimizer/restrictinfo.h"
|
||||
#include "optimizer/tlist.h"
|
||||
#include "parser/parse_relation.h"
|
||||
#include "parser/parse_type.h"
|
||||
#include "parser/parsetree.h"
|
||||
#include "rewrite/rewriteManip.h"
|
||||
#include "utils/builtins.h"
|
||||
|
@ -79,6 +81,7 @@
|
|||
#include "utils/lsyscache.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/typcache.h"
|
||||
|
||||
/* RepartitionJoinBucketCountPerNode determines bucket amount during repartitions */
|
||||
|
@ -231,6 +234,11 @@ static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr);
|
|||
static List * FetchEqualityAttrNumsForList(List *nodeList);
|
||||
static int PartitionColumnIndex(Var *targetVar, List *targetList);
|
||||
static List * GetColumnOriginalIndexes(Oid relationId);
|
||||
static bool QueryTreeHasImproperForDeparseNodes(Node *inputNode);
|
||||
static Node * AdjustImproperForDeparseNodes(Node *inputNode);
|
||||
static bool IsImproperForDeparseRelabelTypeNode(Node *inputNode);
|
||||
static bool IsImproperForDeparseCoerceViaIONode(Node *inputNode);
|
||||
static CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -2683,6 +2691,18 @@ SqlTaskList(Job *job)
|
|||
List *fragmentCombinationList = FragmentCombinationList(rangeTableFragmentsList,
|
||||
jobQuery, dependentJobList);
|
||||
|
||||
/*
|
||||
* Adjust RelabelType and CoerceViaIO nodes that are improper for deparsing.
|
||||
* We first check if there are any such nodes by using a query tree walker.
|
||||
* The reason is that a query tree mutator will create a deep copy of all
|
||||
* the query sublinks, and we don't want to do that unless necessary, as it
|
||||
* would be inefficient.
|
||||
*/
|
||||
if (QueryTreeHasImproperForDeparseNodes((Node *) jobQuery))
|
||||
{
|
||||
jobQuery = (Query *) AdjustImproperForDeparseNodes((Node *) jobQuery);
|
||||
}
|
||||
|
||||
ListCell *fragmentCombinationCell = NULL;
|
||||
foreach(fragmentCombinationCell, fragmentCombinationList)
|
||||
{
|
||||
|
@ -2733,7 +2753,7 @@ SqlTaskList(Job *job)
|
|||
* RelabelTypeToCollateExpr converts RelabelType's into CollationExpr's.
|
||||
* With that, we will be able to pushdown COLLATE's.
|
||||
*/
|
||||
CollateExpr *
|
||||
static CollateExpr *
|
||||
RelabelTypeToCollateExpr(RelabelType *relabelType)
|
||||
{
|
||||
Assert(OidIsValid(relabelType->resultcollid));
|
||||
|
@ -5593,3 +5613,126 @@ TaskListHighestTaskId(List *taskList)
|
|||
|
||||
return highestTaskId;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* QueryTreeHasImproperForDeparseNodes walks over the node,
|
||||
* and returns true if there are RelabelType or
|
||||
* CoerceViaIONodes which are improper for deparse
|
||||
*/
|
||||
static bool
|
||||
QueryTreeHasImproperForDeparseNodes(Node *inputNode)
|
||||
{
|
||||
if (inputNode == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
else if (IsImproperForDeparseRelabelTypeNode(inputNode) ||
|
||||
IsImproperForDeparseCoerceViaIONode(inputNode))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else if (IsA(inputNode, Query))
|
||||
{
|
||||
return query_tree_walker((Query *) inputNode,
|
||||
QueryTreeHasImproperForDeparseNodes,
|
||||
NULL, 0);
|
||||
}
|
||||
|
||||
return expression_tree_walker(inputNode,
|
||||
QueryTreeHasImproperForDeparseNodes,
|
||||
NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AdjustImproperForDeparseNodes takes an input rewritten query and modifies
|
||||
* nodes which, after going through our planner, pose a problem when
|
||||
* deparsing. So far we have two such type of Nodes that may pose problems:
|
||||
* RelabelType and CoerceIO nodes.
|
||||
* Details will be written in comments in the corresponding if conditions.
|
||||
*/
|
||||
static Node *
|
||||
AdjustImproperForDeparseNodes(Node *inputNode)
|
||||
{
|
||||
if (inputNode == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (IsImproperForDeparseRelabelTypeNode(inputNode))
|
||||
{
|
||||
/*
|
||||
* The planner converts CollateExpr to RelabelType
|
||||
* and here we convert back.
|
||||
*/
|
||||
return (Node *) RelabelTypeToCollateExpr((RelabelType *) inputNode);
|
||||
}
|
||||
else if (IsImproperForDeparseCoerceViaIONode(inputNode))
|
||||
{
|
||||
/*
|
||||
* The planner converts some ::text/::varchar casts to ::cstring
|
||||
* and here we convert back to text because cstring is a pseudotype
|
||||
* and it cannot be casted to most resulttypes
|
||||
*/
|
||||
|
||||
CoerceViaIO *iocoerce = (CoerceViaIO *) inputNode;
|
||||
Node *arg = (Node *) iocoerce->arg;
|
||||
Const *cstringToText = (Const *) arg;
|
||||
|
||||
cstringToText->consttype = TEXTOID;
|
||||
cstringToText->constlen = -1;
|
||||
|
||||
Type textType = typeidType(TEXTOID);
|
||||
char *constvalue = NULL;
|
||||
|
||||
if (!cstringToText->constisnull)
|
||||
{
|
||||
constvalue = DatumGetCString(cstringToText->constvalue);
|
||||
}
|
||||
|
||||
cstringToText->constvalue = stringTypeDatum(textType,
|
||||
constvalue,
|
||||
cstringToText->consttypmod);
|
||||
ReleaseSysCache(textType);
|
||||
return inputNode;
|
||||
}
|
||||
else if (IsA(inputNode, Query))
|
||||
{
|
||||
return (Node *) query_tree_mutator((Query *) inputNode,
|
||||
AdjustImproperForDeparseNodes,
|
||||
NULL, QTW_DONT_COPY_QUERY);
|
||||
}
|
||||
|
||||
return expression_tree_mutator(inputNode, AdjustImproperForDeparseNodes, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Checks if the given node is of Relabel type which is improper for deparsing
|
||||
* The planner converts some CollateExpr to RelabelType nodes, and we need
|
||||
* to find these nodes. They would be improperly deparsed without the
|
||||
* "COLLATE" expression.
|
||||
*/
|
||||
static bool
|
||||
IsImproperForDeparseRelabelTypeNode(Node *inputNode)
|
||||
{
|
||||
return (IsA(inputNode, RelabelType) &&
|
||||
OidIsValid(((RelabelType *) inputNode)->resultcollid) &&
|
||||
((RelabelType *) inputNode)->resultcollid != DEFAULT_COLLATION_OID);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Checks if the given node is of CoerceViaIO type which is improper for deparsing
|
||||
* The planner converts some ::text/::varchar casts to ::cstring, and we need
|
||||
* to find these nodes. They would be improperly deparsed with "cstring" which cannot
|
||||
* be casted to most resulttypes.
|
||||
*/
|
||||
static bool
|
||||
IsImproperForDeparseCoerceViaIONode(Node *inputNode)
|
||||
{
|
||||
return (IsA(inputNode, CoerceViaIO) &&
|
||||
IsA(((CoerceViaIO *) inputNode)->arg, Const) &&
|
||||
((Const *) ((CoerceViaIO *) inputNode)->arg)->consttype == CSTRINGOID);
|
||||
}
|
||||
|
|
|
@ -395,7 +395,7 @@ citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus)
|
|||
|
||||
/* sleep for a while, before rechecking the task status */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
const long delay_ms = 1000;
|
||||
const long delay_ms = 100;
|
||||
(void) WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
|
||||
delay_ms,
|
||||
|
|
|
@ -543,7 +543,6 @@ extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression,
|
|||
List *groupClauseList,
|
||||
List *targetList,
|
||||
bool checkExpressionEquality);
|
||||
extern CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType);
|
||||
|
||||
/*
|
||||
* Function declarations for building, updating constraints and simple operator
|
||||
|
|
|
@ -466,17 +466,27 @@ SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset
|
|||
-- see dependent tasks to understand which tasks remain runnable because of
|
||||
-- citus.max_background_task_executors_per_node
|
||||
-- and which tasks are actually blocked from colocation group dependencies
|
||||
SELECT D.task_id,
|
||||
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
|
||||
D.depends_on,
|
||||
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
|
||||
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
|
||||
task_id | command | depends_on | command
|
||||
SELECT (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
|
||||
(SELECT T.command depends_on_command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
|
||||
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY 1, 2 ASC;
|
||||
command | depends_on_command
|
||||
---------------------------------------------------------------------
|
||||
1014 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | 1013 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto')
|
||||
1016 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | 1015 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto')
|
||||
1018 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | 1017 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto')
|
||||
1020 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | 1019 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto')
|
||||
SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto')
|
||||
SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto')
|
||||
SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto')
|
||||
SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto')
|
||||
(4 rows)
|
||||
|
||||
SELECT task_id, depends_on
|
||||
FROM pg_dist_background_task_depend
|
||||
WHERE job_id in (:job_id)
|
||||
ORDER BY 1, 2 ASC;
|
||||
task_id | depends_on
|
||||
---------------------------------------------------------------------
|
||||
1014 | 1013
|
||||
1016 | 1015
|
||||
1018 | 1017
|
||||
1020 | 1019
|
||||
(4 rows)
|
||||
|
||||
-- default citus.max_background_task_executors_per_node is 1
|
||||
|
@ -503,6 +513,12 @@ FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
|
|||
(8 rows)
|
||||
|
||||
-- increase citus.max_background_task_executors_per_node
|
||||
SELECT citus_task_wait(1013, desired_status => 'done');
|
||||
citus_task_wait
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
|
||||
SELECT pg_reload_conf();
|
||||
pg_reload_conf
|
||||
|
@ -510,13 +526,13 @@ SELECT pg_reload_conf();
|
|||
t
|
||||
(1 row)
|
||||
|
||||
SELECT citus_task_wait(1015, desired_status => 'running');
|
||||
SELECT citus_task_wait(1014, desired_status => 'running');
|
||||
citus_task_wait
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus_task_wait(1013, desired_status => 'done');
|
||||
SELECT citus_task_wait(1015, desired_status => 'running');
|
||||
citus_task_wait
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -75,9 +75,9 @@ NOTICE: renaming the new table to collation_tests.test_collate_pushed_down_aggr
|
|||
SET citus.log_remote_commands TO true;
|
||||
SELECT ALL MIN((lower(CAST(test_collate_pushed_down_aggregate.a AS VARCHAR)) COLLATE "C"))
|
||||
FROM ONLY test_collate_pushed_down_aggregate;
|
||||
NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060004 test_collate_pushed_down_aggregate WHERE true
|
||||
NOTICE: issuing SELECT min((lower(((a)::character varying)::text) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060004 test_collate_pushed_down_aggregate WHERE true
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing SELECT min((lower(((a)::character varying COLLATE "default")) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060005 test_collate_pushed_down_aggregate WHERE true
|
||||
NOTICE: issuing SELECT min((lower(((a)::character varying)::text) COLLATE "C")) AS min FROM ONLY collation_tests.test_collate_pushed_down_aggregate_20060005 test_collate_pushed_down_aggregate WHERE true
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
min
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -236,7 +236,7 @@ MERGE INTO target t
|
|||
last_order_id = s.order_id
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
NOTICE: issuing MERGE INTO merge_schema.target_xxxxxxx t USING merge_schema.source_xxxxxxx s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center COLLATE "default") OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN DO NOTHING
|
||||
NOTICE: issuing MERGE INTO merge_schema.target_xxxxxxx t USING merge_schema.source_xxxxxxx s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center)::text OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN DO NOTHING
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT * from target t WHERE t.customer_id = 30002;
|
||||
|
@ -1073,7 +1073,7 @@ UPDATE SET value = vl_source.value, id = vl_target.id + 1
|
|||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(vl_source.ID, vl_source.value);
|
||||
DEBUG: Creating MERGE router plan
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.vl_target_xxxxxxx vl_target USING (SELECT vl.id, vl.value FROM (VALUES (100,'source1'::text), (200,'source2'::text)) vl(id, value)) vl_source ON (vl_source.id OPERATOR(pg_catalog.=) vl_target.id) WHEN MATCHED THEN UPDATE SET id = (vl_target.id OPERATOR(pg_catalog.+) 1), value = (vl_source.value COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, value) VALUES (vl_source.id, (vl_source.value COLLATE "default"))>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.vl_target_xxxxxxx vl_target USING (SELECT vl.id, vl.value FROM (VALUES (100,'source1'::text), (200,'source2'::text)) vl(id, value)) vl_source ON (vl_source.id OPERATOR(pg_catalog.=) vl_target.id) WHEN MATCHED THEN UPDATE SET id = (vl_target.id OPERATOR(pg_catalog.+) 1), value = vl_source.value WHEN NOT MATCHED THEN INSERT (id, value) VALUES (vl_source.id, vl_source.value)>
|
||||
RESET client_min_messages;
|
||||
SELECT * INTO vl_local FROM vl_target ORDER BY 1 ;
|
||||
-- Should be equal
|
||||
|
@ -1331,7 +1331,7 @@ MERGE INTO ft_target
|
|||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, user_val) VALUES (foreign_table.id, foreign_table.user_val);
|
||||
DEBUG: Creating MERGE router plan
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.ft_target USING merge_schema.foreign_table_xxxxxxx foreign_table ON (foreign_table.id OPERATOR(pg_catalog.=) ft_target.id) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, user_val) VALUES (foreign_table.id, (foreign_table.user_val COLLATE "default"))>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.ft_target USING merge_schema.foreign_table_xxxxxxx foreign_table ON (foreign_table.id OPERATOR(pg_catalog.=) ft_target.id) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, user_val) VALUES (foreign_table.id, foreign_table.user_val)>
|
||||
RESET client_min_messages;
|
||||
SELECT * FROM ft_target;
|
||||
id | user_val
|
||||
|
@ -1667,13 +1667,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
|
|||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
|
@ -1710,13 +1710,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
|
|||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
|
@ -1753,13 +1753,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
|
|||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
|
@ -1836,13 +1836,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
|
|||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
|
@ -1871,7 +1871,7 @@ WHEN NOT MATCHED THEN
|
|||
DO NOTHING;
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by Merge'::text) WHEN NOT MATCHED THEN DO NOTHING
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
|
@ -1917,13 +1917,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
|
|||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by CTE'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
|
@ -1958,13 +1958,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
|
|||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = ((t.val)::text OPERATOR(pg_catalog.||) 'Updated by subquery'::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
|
@ -2042,13 +2042,13 @@ SELECT count(*) FROM citus_target; -- before merge
|
|||
|
||||
SET citus.log_remote_commands to true;
|
||||
EXECUTE citus_prep(500);
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SELECT * FROM citus_target WHERE id = 500; -- non-cached
|
||||
NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500)
|
||||
|
@ -2059,49 +2059,49 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
|||
(1 row)
|
||||
|
||||
EXECUTE citus_prep(500);
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
EXECUTE citus_prep(500);
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
EXECUTE citus_prep(500);
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
EXECUTE citus_prep(500);
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
EXECUTE citus_prep(500);
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = ('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val)::text) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val)
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT * FROM citus_target WHERE id = 500; -- cached
|
||||
|
|
|
@ -50,3 +50,307 @@ SELECT count(distinct ingest_time) FROM http_request WHERE site_id = 1;
|
|||
8
|
||||
(1 row)
|
||||
|
||||
-- Standard planner converted text and varchar casts to cstring in some cases
|
||||
-- We make sure we convert it back to text when parsing the expression
|
||||
INSERT INTO test VALUES ('2022-02-02', 0);
|
||||
INSERT INTO test VALUES ('2022-01-01', 1);
|
||||
INSERT INTO test VALUES ('2021-01-01', 2);
|
||||
-- try different planners
|
||||
PREPARE test_statement_regular(text) AS
|
||||
SELECT user_id FROM test WHERE t >= $1::timestamp ORDER BY user_id;
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
PREPARE test_statement_router(int, text) AS
|
||||
SELECT user_id FROM test WHERE user_id = $1 AND t >= $2::timestamp ORDER BY user_id;
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
PREPARE test_statement_repartition(int, text) AS
|
||||
SELECT count(*) FROM test t1 JOIN test t2 USING (t) WHERE t1.user_id = $1 AND t >= $2::timestamp;
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
PREPARE test_statement_cte(text, text) AS
|
||||
WITH cte_1 AS MATERIALIZED (SELECT user_id, t FROM test WHERE t >= $1::timestamp ORDER BY user_id)
|
||||
SELECT user_id FROM cte_1 WHERE t <= $2::timestamp;
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
PREPARE test_statement_insert(int, text) AS
|
||||
INSERT INTO test VALUES ($2::timestamp, $1);
|
||||
EXECUTE test_statement_insert(3, '2022-03-03');
|
||||
EXECUTE test_statement_insert(4, '2022-04-04');
|
||||
EXECUTE test_statement_insert(5, '2022-05-05');
|
||||
EXECUTE test_statement_insert(6, '2022-06-06');
|
||||
EXECUTE test_statement_insert(7, '2022-07-07');
|
||||
EXECUTE test_statement_insert(8, '2022-08-08');
|
||||
EXECUTE test_statement_insert(9, '2022-09-09');
|
||||
EXECUTE test_statement_insert(10, '2022-10-10');
|
||||
SELECT count(*) FROM test;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
11
|
||||
(1 row)
|
||||
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
user_id
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
10
|
||||
(10 rows)
|
||||
|
||||
PREPARE test_statement_null(text) AS
|
||||
SELECT user_id , $1::timestamp FROM test ORDER BY user_id LIMIT 2;
|
||||
EXECUTE test_statement_null(NULL);
|
||||
user_id | timestamp
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
1 |
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_null(NULL);
|
||||
user_id | timestamp
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
1 |
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_null(NULL);
|
||||
user_id | timestamp
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
1 |
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_null(NULL);
|
||||
user_id | timestamp
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
1 |
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_null(NULL);
|
||||
user_id | timestamp
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
1 |
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_null(NULL);
|
||||
user_id | timestamp
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
1 |
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_null(NULL);
|
||||
user_id | timestamp
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
1 |
|
||||
(2 rows)
|
||||
|
||||
EXECUTE test_statement_null(NULL);
|
||||
user_id | timestamp
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
1 |
|
||||
(2 rows)
|
||||
|
||||
|
|
|
@ -91,3 +91,15 @@ SELECT create_distributed_table('http_request', 'site_id');
|
|||
|
||||
(1 row)
|
||||
|
||||
-- Standard planner converted text and varchar casts to cstring in some cases
|
||||
-- We make sure we convert it back to text when parsing the expression
|
||||
-- https://github.com/citusdata/citus/issues/6061
|
||||
-- https://github.com/citusdata/citus/issues/5646
|
||||
-- https://github.com/citusdata/citus/issues/5033
|
||||
CREATE TABLE test(t timestamp, user_id int);
|
||||
SELECT create_distributed_table('test', 'user_id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
|
|
|
@ -204,11 +204,14 @@ SELECT citus_rebalance_start AS job_id from citus_rebalance_start() \gset
|
|||
-- see dependent tasks to understand which tasks remain runnable because of
|
||||
-- citus.max_background_task_executors_per_node
|
||||
-- and which tasks are actually blocked from colocation group dependencies
|
||||
SELECT D.task_id,
|
||||
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
|
||||
D.depends_on,
|
||||
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
|
||||
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
|
||||
SELECT (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
|
||||
(SELECT T.command depends_on_command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
|
||||
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY 1, 2 ASC;
|
||||
|
||||
SELECT task_id, depends_on
|
||||
FROM pg_dist_background_task_depend
|
||||
WHERE job_id in (:job_id)
|
||||
ORDER BY 1, 2 ASC;
|
||||
|
||||
-- default citus.max_background_task_executors_per_node is 1
|
||||
-- show that first exactly one task per node is running
|
||||
|
@ -218,10 +221,12 @@ SELECT job_id, task_id, status, nodes_involved
|
|||
FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
|
||||
|
||||
-- increase citus.max_background_task_executors_per_node
|
||||
SELECT citus_task_wait(1013, desired_status => 'done');
|
||||
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 2;
|
||||
SELECT pg_reload_conf();
|
||||
|
||||
SELECT citus_task_wait(1014, desired_status => 'running');
|
||||
SELECT citus_task_wait(1015, desired_status => 'running');
|
||||
SELECT citus_task_wait(1013, desired_status => 'done');
|
||||
|
||||
-- show that at most 2 tasks per node are running
|
||||
-- among the tasks that are not blocked
|
||||
|
|
|
@ -43,3 +43,85 @@ EXECUTE foo;
|
|||
|
||||
SELECT count(distinct ingest_time) FROM http_request WHERE site_id = 1;
|
||||
|
||||
-- Standard planner converted text and varchar casts to cstring in some cases
|
||||
-- We make sure we convert it back to text when parsing the expression
|
||||
INSERT INTO test VALUES ('2022-02-02', 0);
|
||||
INSERT INTO test VALUES ('2022-01-01', 1);
|
||||
INSERT INTO test VALUES ('2021-01-01', 2);
|
||||
|
||||
-- try different planners
|
||||
PREPARE test_statement_regular(text) AS
|
||||
SELECT user_id FROM test WHERE t >= $1::timestamp ORDER BY user_id;
|
||||
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
|
||||
PREPARE test_statement_router(int, text) AS
|
||||
SELECT user_id FROM test WHERE user_id = $1 AND t >= $2::timestamp ORDER BY user_id;
|
||||
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
EXECUTE test_statement_router(1, '2022-01-01');
|
||||
|
||||
PREPARE test_statement_repartition(int, text) AS
|
||||
SELECT count(*) FROM test t1 JOIN test t2 USING (t) WHERE t1.user_id = $1 AND t >= $2::timestamp;
|
||||
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
EXECUTE test_statement_repartition(1, '2022-01-01');
|
||||
|
||||
PREPARE test_statement_cte(text, text) AS
|
||||
WITH cte_1 AS MATERIALIZED (SELECT user_id, t FROM test WHERE t >= $1::timestamp ORDER BY user_id)
|
||||
SELECT user_id FROM cte_1 WHERE t <= $2::timestamp;
|
||||
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
EXECUTE test_statement_cte('2022-01-01', '2022-01-01');
|
||||
|
||||
PREPARE test_statement_insert(int, text) AS
|
||||
INSERT INTO test VALUES ($2::timestamp, $1);
|
||||
|
||||
EXECUTE test_statement_insert(3, '2022-03-03');
|
||||
EXECUTE test_statement_insert(4, '2022-04-04');
|
||||
EXECUTE test_statement_insert(5, '2022-05-05');
|
||||
EXECUTE test_statement_insert(6, '2022-06-06');
|
||||
EXECUTE test_statement_insert(7, '2022-07-07');
|
||||
EXECUTE test_statement_insert(8, '2022-08-08');
|
||||
EXECUTE test_statement_insert(9, '2022-09-09');
|
||||
EXECUTE test_statement_insert(10, '2022-10-10');
|
||||
|
||||
SELECT count(*) FROM test;
|
||||
EXECUTE test_statement_regular('2022-01-01');
|
||||
|
||||
PREPARE test_statement_null(text) AS
|
||||
SELECT user_id , $1::timestamp FROM test ORDER BY user_id LIMIT 2;
|
||||
|
||||
EXECUTE test_statement_null(NULL);
|
||||
EXECUTE test_statement_null(NULL);
|
||||
EXECUTE test_statement_null(NULL);
|
||||
EXECUTE test_statement_null(NULL);
|
||||
EXECUTE test_statement_null(NULL);
|
||||
EXECUTE test_statement_null(NULL);
|
||||
EXECUTE test_statement_null(NULL);
|
||||
EXECUTE test_statement_null(NULL);
|
||||
|
|
|
@ -73,3 +73,12 @@ CREATE TABLE http_request (
|
|||
);
|
||||
|
||||
SELECT create_distributed_table('http_request', 'site_id');
|
||||
|
||||
-- Standard planner converted text and varchar casts to cstring in some cases
|
||||
-- We make sure we convert it back to text when parsing the expression
|
||||
-- https://github.com/citusdata/citus/issues/6061
|
||||
-- https://github.com/citusdata/citus/issues/5646
|
||||
-- https://github.com/citusdata/citus/issues/5033
|
||||
|
||||
CREATE TABLE test(t timestamp, user_id int);
|
||||
SELECT create_distributed_table('test', 'user_id');
|
||||
|
|
Loading…
Reference in New Issue