mirror of https://github.com/citusdata/citus.git
Merge branch 'main' into remove-stats-collector
commit
b65096b1d7
|
|
@ -1267,7 +1267,21 @@ StripesForRelfilelocator(RelFileLocator relfilelocator)
|
|||
{
|
||||
uint64 storageId = LookupStorageId(relfilelocator);
|
||||
|
||||
return ReadDataFileStripeList(storageId, GetTransactionSnapshot());
|
||||
/*
|
||||
* PG18 requires snapshot to be active or registered before it's used
|
||||
* Without this, we hit
|
||||
* Assert(snapshot->regd_count > 0 || snapshot->active_count > 0);
|
||||
* when reading columnar stripes.
|
||||
* Relevant PG18 commit:
|
||||
* 8076c00592e40e8dbd1fce7a98b20d4bf075e4ba
|
||||
*/
|
||||
Snapshot snapshot = RegisterSnapshot(GetTransactionSnapshot());
|
||||
|
||||
List *readDataFileStripeList = ReadDataFileStripeList(storageId, snapshot);
|
||||
|
||||
UnregisterSnapshot(snapshot);
|
||||
|
||||
return readDataFileStripeList;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -854,8 +854,11 @@ PostprocessIndexStmt(Node *node, const char *queryString)
|
|||
table_close(relation, NoLock);
|
||||
index_close(indexRelation, NoLock);
|
||||
|
||||
PushActiveSnapshot(GetTransactionSnapshot());
|
||||
|
||||
/* mark index as invalid, in-place (cannot be rolled back) */
|
||||
index_set_state_flags(indexRelationId, INDEX_DROP_CLEAR_VALID);
|
||||
PopActiveSnapshot();
|
||||
|
||||
/* re-open a transaction command from here on out */
|
||||
CommitTransactionCommand();
|
||||
|
|
@ -1370,8 +1373,11 @@ MarkIndexValid(IndexStmt *indexStmt)
|
|||
schemaId);
|
||||
Relation indexRelation = index_open(indexRelationId, RowExclusiveLock);
|
||||
|
||||
PushActiveSnapshot(GetTransactionSnapshot());
|
||||
|
||||
/* mark index as valid, in-place (cannot be rolled back) */
|
||||
index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
|
||||
PopActiveSnapshot();
|
||||
|
||||
table_close(relation, NoLock);
|
||||
index_close(indexRelation, NoLock);
|
||||
|
|
|
|||
|
|
@ -69,7 +69,15 @@ PreprocessCreateStatisticsStmt(Node *node, const char *queryString,
|
|||
{
|
||||
CreateStatsStmt *stmt = castNode(CreateStatsStmt, node);
|
||||
|
||||
RangeVar *relation = (RangeVar *) linitial(stmt->relations);
|
||||
Node *relationNode = (Node *) linitial(stmt->relations);
|
||||
|
||||
if (!IsA(relationNode, RangeVar))
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
RangeVar *relation = (RangeVar *) relationNode;
|
||||
|
||||
Oid relationId = RangeVarGetRelid(relation, ShareUpdateExclusiveLock, false);
|
||||
|
||||
if (!IsCitusTable(relationId) || !ShouldPropagate())
|
||||
|
|
|
|||
|
|
@ -3804,6 +3804,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
|
|||
SubLink *cur_ma_sublink;
|
||||
List *ma_sublinks;
|
||||
|
||||
targetList = ExpandMergedSubscriptingRefEntries(targetList);
|
||||
|
||||
/*
|
||||
* Prepare to deal with MULTIEXPR assignments: collect the source SubLinks
|
||||
* into a list. We expect them to appear, in ID order, in resjunk tlist
|
||||
|
|
@ -3827,6 +3829,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
ensure_update_targetlist_in_param_order(targetList);
|
||||
}
|
||||
next_ma_cell = list_head(ma_sublinks);
|
||||
cur_ma_sublink = NULL;
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@
|
|||
static int SourceResultPartitionColumnIndex(Query *mergeQuery,
|
||||
List *sourceTargetList,
|
||||
CitusTableCacheEntry *targetRelation);
|
||||
static int FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno);
|
||||
static Var * ValidateAndReturnVarIfSupported(Node *entryExpr);
|
||||
static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
|
||||
PlannerRestrictionContext *
|
||||
|
|
@ -422,6 +423,9 @@ ErrorIfMergeHasUnsupportedTables(Oid targetRelationId, List *rangeTableList)
|
|||
case RTE_VALUES:
|
||||
case RTE_JOIN:
|
||||
case RTE_CTE:
|
||||
#if PG_VERSION_NUM >= PG_VERSION_18
|
||||
case RTE_GROUP:
|
||||
#endif
|
||||
{
|
||||
/* Skip them as base table(s) will be checked */
|
||||
continue;
|
||||
|
|
@ -628,6 +632,22 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query,
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* joinTree->quals, retrieved by GetMergeJoinTree() - either from
|
||||
* mergeJoinCondition (PG >= 17) or jointree->quals (PG < 17),
|
||||
* only contains the quals that are present in the "ON (..)" clause. Action
|
||||
* quals that can be specified for each specific action, as in
|
||||
* "WHEN <match condition> AND <action quals> THEN <action>", are
|
||||
* saved into "qual" field of the corresponding action's entry in
|
||||
* mergeActionList, see
|
||||
* https://github.com/postgres/postgres/blob/e6da68a6e1d60a037b63a9c9ed36e5ef0a996769/src/backend/parser/parse_merge.c#L285-L293.
|
||||
*
|
||||
* For this reason, even if TargetEntryChangesValue() could prove that
|
||||
* an action's quals ensure that the action cannot change the distribution
|
||||
* key, this is not the case as we don't provide action quals to
|
||||
* TargetEntryChangesValue(), but just joinTree, which only contains
|
||||
* the "ON (..)" clause quals.
|
||||
*/
|
||||
if (targetEntryDistributionColumn &&
|
||||
TargetEntryChangesValue(targetEntry, distributionColumn, joinTree))
|
||||
{
|
||||
|
|
@ -1411,7 +1431,8 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
|
|||
Assert(sourceRepartitionVar);
|
||||
|
||||
int sourceResultRepartitionColumnIndex =
|
||||
DistributionColumnIndex(sourceTargetList, sourceRepartitionVar);
|
||||
FindTargetListEntryWithVarExprAttno(sourceTargetList,
|
||||
sourceRepartitionVar->varattno);
|
||||
|
||||
if (sourceResultRepartitionColumnIndex == -1)
|
||||
{
|
||||
|
|
@ -1562,6 +1583,33 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindTargetListEntryWithVarExprAttno finds the index of the target
|
||||
* entry whose expr is a Var that points to input varattno.
|
||||
*
|
||||
* If no such target entry is found, it returns -1.
|
||||
*/
|
||||
static int
|
||||
FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno)
|
||||
{
|
||||
int targetEntryIndex = 0;
|
||||
|
||||
TargetEntry *targetEntry = NULL;
|
||||
foreach_declared_ptr(targetEntry, targetList)
|
||||
{
|
||||
if (IsA(targetEntry->expr, Var) &&
|
||||
((Var *) targetEntry->expr)->varattno == varattno)
|
||||
{
|
||||
return targetEntryIndex;
|
||||
}
|
||||
|
||||
targetEntryIndex++;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsLocalTableModification returns true if the table modified is a Postgres table.
|
||||
* We do not support recursive planning for MERGE yet, so we could have a join
|
||||
|
|
|
|||
|
|
@ -3173,16 +3173,25 @@ BuildBaseConstraint(Var *column)
|
|||
|
||||
|
||||
/*
|
||||
* MakeOpExpression builds an operator expression node. This operator expression
|
||||
* implements the operator clause as defined by the variable and the strategy
|
||||
* number.
|
||||
* MakeOpExpressionExtended builds an operator expression node that's of
|
||||
* the form "Var <op> Expr", where, Expr must either be a Const or a Var
|
||||
* (*1).
|
||||
*
|
||||
* This operator expression implements the operator clause as defined by
|
||||
* the variable and the strategy number.
|
||||
*/
|
||||
OpExpr *
|
||||
MakeOpExpression(Var *variable, int16 strategyNumber)
|
||||
MakeOpExpressionExtended(Var *leftVar, Expr *rightArg, int16 strategyNumber)
|
||||
{
|
||||
Oid typeId = variable->vartype;
|
||||
Oid typeModId = variable->vartypmod;
|
||||
Oid collationId = variable->varcollid;
|
||||
/*
|
||||
* Other types of expressions are probably also fine to be used, but
|
||||
* none of the callers need support for them for now, so we haven't
|
||||
* tested them (*1).
|
||||
*/
|
||||
Assert(IsA(rightArg, Const) || IsA(rightArg, Var));
|
||||
|
||||
Oid typeId = leftVar->vartype;
|
||||
Oid collationId = leftVar->varcollid;
|
||||
|
||||
Oid accessMethodId = BTREE_AM_OID;
|
||||
|
||||
|
|
@ -3200,18 +3209,16 @@ MakeOpExpression(Var *variable, int16 strategyNumber)
|
|||
*/
|
||||
if (operatorClassInputType != typeId && typeType != TYPTYPE_PSEUDO)
|
||||
{
|
||||
variable = (Var *) makeRelabelType((Expr *) variable, operatorClassInputType,
|
||||
leftVar = (Var *) makeRelabelType((Expr *) leftVar, operatorClassInputType,
|
||||
-1, collationId, COERCE_IMPLICIT_CAST);
|
||||
}
|
||||
|
||||
Const *constantValue = makeNullConst(operatorClassInputType, typeModId, collationId);
|
||||
|
||||
/* Now make the expression with the given variable and a null constant */
|
||||
OpExpr *expression = (OpExpr *) make_opclause(operatorId,
|
||||
InvalidOid, /* no result type yet */
|
||||
false, /* no return set */
|
||||
(Expr *) variable,
|
||||
(Expr *) constantValue,
|
||||
(Expr *) leftVar,
|
||||
rightArg,
|
||||
InvalidOid, collationId);
|
||||
|
||||
/* Set implementing function id and result type */
|
||||
|
|
@ -3222,6 +3229,31 @@ MakeOpExpression(Var *variable, int16 strategyNumber)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* MakeOpExpression is a wrapper around MakeOpExpressionExtended
|
||||
* that creates a null constant of the appropriate type for right
|
||||
* hand side operator class input type. As a result, it builds an
|
||||
* operator expression node that's of the form "Var <op> NULL".
|
||||
*/
|
||||
OpExpr *
|
||||
MakeOpExpression(Var *leftVar, int16 strategyNumber)
|
||||
{
|
||||
Oid typeId = leftVar->vartype;
|
||||
Oid typeModId = leftVar->vartypmod;
|
||||
Oid collationId = leftVar->varcollid;
|
||||
|
||||
Oid accessMethodId = BTREE_AM_OID;
|
||||
|
||||
OperatorCacheEntry *operatorCacheEntry = LookupOperatorByType(typeId, accessMethodId,
|
||||
strategyNumber);
|
||||
Oid operatorClassInputType = operatorCacheEntry->operatorClassInputType;
|
||||
|
||||
Const *constantValue = makeNullConst(operatorClassInputType, typeModId, collationId);
|
||||
|
||||
return MakeOpExpressionExtended(leftVar, (Expr *) constantValue, strategyNumber);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LookupOperatorByType is a wrapper around GetOperatorByType(),
|
||||
* operatorClassInputType() and get_typtype() functions that uses a cache to avoid
|
||||
|
|
|
|||
|
|
@ -372,6 +372,25 @@ AddPartitionKeyNotNullFilterToSelect(Query *subqery)
|
|||
/* we should have found target partition column */
|
||||
Assert(targetPartitionColumnVar != NULL);
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_18
|
||||
if (subqery->hasGroupRTE)
|
||||
{
|
||||
/* if the partition column is a grouped column, we need to flatten it
|
||||
* to ensure query deparsing works correctly. We choose to do this here
|
||||
* instead of in ruleutils.c because we want to keep the flattening logic
|
||||
* close to the NOT NULL filter injection.
|
||||
*/
|
||||
RangeTblEntry *partitionRTE = rt_fetch(targetPartitionColumnVar->varno,
|
||||
subqery->rtable);
|
||||
if (partitionRTE->rtekind == RTE_GROUP)
|
||||
{
|
||||
targetPartitionColumnVar = (Var *) flatten_group_exprs(NULL, subqery,
|
||||
(Node *)
|
||||
targetPartitionColumnVar);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* create expression for partition_column IS NOT NULL */
|
||||
NullTest *nullTest = makeNode(NullTest);
|
||||
nullTest->nulltesttype = IS_NOT_NULL;
|
||||
|
|
@ -1609,10 +1628,19 @@ MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context)
|
|||
|
||||
/*
|
||||
* TargetEntryChangesValue determines whether the given target entry may
|
||||
* change the value in a given column, given a join tree. The result is
|
||||
* true unless the expression refers directly to the column, or the
|
||||
* expression is a value that is implied by the qualifiers of the join
|
||||
* tree, or the target entry sets a different column.
|
||||
* change the value given a column and a join tree.
|
||||
*
|
||||
* The function assumes that the "targetEntry" references given "column"
|
||||
* Var via its "resname" and is used as part of a modify query. This means
|
||||
* that, for example, for an update query, the input "targetEntry" constructs
|
||||
* the following assignment operation as part of the SET clause:
|
||||
* "col_a = expr_a ", where, "col_a" refers to input "column" Var (via
|
||||
* "resname") as per the assumption written above. And we want to understand
|
||||
* if "expr_a" (which is pointed to by targetEntry->expr) refers directly to
|
||||
* the "column" Var, or "expr_a" is a value that is implied to be equal
|
||||
* to "column" Var by the qualifiers of the join tree. If so, we know that
|
||||
* the value of "col_a" effectively cannot be changed by this assignment
|
||||
* operation.
|
||||
*/
|
||||
bool
|
||||
TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree)
|
||||
|
|
@ -1623,11 +1651,36 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
|
|||
if (IsA(setExpr, Var))
|
||||
{
|
||||
Var *newValue = (Var *) setExpr;
|
||||
if (newValue->varattno == column->varattno)
|
||||
if (column->varno == newValue->varno &&
|
||||
column->varattno == newValue->varattno)
|
||||
{
|
||||
/* target entry of the form SET col = table.col */
|
||||
/*
|
||||
* Target entry is of the form "SET col_a = foo.col_b",
|
||||
* where foo also points to the same range table entry
|
||||
* and col_a and col_b are the same. So, effectively
|
||||
* they're literally referring to the same column.
|
||||
*/
|
||||
isColumnValueChanged = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
List *restrictClauseList = WhereClauseList(joinTree);
|
||||
OpExpr *equalityExpr = MakeOpExpressionExtended(column, (Expr *) newValue,
|
||||
BTEqualStrategyNumber);
|
||||
|
||||
bool predicateIsImplied = predicate_implied_by(list_make1(equalityExpr),
|
||||
restrictClauseList, false);
|
||||
if (predicateIsImplied)
|
||||
{
|
||||
/*
|
||||
* Target entry is of the form
|
||||
* "SET col_a = foo.col_b WHERE col_a = foo.col_b (AND (...))",
|
||||
* where foo points to a different relation or it points
|
||||
* to the same relation but col_a is not the same column as col_b.
|
||||
*/
|
||||
isColumnValueChanged = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (IsA(setExpr, Const))
|
||||
{
|
||||
|
|
@ -1648,7 +1701,10 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
|
|||
restrictClauseList, false);
|
||||
if (predicateIsImplied)
|
||||
{
|
||||
/* target entry of the form SET col = <x> WHERE col = <x> AND ... */
|
||||
/*
|
||||
* Target entry is of the form
|
||||
* "SET col_a = const_a WHERE col_a = const_a (AND (...))".
|
||||
*/
|
||||
isColumnValueChanged = false;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2029,7 +2029,9 @@ SubqueryPushdownMultiNodeTree(Query *originalQuery)
|
|||
pushedDownQuery->setOperations = copyObject(queryTree->setOperations);
|
||||
pushedDownQuery->querySource = queryTree->querySource;
|
||||
pushedDownQuery->hasSubLinks = queryTree->hasSubLinks;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_18
|
||||
pushedDownQuery->hasGroupRTE = queryTree->hasGroupRTE;
|
||||
#endif
|
||||
MultiTable *subqueryNode = MultiSubqueryPushdownTable(pushedDownQuery);
|
||||
|
||||
SetChild((MultiUnaryNode *) subqueryCollectNode, (MultiNode *) subqueryNode);
|
||||
|
|
|
|||
|
|
@ -2431,7 +2431,7 @@ FilterJoinRestrictionContext(JoinRestrictionContext *joinRestrictionContext, Rel
|
|||
|
||||
/*
|
||||
* RangeTableArrayContainsAnyRTEIdentities returns true if any of the range table entries
|
||||
* int rangeTableEntries array is an range table relation specified in queryRteIdentities.
|
||||
* in rangeTableEntries array is a range table relation specified in queryRteIdentities.
|
||||
*/
|
||||
static bool
|
||||
RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
|
||||
|
|
@ -2444,6 +2444,18 @@ RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
|
|||
List *rangeTableRelationList = NULL;
|
||||
ListCell *rteRelationCell = NULL;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_18
|
||||
|
||||
/*
|
||||
* In PG18+, planner array simple_rte_array may contain NULL entries
|
||||
* for "dead relations". See PG commits 5f6f951 and e9a20e4 for details.
|
||||
*/
|
||||
if (rangeTableEntry == NULL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Get list of all RTE_RELATIONs in the given range table entry
|
||||
* (i.e., rangeTableEntry could be a subquery where we're interested
|
||||
|
|
|
|||
|
|
@ -586,7 +586,8 @@ extern DistributedPlan * CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
|
|||
plannerRestrictionContext);
|
||||
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
|
||||
char *queryString);
|
||||
|
||||
extern OpExpr * MakeOpExpressionExtended(Var *leftVar, Expr *rightArg,
|
||||
int16 strategyNumber);
|
||||
extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
|
||||
extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression,
|
||||
List *groupClauseList,
|
||||
|
|
|
|||
|
|
@ -193,13 +193,148 @@ SQL function "compare_data" statement 2
|
|||
|
||||
(1 row)
|
||||
|
||||
---- https://github.com/citusdata/citus/issues/8180 ----
|
||||
CREATE TABLE dist_1 (a int, b int, c int);
|
||||
CREATE TABLE dist_2 (a int, b int, c int);
|
||||
CREATE TABLE dist_different_order_1 (b int, a int, c int);
|
||||
SELECT create_distributed_table('dist_1', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_2', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_different_order_1', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_2
|
||||
ON (dist_1.a = dist_2.b)
|
||||
WHEN MATCHED THEN UPDATE SET b = dist_2.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET b = src.b;
|
||||
MERGE INTO dist_different_order_1
|
||||
USING dist_1
|
||||
ON (dist_different_order_1.a = dist_1.b)
|
||||
WHEN MATCHED THEN UPDATE SET b = dist_1.b;
|
||||
CREATE TABLE dist_1_cast (a int, b int);
|
||||
CREATE TABLE dist_2_cast (a int, b numeric);
|
||||
SELECT create_distributed_table('dist_1_cast', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_2_cast', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
MERGE INTO dist_1_cast
|
||||
USING dist_2_cast
|
||||
ON (dist_1_cast.a = dist_2_cast.b)
|
||||
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
|
||||
ERROR: In the MERGE ON clause, there is a datatype mismatch between target's distribution column and the expression originating from the source.
|
||||
DETAIL: If the types are different, Citus uses different hash functions for the two column types, which might lead to incorrect repartitioning of the result data
|
||||
MERGE INTO dist_1_cast
|
||||
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
|
||||
ON (dist_1_cast.a = dist_2_cast.b)
|
||||
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
|
||||
-- a more sophisticated example
|
||||
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
SELECT create_distributed_table('dist_source', 'tstamp_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_target', 'int_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(1001, 2000) i;
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(901, 1000) i;
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1501, 2000) i;
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i-1,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1401, 1500) i;
|
||||
INSERT INTO local_source SELECT * FROM dist_source;
|
||||
INSERT INTO local_target SELECT * FROM dist_target;
|
||||
-- execute the query on distributed tables
|
||||
MERGE INTO dist_target target_alias
|
||||
USING dist_source source_alias
|
||||
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||
WHEN MATCHED THEN UPDATE SET
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||
-- execute the same query on local tables, everything is the same except table names behind the aliases
|
||||
MERGE INTO local_target target_alias
|
||||
USING local_source source_alias
|
||||
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||
WHEN MATCHED THEN UPDATE SET
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||
-- compare both targets
|
||||
SELECT COUNT(*) = 0 AS targets_match
|
||||
FROM (
|
||||
SELECT * FROM dist_target
|
||||
EXCEPT
|
||||
SELECT * FROM local_target
|
||||
UNION ALL
|
||||
SELECT * FROM local_target
|
||||
EXCEPT
|
||||
SELECT * FROM dist_target
|
||||
) q;
|
||||
targets_match
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA merge_repartition2_schema CASCADE;
|
||||
NOTICE: drop cascades to 8 other objects
|
||||
DETAIL: drop cascades to table pg_target
|
||||
drop cascades to table pg_source
|
||||
drop cascades to function cleanup_data()
|
||||
drop cascades to function setup_data()
|
||||
drop cascades to function check_data(text,text,text,text)
|
||||
drop cascades to function compare_data()
|
||||
drop cascades to table citus_target
|
||||
drop cascades to table citus_source
|
||||
|
|
|
|||
|
|
@ -394,9 +394,9 @@ DEBUG: Wrapping relation "mat_view_on_part_dist" "foo" to a subquery
|
|||
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.mat_view_on_part_dist foo WHERE true
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE mixed_relkind_tests.partitioned_distributed_table SET a = foo.a FROM (SELECT foo_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) foo_1) foo WHERE (foo.a OPERATOR(pg_catalog.=) partitioned_distributed_table.a)
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
|
||||
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
|
||||
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
-- should work
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_part_dist AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ SET citus.shard_count TO 32;
|
|||
SET citus.next_shard_id TO 750000;
|
||||
SET citus.next_placement_id TO 750000;
|
||||
CREATE SCHEMA multi_modifications;
|
||||
SET search_path TO multi_modifications;
|
||||
-- some failure messages that comes from the worker nodes
|
||||
-- might change due to parallel executions, so suppress those
|
||||
-- using \set VERBOSITY terse
|
||||
|
|
@ -31,8 +32,12 @@ SELECT create_distributed_table('limit_orders', 'id', 'hash');
|
|||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('multiple_hash', 'id', 'hash');
|
||||
ERROR: column "id" of relation "multiple_hash" does not exist
|
||||
SELECT create_distributed_table('multiple_hash', 'category', 'hash');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('range_partitioned', 'id', 'range');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
|
@ -344,22 +349,26 @@ ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001
|
|||
-- Test that shards which miss a modification are marked unhealthy
|
||||
-- First: Connect to the second worker node
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO multi_modifications;
|
||||
-- Second: Move aside limit_orders shard on the second worker node
|
||||
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
|
||||
-- Third: Connect back to master node
|
||||
\c - - - :master_port
|
||||
SET search_path TO multi_modifications;
|
||||
-- Fourth: Perform an INSERT on the remaining node
|
||||
-- the whole transaction should fail
|
||||
\set VERBOSITY terse
|
||||
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||
ERROR: relation "public.limit_orders_750000" does not exist
|
||||
ERROR: relation "multi_modifications.limit_orders_750000" does not exist
|
||||
-- set the shard name back
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO multi_modifications;
|
||||
-- Second: Move aside limit_orders shard on the second worker node
|
||||
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
|
||||
-- Verify the insert failed and both placements are healthy
|
||||
-- or the insert succeeded and placement marked unhealthy
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO multi_modifications;
|
||||
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
|
|
@ -367,6 +376,7 @@ SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
|
|||
(1 row)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO multi_modifications;
|
||||
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
|
|
@ -374,6 +384,7 @@ SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
|
|||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO multi_modifications;
|
||||
SELECT count(*) FROM limit_orders WHERE id = 276;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
|
|
@ -394,14 +405,16 @@ AND s.logicalrelid = 'limit_orders'::regclass;
|
|||
-- Test that if all shards miss a modification, no state change occurs
|
||||
-- First: Connect to the first worker node
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO multi_modifications;
|
||||
-- Second: Move aside limit_orders shard on the second worker node
|
||||
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
|
||||
-- Third: Connect back to master node
|
||||
\c - - - :master_port
|
||||
SET search_path TO multi_modifications;
|
||||
-- Fourth: Perform an INSERT on the remaining node
|
||||
\set VERBOSITY terse
|
||||
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||
ERROR: relation "public.limit_orders_750000" does not exist
|
||||
ERROR: relation "multi_modifications.limit_orders_750000" does not exist
|
||||
\set VERBOSITY DEFAULT
|
||||
-- Last: Verify worker is still healthy
|
||||
SELECT count(*)
|
||||
|
|
@ -420,10 +433,12 @@ AND s.logicalrelid = 'limit_orders'::regclass;
|
|||
-- Undo our change...
|
||||
-- First: Connect to the first worker node
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO multi_modifications;
|
||||
-- Second: Move aside limit_orders shard on the second worker node
|
||||
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
|
||||
-- Third: Connect back to master node
|
||||
\c - - - :master_port
|
||||
SET search_path TO multi_modifications;
|
||||
-- attempting to change the partition key is unsupported
|
||||
UPDATE limit_orders SET id = 0 WHERE id = 246;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
|
|
@ -433,6 +448,368 @@ ERROR: modifying the partition value of rows is not allowed
|
|||
UPDATE limit_orders SET id = 246 WHERE id = 246;
|
||||
UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM';
|
||||
UPDATE limit_orders SET id = limit_orders.id WHERE id = 246;
|
||||
CREATE TABLE dist_1 (a int, b int, c int);
|
||||
CREATE TABLE dist_2 (a int, b int, c int);
|
||||
CREATE TABLE dist_non_colocated (a int, b int, c int);
|
||||
CREATE TABLE dist_different_order_1 (b int, a int, c int);
|
||||
SELECT create_distributed_table('dist_1', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_2', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_non_colocated', 'a', colocate_with=>'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_different_order_1', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
--
|
||||
-- https://github.com/citusdata/citus/issues/8087
|
||||
--
|
||||
---- update: should work ----
|
||||
-- setting shard key to itself --
|
||||
UPDATE dist_1 SET a = dist_1.a;
|
||||
UPDATE dist_1 SET a = dist_1.a WHERE dist_1.a > dist_1.b AND dist_1.b > 10;
|
||||
UPDATE dist_1 SET a = dist_1.a FROM dist_2 WHERE dist_1.a = dist_2.a;
|
||||
-- setting shard key to another var that's implied to be equal to shard key --
|
||||
UPDATE dist_1 SET a = b WHERE a = b;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.b = dist_2.c AND (dist_2.c > 5 OR dist_2.c < 0);
|
||||
with cte as (
|
||||
select a, b from dist_1
|
||||
)
|
||||
update dist_1 set a = cte.a from cte where dist_1.a = cte.a;
|
||||
with cte as (
|
||||
select a as x, b as y from (select a, b from dist_1 limit 100) dt where b > 100
|
||||
)
|
||||
update dist_1 set a = cte.x from cte where dist_1.a = cte.x;
|
||||
with cte as (
|
||||
select d2.a as x, d1.b as y
|
||||
from dist_1 d1, dist_different_order_1 d2
|
||||
where d1.a=d2.a)
|
||||
update dist_1 set a = cte.x from cte where y != 0 and dist_1.a = cte.x;
|
||||
with cte as (
|
||||
select * from (select a as x, b as y from dist_2 limit 100) q
|
||||
)
|
||||
update dist_1 set a = cte.x from cte where b = cte.y and cte.y = a and a = cte.x;
|
||||
-- supported although the where clause will certainly eval to false
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.a = 5 AND dist_2.a = 7;
|
||||
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
|
||||
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
|
||||
-- test with extra quals
|
||||
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a AND dist_1.b = dist_different_order_1.c AND (dist_different_order_1.c > 5 OR dist_different_order_1.c < 0);
|
||||
---- update: errors in router planner ----
|
||||
-- different column of the same relation, which is not implied to be equal to shard key --
|
||||
UPDATE dist_1 SET a = dist_1.b;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a != dist_2.a;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a >= dist_2.a;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a OR dist_1.a > dist_2.a;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
UPDATE dist_1 SET a = dist_different_order_1.b FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
UPDATE dist_1 SET a = foo.a FROM dist_1 foo;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
UPDATE dist_1 SET a = foo.a FROM dist_1 foo WHERE dist_1.a != foo.a;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
-- (*1) Would normally expect this to not throw an error because
|
||||
-- dist_1.a = dist_2.b AND dist_2.b = dist_2.a,
|
||||
-- so dist_1.a = dist_2.a, so we should be able to deduce
|
||||
-- that (dist_1.)a = dist_2.a, but it seems predicate_implied_by()
|
||||
-- is not that smart.
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
-- and same here
|
||||
with cte as (
|
||||
select * from (select a as x, b as y from dist_different_order_1 limit 100) q
|
||||
)
|
||||
update dist_1 set a = cte.x from cte where a = cte.y and cte.y = b and b = cte.x;
|
||||
ERROR: modifying the partition value of rows is not allowed
|
||||
---- update: errors later (in logical or physical planner) ----
|
||||
-- setting shard key to itself --
|
||||
UPDATE dist_1 SET a = dist_1.a FROM dist_1 foo;
|
||||
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
|
||||
UPDATE dist_1 SET a = dist_1.a FROM dist_2 foo;
|
||||
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
|
||||
-- setting shard key to another var that's implied to be equal to shard key --
|
||||
UPDATE dist_1 SET a = dist_non_colocated.a FROM dist_non_colocated WHERE dist_1.a = dist_non_colocated.a;
|
||||
ERROR: cannot push down this subquery
|
||||
DETAIL: dist_1 and dist_non_colocated are not colocated
|
||||
UPDATE dist_1 SET a = dist_2.b FROM dist_2 WHERE dist_1.a = dist_2.b;
|
||||
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
|
||||
---- update: a more sophisticated example ----
|
||||
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
SELECT create_distributed_table('dist_source', 'int_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_target', 'int_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(1001, 2000) i;
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(901, 1000) i;
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1501, 2000) i;
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i-1,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1401, 1500) i;
|
||||
INSERT INTO local_source SELECT * FROM dist_source;
|
||||
INSERT INTO local_target SELECT * FROM dist_target;
|
||||
-- execute the query on distributed tables
|
||||
UPDATE dist_target target_alias
|
||||
SET int_col = source_alias.int_col,
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
FROM dist_source source_alias
|
||||
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
|
||||
-- execute the same query on local tables, everything is the same except table names behind the aliases
|
||||
UPDATE local_target target_alias
|
||||
SET int_col = source_alias.int_col,
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
FROM local_source source_alias
|
||||
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
|
||||
-- compare both targets
|
||||
SELECT COUNT(*) = 0 AS targets_match
|
||||
FROM (
|
||||
SELECT * FROM dist_target
|
||||
EXCEPT
|
||||
SELECT * FROM local_target
|
||||
UNION ALL
|
||||
SELECT * FROM local_target
|
||||
EXCEPT
|
||||
SELECT * FROM dist_target
|
||||
) q;
|
||||
targets_match
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
---- merge: should work ----
|
||||
-- setting shard key to itself --
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
|
||||
-- We don't care about action quals when deciding if the update
|
||||
-- could change the shard key, but still add some action quals for
|
||||
-- testing. See the comments written on top of the line we call
|
||||
-- TargetEntryChangesValue() in MergeQualAndTargetListFunctionsSupported().
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED AND dist_1.a > dist_1.b AND dist_1.b > 10 THEN UPDATE SET a = dist_1.a;
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
-- setting shard key to another var that's implied to be equal to shard key --
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = dist_1.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
-- test with extra quals
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b AND (dist_1.b > 1000 OR (dist_1.b < 500)))
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
|
||||
MERGE INTO dist_1
|
||||
USING dist_different_order_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
---- merge: errors in router planner ----
|
||||
-- different column of the same relation, which is not implied to be equal to shard key --
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
|
||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
||||
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
||||
-- as in (*1), this is not supported
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b AND src.b = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (true)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a <= src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
||||
---- merge: a more sophisticated example ----
|
||||
DROP TABLE dist_source, dist_target, local_source, local_target;
|
||||
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
SELECT create_distributed_table('dist_source', 'tstamp_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_target', 'int_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(1001, 2000) i;
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(901, 1000) i;
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1501, 2000) i;
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i-1,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1401, 1500) i;
|
||||
INSERT INTO local_source SELECT * FROM dist_source;
|
||||
INSERT INTO local_target SELECT * FROM dist_target;
|
||||
-- execute the query on distributed tables
|
||||
MERGE INTO dist_target target_alias
|
||||
USING dist_source source_alias
|
||||
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||
WHEN MATCHED THEN UPDATE SET
|
||||
int_col = source_alias.int_col,
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||
-- execute the same query on local tables, everything is the same except table names behind the aliases
|
||||
MERGE INTO local_target target_alias
|
||||
USING local_source source_alias
|
||||
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||
WHEN MATCHED THEN UPDATE SET
|
||||
int_col = source_alias.int_col,
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||
-- compare both targets
|
||||
SELECT COUNT(*) = 0 AS targets_match
|
||||
FROM (
|
||||
SELECT * FROM dist_target
|
||||
EXCEPT
|
||||
SELECT * FROM local_target
|
||||
UNION ALL
|
||||
SELECT * FROM local_target
|
||||
EXCEPT
|
||||
SELECT * FROM dist_target
|
||||
) q;
|
||||
targets_match
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- UPDATEs with a FROM clause are supported even with local tables
|
||||
UPDATE limit_orders SET limit_price = 0.00 FROM bidders
|
||||
WHERE limit_orders.id = 246 AND
|
||||
|
|
@ -1353,19 +1730,5 @@ CREATE TABLE multi_modifications.local (a int default 1, b int);
|
|||
INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table));
|
||||
ERROR: subqueries are not supported within INSERT queries
|
||||
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
|
||||
DROP TABLE insufficient_shards;
|
||||
DROP TABLE raw_table;
|
||||
DROP TABLE summary_table;
|
||||
DROP TABLE reference_raw_table;
|
||||
DROP TABLE reference_summary_table;
|
||||
DROP TABLE limit_orders;
|
||||
DROP TABLE multiple_hash;
|
||||
DROP TABLE range_partitioned;
|
||||
DROP TABLE append_partitioned;
|
||||
DROP TABLE bidders;
|
||||
DROP FUNCTION stable_append;
|
||||
DROP FUNCTION immutable_append;
|
||||
DROP FUNCTION temp_strict_func;
|
||||
DROP TYPE order_side;
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA multi_modifications CASCADE;
|
||||
NOTICE: drop cascades to table multi_modifications.local
|
||||
|
|
|
|||
|
|
@ -323,11 +323,15 @@ HINT: Consider using PL/pgSQL functions instead.
|
|||
SELECT (SELECT 1 FROM test_parameterized_sql limit 1) FROM test_parameterized_sql_function(1);
|
||||
ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported
|
||||
HINT: Consider using PL/pgSQL functions instead.
|
||||
-- this fails on PG versions before 18
|
||||
-- succeeds in pg18 because of pg18 commit 0dca5d68d
|
||||
-- where sql functions behave as PL/pgSQL functions and use the cache
|
||||
SELECT test_parameterized_sql_function_in_subquery_where(1);
|
||||
ERROR: could not create distributed plan
|
||||
DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus.
|
||||
HINT: Consider using PL/pgSQL functions instead.
|
||||
CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1
|
||||
test_parameterized_sql_function_in_subquery_where
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- postgres behaves slightly differently for the following
|
||||
-- query where the target list is empty
|
||||
SELECT test_parameterized_sql_function(1);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,382 @@
|
|||
--
|
||||
-- MULTI_SQL_FUNCTION
|
||||
--
|
||||
SET citus.next_shard_id TO 1230000;
|
||||
CREATE FUNCTION sql_test_no_1() RETURNS bigint AS '
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
orders;
|
||||
' LANGUAGE SQL;
|
||||
CREATE FUNCTION sql_test_no_2() RETURNS bigint AS '
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
orders, lineitem
|
||||
WHERE
|
||||
o_orderkey = l_orderkey;
|
||||
' LANGUAGE SQL;
|
||||
CREATE FUNCTION sql_test_no_3() RETURNS bigint AS '
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
orders, customer
|
||||
WHERE
|
||||
o_custkey = c_custkey;
|
||||
' LANGUAGE SQL;
|
||||
CREATE FUNCTION sql_test_no_4() RETURNS bigint AS '
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
orders, customer, lineitem
|
||||
WHERE
|
||||
o_custkey = c_custkey AND
|
||||
o_orderkey = l_orderkey;
|
||||
' LANGUAGE SQL;
|
||||
SET client_min_messages TO INFO;
|
||||
-- now, run plain SQL functions
|
||||
SELECT sql_test_no_1();
|
||||
sql_test_no_1
|
||||
---------------------------------------------------------------------
|
||||
2985
|
||||
(1 row)
|
||||
|
||||
SELECT sql_test_no_2();
|
||||
sql_test_no_2
|
||||
---------------------------------------------------------------------
|
||||
12000
|
||||
(1 row)
|
||||
|
||||
SELECT sql_test_no_3();
|
||||
sql_test_no_3
|
||||
---------------------------------------------------------------------
|
||||
1956
|
||||
(1 row)
|
||||
|
||||
SELECT sql_test_no_4();
|
||||
sql_test_no_4
|
||||
---------------------------------------------------------------------
|
||||
7806
|
||||
(1 row)
|
||||
|
||||
-- run the tests which do not require re-partition
|
||||
-- with real-time executor
|
||||
-- now, run plain SQL functions
|
||||
SELECT sql_test_no_1();
|
||||
sql_test_no_1
|
||||
---------------------------------------------------------------------
|
||||
2985
|
||||
(1 row)
|
||||
|
||||
SELECT sql_test_no_2();
|
||||
sql_test_no_2
|
||||
---------------------------------------------------------------------
|
||||
12000
|
||||
(1 row)
|
||||
|
||||
-- test router executor parameterized sql functions
|
||||
CREATE TABLE temp_table (
|
||||
key int,
|
||||
value int
|
||||
);
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT create_distributed_table('temp_table','key','hash');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE FUNCTION no_parameter_insert_sql() RETURNS void AS $$
|
||||
INSERT INTO temp_table (key) VALUES (0);
|
||||
$$ LANGUAGE SQL;
|
||||
-- execute 6 times
|
||||
SELECT no_parameter_insert_sql();
|
||||
no_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT no_parameter_insert_sql();
|
||||
no_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT no_parameter_insert_sql();
|
||||
no_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT no_parameter_insert_sql();
|
||||
no_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT no_parameter_insert_sql();
|
||||
no_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT no_parameter_insert_sql();
|
||||
no_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE FUNCTION non_partition_parameter_insert_sql(int) RETURNS void AS $$
|
||||
INSERT INTO temp_table (key, value) VALUES (0, $1);
|
||||
$$ LANGUAGE SQL;
|
||||
-- execute 6 times
|
||||
SELECT non_partition_parameter_insert_sql(10);
|
||||
non_partition_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_insert_sql(20);
|
||||
non_partition_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_insert_sql(30);
|
||||
non_partition_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_insert_sql(40);
|
||||
non_partition_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_insert_sql(50);
|
||||
non_partition_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_insert_sql(60);
|
||||
non_partition_parameter_insert_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- check inserted values
|
||||
SELECT * FROM temp_table ORDER BY key, value;
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 10
|
||||
0 | 20
|
||||
0 | 30
|
||||
0 | 40
|
||||
0 | 50
|
||||
0 | 60
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
(12 rows)
|
||||
|
||||
-- check updates
|
||||
CREATE FUNCTION non_partition_parameter_update_sql(int, int) RETURNS void AS $$
|
||||
UPDATE temp_table SET value = $2 WHERE key = 0 AND value = $1;
|
||||
$$ LANGUAGE SQL;
|
||||
-- execute 6 times
|
||||
SELECT non_partition_parameter_update_sql(10, 12);
|
||||
non_partition_parameter_update_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_update_sql(20, 22);
|
||||
non_partition_parameter_update_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_update_sql(30, 32);
|
||||
non_partition_parameter_update_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_update_sql(40, 42);
|
||||
non_partition_parameter_update_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_update_sql(50, 52);
|
||||
non_partition_parameter_update_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_update_sql(60, 62);
|
||||
non_partition_parameter_update_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- check after updates
|
||||
SELECT * FROM temp_table ORDER BY key, value;
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 | 12
|
||||
0 | 22
|
||||
0 | 32
|
||||
0 | 42
|
||||
0 | 52
|
||||
0 | 62
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
(12 rows)
|
||||
|
||||
-- check deletes
|
||||
CREATE FUNCTION non_partition_parameter_delete_sql(int) RETURNS void AS $$
|
||||
DELETE FROM temp_table WHERE key = 0 AND value = $1;
|
||||
$$ LANGUAGE SQL;
|
||||
-- execute 6 times to trigger prepared statement usage
|
||||
SELECT non_partition_parameter_delete_sql(12);
|
||||
non_partition_parameter_delete_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_delete_sql(22);
|
||||
non_partition_parameter_delete_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_delete_sql(32);
|
||||
non_partition_parameter_delete_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_delete_sql(42);
|
||||
non_partition_parameter_delete_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_delete_sql(52);
|
||||
non_partition_parameter_delete_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT non_partition_parameter_delete_sql(62);
|
||||
non_partition_parameter_delete_sql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- check after deletes
|
||||
SELECT * FROM temp_table ORDER BY key, value;
|
||||
key | value
|
||||
---------------------------------------------------------------------
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
0 |
|
||||
(6 rows)
|
||||
|
||||
-- test running parameterized SQL function
|
||||
CREATE TABLE test_parameterized_sql(id integer, org_id integer);
|
||||
select create_distributed_table('test_parameterized_sql','org_id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE OR REPLACE FUNCTION test_parameterized_sql_function(org_id_val integer)
|
||||
RETURNS TABLE (a bigint)
|
||||
AS $$
|
||||
SELECT count(*) AS count_val from test_parameterized_sql where org_id = org_id_val;
|
||||
$$ LANGUAGE SQL STABLE;
|
||||
CREATE OR REPLACE FUNCTION test_parameterized_sql_function_in_subquery_where(org_id_val integer)
|
||||
RETURNS TABLE (a bigint)
|
||||
AS $$
|
||||
SELECT count(*) AS count_val from test_parameterized_sql as t1 where
|
||||
org_id IN (SELECT org_id FROM test_parameterized_sql as t2 WHERE t2.org_id = t1.org_id AND org_id = org_id_val);
|
||||
$$ LANGUAGE SQL STABLE;
|
||||
INSERT INTO test_parameterized_sql VALUES(1, 1);
|
||||
-- all of them should fail
|
||||
SELECT * FROM test_parameterized_sql_function(1);
|
||||
ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported
|
||||
HINT: Consider using PL/pgSQL functions instead.
|
||||
SELECT (SELECT 1 FROM test_parameterized_sql limit 1) FROM test_parameterized_sql_function(1);
|
||||
ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported
|
||||
HINT: Consider using PL/pgSQL functions instead.
|
||||
-- this fails on PG versions before 18
|
||||
-- succeeds in pg18 because of pg18 commit 0dca5d68d
|
||||
-- where sql functions behave as PL/pgSQL functions and use the cache
|
||||
SELECT test_parameterized_sql_function_in_subquery_where(1);
|
||||
ERROR: could not create distributed plan
|
||||
DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus.
|
||||
HINT: Consider using PL/pgSQL functions instead.
|
||||
CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1
|
||||
-- postgres behaves slightly differently for the following
|
||||
-- query where the target list is empty
|
||||
SELECT test_parameterized_sql_function(1);
|
||||
test_parameterized_sql_function
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- test that sql function calls are treated as multi-statement transactions
|
||||
-- and are rolled back properly. Single-row inserts for not-replicated tables
|
||||
-- don't go over 2PC if they are not part of a bigger transaction.
|
||||
CREATE TABLE table_with_unique_constraint (a int UNIQUE);
|
||||
SELECT create_distributed_table('table_with_unique_constraint', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO table_with_unique_constraint VALUES (1), (2), (3);
|
||||
CREATE OR REPLACE FUNCTION insert_twice() RETURNS VOID
|
||||
AS $$
|
||||
INSERT INTO table_with_unique_constraint VALUES (4);
|
||||
INSERT INTO table_with_unique_constraint VALUES (4);
|
||||
$$ LANGUAGE SQL;
|
||||
SELECT insert_twice();
|
||||
ERROR: duplicate key value violates unique constraint "table_with_unique_constraint_a_key_1230009"
|
||||
DETAIL: Key (a)=(4) already exists.
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
SQL function "insert_twice" statement 2
|
||||
SELECT * FROM table_with_unique_constraint ORDER BY a;
|
||||
a
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
2
|
||||
3
|
||||
(3 rows)
|
||||
|
||||
DROP TABLE temp_table, test_parameterized_sql, table_with_unique_constraint;
|
||||
-- clean-up functions
|
||||
DROP FUNCTION sql_test_no_1();
|
||||
DROP FUNCTION sql_test_no_2();
|
||||
DROP FUNCTION sql_test_no_3();
|
||||
DROP FUNCTION sql_test_no_4();
|
||||
DROP FUNCTION no_parameter_insert_sql();
|
||||
DROP FUNCTION non_partition_parameter_insert_sql(int);
|
||||
DROP FUNCTION non_partition_parameter_update_sql(int, int);
|
||||
DROP FUNCTION non_partition_parameter_delete_sql(int);
|
||||
DROP FUNCTION test_parameterized_sql_function(int);
|
||||
DROP FUNCTION test_parameterized_sql_function_in_subquery_where(int);
|
||||
|
|
@ -70,15 +70,19 @@ SELECT "name" AS "Column",
|
|||
"relid"
|
||||
FROM table_attrs;
|
||||
|
||||
CREATE VIEW table_checks AS
|
||||
SELECT cc.constraint_name AS "Constraint",
|
||||
('CHECK ' || regexp_replace(check_clause, '^\((.*)\)$', '\1')) AS "Definition",
|
||||
format('%I.%I', ccu.table_schema, ccu.table_name)::regclass::oid AS relid
|
||||
FROM information_schema.check_constraints cc,
|
||||
information_schema.constraint_column_usage ccu
|
||||
WHERE cc.constraint_schema = ccu.constraint_schema AND
|
||||
cc.constraint_name = ccu.constraint_name
|
||||
ORDER BY cc.constraint_name ASC;
|
||||
CREATE OR REPLACE VIEW table_checks AS
|
||||
SELECT
|
||||
c.conname AS "Constraint",
|
||||
'CHECK ' ||
|
||||
-- drop a single pair of outer parens if the deparser adds them
|
||||
regexp_replace(pg_get_expr(c.conbin, c.conrelid, true), '^\((.*)\)$', '\1')
|
||||
AS "Definition",
|
||||
c.conrelid AS relid
|
||||
FROM pg_catalog.pg_constraint AS c
|
||||
WHERE c.contype <> 'n' -- drop NOT NULL
|
||||
AND c.conbin IS NOT NULL -- only things with an expression (i.e., CHECKs)
|
||||
AND c.conrelid <> 0 -- table-level (exclude domain checks)
|
||||
ORDER BY "Constraint", "Definition";
|
||||
|
||||
CREATE VIEW index_attrs AS
|
||||
WITH indexoid AS (
|
||||
|
|
|
|||
|
|
@ -0,0 +1,169 @@
|
|||
--
|
||||
-- PG18
|
||||
--
|
||||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int >= 18 AS server_version_ge_18
|
||||
\gset
|
||||
-- test invalid statistics
|
||||
-- behavior is same among PG versions, error message differs
|
||||
-- relevant PG18 commit: 3eea4dc2c7
|
||||
CREATE STATISTICS tst ON a FROM (VALUES (x)) AS foo;
|
||||
ERROR: cannot create statistics on the specified relation
|
||||
DETAIL: CREATE STATISTICS only supports tables, foreign tables and materialized views.
|
||||
\if :server_version_ge_18
|
||||
\else
|
||||
\q
|
||||
\endif
|
||||
-- PG18-specific tests go here.
|
||||
--
|
||||
-- Purpose: Verify PG18 behavior that NOT NULL constraints are materialized
|
||||
-- as pg_constraint rows with contype = 'n' on both coordinator and
|
||||
-- worker shards. Also confirm our helper view (table_checks) does
|
||||
-- NOT surface NOT NULL entries.
|
||||
-- https://github.com/postgres/postgres/commit/14e87ffa5c543b5f30ead7413084c25f7735039f
|
||||
CREATE SCHEMA pg18_nn;
|
||||
SET search_path TO pg18_nn;
|
||||
-- Local control table
|
||||
DROP TABLE IF EXISTS nn_local CASCADE;
|
||||
NOTICE: table "nn_local" does not exist, skipping
|
||||
CREATE TABLE nn_local(
|
||||
a int NOT NULL,
|
||||
b int,
|
||||
c text NOT NULL
|
||||
);
|
||||
-- Distributed table
|
||||
DROP TABLE IF EXISTS nn_dist CASCADE;
|
||||
NOTICE: table "nn_dist" does not exist, skipping
|
||||
CREATE TABLE nn_dist(
|
||||
a int NOT NULL,
|
||||
b int,
|
||||
c text NOT NULL
|
||||
);
|
||||
SELECT create_distributed_table('nn_dist', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Coordinator: count NOT NULL constraint rows
|
||||
SELECT 'local_n_count' AS label, contype, count(*)
|
||||
FROM pg_constraint
|
||||
WHERE conrelid = 'pg18_nn.nn_local'::regclass
|
||||
GROUP BY contype
|
||||
ORDER BY contype;
|
||||
label | contype | count
|
||||
---------------------------------------------------------------------
|
||||
local_n_count | n | 2
|
||||
(1 row)
|
||||
|
||||
SELECT 'dist_n_count' AS label, contype, count(*)
|
||||
FROM pg_constraint
|
||||
WHERE conrelid = 'pg18_nn.nn_dist'::regclass
|
||||
GROUP BY contype
|
||||
ORDER BY contype;
|
||||
label | contype | count
|
||||
---------------------------------------------------------------------
|
||||
dist_n_count | n | 2
|
||||
(1 row)
|
||||
|
||||
-- Our helper view should exclude NOT NULL
|
||||
SELECT 'table_checks_local_count' AS label, count(*)
|
||||
FROM public.table_checks
|
||||
WHERE relid = 'pg18_nn.nn_local'::regclass;
|
||||
label | count
|
||||
---------------------------------------------------------------------
|
||||
table_checks_local_count | 0
|
||||
(1 row)
|
||||
|
||||
SELECT 'table_checks_dist_count' AS label, count(*)
|
||||
FROM public.table_checks
|
||||
WHERE relid = 'pg18_nn.nn_dist'::regclass;
|
||||
label | count
|
||||
---------------------------------------------------------------------
|
||||
table_checks_dist_count | 0
|
||||
(1 row)
|
||||
|
||||
-- Add a real CHECK to ensure table_checks still reports real checks
|
||||
ALTER TABLE nn_dist ADD CONSTRAINT nn_dist_check CHECK (b IS DISTINCT FROM 42);
|
||||
SELECT 'table_checks_dist_with_real_check' AS label, count(*)
|
||||
FROM public.table_checks
|
||||
WHERE relid = 'pg18_nn.nn_dist'::regclass;
|
||||
label | count
|
||||
---------------------------------------------------------------------
|
||||
table_checks_dist_with_real_check | 1
|
||||
(1 row)
|
||||
|
||||
-- === Worker checks ===
|
||||
\c - - - :worker_1_port
|
||||
SET client_min_messages TO WARNING;
|
||||
SET search_path TO pg18_nn;
|
||||
-- Pick one heap shard of nn_dist in our schema
|
||||
SELECT format('%I.%I', n.nspname, c.relname) AS shard_regclass
|
||||
FROM pg_class c
|
||||
JOIN pg_namespace n ON n.oid = c.relnamespace
|
||||
WHERE n.nspname = 'pg18_nn'
|
||||
AND c.relname LIKE 'nn_dist_%'
|
||||
AND c.relkind = 'r'
|
||||
ORDER BY c.relname
|
||||
LIMIT 1
|
||||
\gset
|
||||
-- Expect: 2 NOT NULL rows (a,c) + 1 CHECK row on the shard
|
||||
SELECT 'worker_shard_n_count' AS label, contype, count(*)
|
||||
FROM pg_constraint
|
||||
WHERE conrelid = :'shard_regclass'::regclass
|
||||
GROUP BY contype
|
||||
ORDER BY contype;
|
||||
label | contype | count
|
||||
---------------------------------------------------------------------
|
||||
worker_shard_n_count | c | 1
|
||||
worker_shard_n_count | n | 2
|
||||
(2 rows)
|
||||
|
||||
-- table_checks on shard should hide NOT NULL
|
||||
SELECT 'table_checks_worker_shard_count' AS label, count(*)
|
||||
FROM public.table_checks
|
||||
WHERE relid = :'shard_regclass'::regclass;
|
||||
label | count
|
||||
---------------------------------------------------------------------
|
||||
table_checks_worker_shard_count | 1
|
||||
(1 row)
|
||||
|
||||
-- Drop one NOT NULL on coordinator; verify propagation
|
||||
\c - - - :master_port
|
||||
SET search_path TO pg18_nn;
|
||||
ALTER TABLE nn_dist ALTER COLUMN c DROP NOT NULL;
|
||||
-- Re-check on worker: NOT NULL count should drop to 1
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_nn;
|
||||
SELECT 'worker_shard_n_after_drop' AS label, contype, count(*)
|
||||
FROM pg_constraint
|
||||
WHERE conrelid = :'shard_regclass'::regclass
|
||||
GROUP BY contype
|
||||
ORDER BY contype;
|
||||
label | contype | count
|
||||
---------------------------------------------------------------------
|
||||
worker_shard_n_after_drop | c | 1
|
||||
worker_shard_n_after_drop | n | 1
|
||||
(2 rows)
|
||||
|
||||
-- And on coordinator
|
||||
\c - - - :master_port
|
||||
SET search_path TO pg18_nn;
|
||||
SELECT 'dist_n_after_drop' AS label, contype, count(*)
|
||||
FROM pg_constraint
|
||||
WHERE conrelid = 'pg18_nn.nn_dist'::regclass
|
||||
GROUP BY contype
|
||||
ORDER BY contype;
|
||||
label | contype | count
|
||||
---------------------------------------------------------------------
|
||||
dist_n_after_drop | c | 1
|
||||
dist_n_after_drop | n | 1
|
||||
(2 rows)
|
||||
|
||||
-- cleanup
|
||||
RESET client_min_messages;
|
||||
RESET search_path;
|
||||
DROP SCHEMA pg18_nn CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
DETAIL: drop cascades to table pg18_nn.nn_local
|
||||
drop cascades to table pg18_nn.nn_dist
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
--
|
||||
-- PG18
|
||||
--
|
||||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int >= 18 AS server_version_ge_18
|
||||
\gset
|
||||
-- test invalid statistics
|
||||
-- behavior is same among PG versions, error message differs
|
||||
-- relevant PG18 commit: 3eea4dc2c7
|
||||
CREATE STATISTICS tst ON a FROM (VALUES (x)) AS foo;
|
||||
ERROR: only a single relation is allowed in CREATE STATISTICS
|
||||
\if :server_version_ge_18
|
||||
\else
|
||||
\q
|
||||
|
|
@ -68,6 +68,7 @@ test: pg14
|
|||
test: pg15
|
||||
test: pg15_jsonpath detect_conn_close
|
||||
test: pg17 pg17_json
|
||||
test: pg18
|
||||
test: drop_column_partitioned_table
|
||||
test: tableam
|
||||
|
||||
|
|
|
|||
|
|
@ -126,5 +126,128 @@ WHEN NOT MATCHED THEN
|
|||
|
||||
SELECT compare_data();
|
||||
|
||||
DROP SCHEMA merge_repartition2_schema CASCADE;
|
||||
---- https://github.com/citusdata/citus/issues/8180 ----
|
||||
|
||||
CREATE TABLE dist_1 (a int, b int, c int);
|
||||
CREATE TABLE dist_2 (a int, b int, c int);
|
||||
CREATE TABLE dist_different_order_1 (b int, a int, c int);
|
||||
|
||||
SELECT create_distributed_table('dist_1', 'a');
|
||||
SELECT create_distributed_table('dist_2', 'a');
|
||||
SELECT create_distributed_table('dist_different_order_1', 'a');
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_2
|
||||
ON (dist_1.a = dist_2.b)
|
||||
WHEN MATCHED THEN UPDATE SET b = dist_2.b;
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET b = src.b;
|
||||
|
||||
MERGE INTO dist_different_order_1
|
||||
USING dist_1
|
||||
ON (dist_different_order_1.a = dist_1.b)
|
||||
WHEN MATCHED THEN UPDATE SET b = dist_1.b;
|
||||
|
||||
CREATE TABLE dist_1_cast (a int, b int);
|
||||
CREATE TABLE dist_2_cast (a int, b numeric);
|
||||
|
||||
SELECT create_distributed_table('dist_1_cast', 'a');
|
||||
SELECT create_distributed_table('dist_2_cast', 'a');
|
||||
|
||||
MERGE INTO dist_1_cast
|
||||
USING dist_2_cast
|
||||
ON (dist_1_cast.a = dist_2_cast.b)
|
||||
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
|
||||
|
||||
MERGE INTO dist_1_cast
|
||||
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
|
||||
ON (dist_1_cast.a = dist_2_cast.b)
|
||||
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
|
||||
|
||||
-- a more sophisticated example
|
||||
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
|
||||
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
|
||||
SELECT create_distributed_table('dist_source', 'tstamp_col');
|
||||
SELECT create_distributed_table('dist_target', 'int_col');
|
||||
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(1001, 2000) i;
|
||||
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(901, 1000) i;
|
||||
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1501, 2000) i;
|
||||
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i-1,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1401, 1500) i;
|
||||
|
||||
INSERT INTO local_source SELECT * FROM dist_source;
|
||||
INSERT INTO local_target SELECT * FROM dist_target;
|
||||
|
||||
-- execute the query on distributed tables
|
||||
MERGE INTO dist_target target_alias
|
||||
USING dist_source source_alias
|
||||
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||
WHEN MATCHED THEN UPDATE SET
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||
|
||||
-- execute the same query on local tables, everything is the same except table names behind the aliases
|
||||
MERGE INTO local_target target_alias
|
||||
USING local_source source_alias
|
||||
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||
WHEN MATCHED THEN UPDATE SET
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||
|
||||
-- compare both targets
|
||||
|
||||
-- Symmetric difference of the two targets: the result is empty only when
-- dist_target and local_target contain the same set of rows.
-- Each EXCEPT is parenthesized because UNION and EXCEPT share the same
-- precedence and associate left-to-right; without the parentheses the query
-- reduces to (local_target EXCEPT dist_target) and rows that exist only in
-- dist_target would go unnoticed.
SELECT COUNT(*) = 0 AS targets_match
FROM (
    (SELECT * FROM dist_target
     EXCEPT
     SELECT * FROM local_target)
    UNION ALL
    (SELECT * FROM local_target
     EXCEPT
     SELECT * FROM dist_target)
) q;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA merge_repartition2_schema CASCADE;
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ SET citus.next_shard_id TO 750000;
|
|||
SET citus.next_placement_id TO 750000;
|
||||
|
||||
CREATE SCHEMA multi_modifications;
|
||||
SET search_path TO multi_modifications;
|
||||
|
||||
-- some failure messages that come from the worker nodes
|
||||
-- might change due to parallel executions, so suppress those
|
||||
|
|
@ -36,7 +37,7 @@ CREATE TABLE append_partitioned ( LIKE limit_orders );
|
|||
SET citus.shard_count TO 2;
|
||||
|
||||
SELECT create_distributed_table('limit_orders', 'id', 'hash');
|
||||
SELECT create_distributed_table('multiple_hash', 'id', 'hash');
|
||||
SELECT create_distributed_table('multiple_hash', 'category', 'hash');
|
||||
SELECT create_distributed_table('range_partitioned', 'id', 'range');
|
||||
SELECT create_distributed_table('append_partitioned', 'id', 'append');
|
||||
|
||||
|
|
@ -245,12 +246,14 @@ INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell',
|
|||
|
||||
-- First: Connect to the second worker node
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO multi_modifications;
|
||||
|
||||
-- Second: Move aside limit_orders shard on the second worker node
|
||||
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
|
||||
|
||||
-- Third: Connect back to master node
|
||||
\c - - - :master_port
|
||||
SET search_path TO multi_modifications;
|
||||
|
||||
-- Fourth: Perform an INSERT on the remaining node
|
||||
-- the whole transaction should fail
|
||||
|
|
@ -259,6 +262,7 @@ INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell',
|
|||
|
||||
-- set the shard name back
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO multi_modifications;
|
||||
|
||||
-- Second: Restore the original name of the limit_orders shard on the second worker node
|
||||
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
|
||||
|
|
@ -266,12 +270,15 @@ ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
|
|||
-- Verify the insert failed and both placements are healthy
|
||||
-- or the insert succeeded and placement marked unhealthy
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO multi_modifications;
|
||||
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO multi_modifications;
|
||||
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO multi_modifications;
|
||||
|
||||
SELECT count(*) FROM limit_orders WHERE id = 276;
|
||||
|
||||
|
|
@ -286,12 +293,14 @@ AND s.logicalrelid = 'limit_orders'::regclass;
|
|||
|
||||
-- First: Connect to the first worker node
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO multi_modifications;
|
||||
|
||||
-- Second: Move aside limit_orders shard on the first worker node
|
||||
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
|
||||
|
||||
-- Third: Connect back to master node
|
||||
\c - - - :master_port
|
||||
SET search_path TO multi_modifications;
|
||||
|
||||
-- Fourth: Perform an INSERT on the remaining node
|
||||
\set VERBOSITY terse
|
||||
|
|
@ -312,12 +321,14 @@ AND s.logicalrelid = 'limit_orders'::regclass;
|
|||
|
||||
-- First: Connect to the first worker node
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO multi_modifications;
|
||||
|
||||
-- Second: Restore the original name of the limit_orders shard on the first worker node
|
||||
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
|
||||
|
||||
-- Third: Connect back to master node
|
||||
\c - - - :master_port
|
||||
SET search_path TO multi_modifications;
|
||||
|
||||
-- attempting to change the partition key is unsupported
|
||||
UPDATE limit_orders SET id = 0 WHERE id = 246;
|
||||
|
|
@ -328,6 +339,375 @@ UPDATE limit_orders SET id = 246 WHERE id = 246;
|
|||
UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM';
|
||||
UPDATE limit_orders SET id = limit_orders.id WHERE id = 246;
|
||||
|
||||
CREATE TABLE dist_1 (a int, b int, c int);
|
||||
CREATE TABLE dist_2 (a int, b int, c int);
|
||||
CREATE TABLE dist_non_colocated (a int, b int, c int);
|
||||
CREATE TABLE dist_different_order_1 (b int, a int, c int);
|
||||
|
||||
SELECT create_distributed_table('dist_1', 'a');
|
||||
SELECT create_distributed_table('dist_2', 'a');
|
||||
SELECT create_distributed_table('dist_non_colocated', 'a', colocate_with=>'none');
|
||||
SELECT create_distributed_table('dist_different_order_1', 'a');
|
||||
|
||||
--
|
||||
-- https://github.com/citusdata/citus/issues/8087
|
||||
--
|
||||
|
||||
---- update: should work ----
|
||||
|
||||
-- setting shard key to itself --
|
||||
|
||||
UPDATE dist_1 SET a = dist_1.a;
|
||||
UPDATE dist_1 SET a = dist_1.a WHERE dist_1.a > dist_1.b AND dist_1.b > 10;
|
||||
UPDATE dist_1 SET a = dist_1.a FROM dist_2 WHERE dist_1.a = dist_2.a;
|
||||
|
||||
-- setting shard key to another var that's implied to be equal to shard key --
|
||||
|
||||
UPDATE dist_1 SET a = b WHERE a = b;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.b = dist_2.c AND (dist_2.c > 5 OR dist_2.c < 0);
|
||||
|
||||
with cte as (
|
||||
select a, b from dist_1
|
||||
)
|
||||
update dist_1 set a = cte.a from cte where dist_1.a = cte.a;
|
||||
|
||||
with cte as (
|
||||
select a as x, b as y from (select a, b from dist_1 limit 100) dt where b > 100
|
||||
)
|
||||
update dist_1 set a = cte.x from cte where dist_1.a = cte.x;
|
||||
|
||||
with cte as (
|
||||
select d2.a as x, d1.b as y
|
||||
from dist_1 d1, dist_different_order_1 d2
|
||||
where d1.a=d2.a)
|
||||
update dist_1 set a = cte.x from cte where y != 0 and dist_1.a = cte.x;
|
||||
|
||||
with cte as (
|
||||
select * from (select a as x, b as y from dist_2 limit 100) q
|
||||
)
|
||||
update dist_1 set a = cte.x from cte where b = cte.y and cte.y = a and a = cte.x;
|
||||
|
||||
-- supported although the where clause will certainly eval to false
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.a = 5 AND dist_2.a = 7;
|
||||
|
||||
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
|
||||
|
||||
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
|
||||
|
||||
-- test with extra quals
|
||||
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a AND dist_1.b = dist_different_order_1.c AND (dist_different_order_1.c > 5 OR dist_different_order_1.c < 0);
|
||||
|
||||
---- update: errors in router planner ----
|
||||
|
||||
-- different column of the same relation, which is not implied to be equal to shard key --
|
||||
|
||||
UPDATE dist_1 SET a = dist_1.b;
|
||||
|
||||
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
|
||||
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a != dist_2.a;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a >= dist_2.a;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a OR dist_1.a > dist_2.a;
|
||||
UPDATE dist_1 SET a = dist_different_order_1.b FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
|
||||
|
||||
UPDATE dist_1 SET a = foo.a FROM dist_1 foo;
|
||||
UPDATE dist_1 SET a = foo.a FROM dist_1 foo WHERE dist_1.a != foo.a;
|
||||
|
||||
-- (*1) Would normally expect this to not throw an error because
|
||||
-- dist_1.a = dist_2.b AND dist_2.b = dist_2.a,
|
||||
-- so dist_1.a = dist_2.a, so we should be able to deduce
|
||||
-- that (dist_1.)a = dist_2.a, but seems predicate_implied_by()
|
||||
-- is not that smart.
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a;
|
||||
|
||||
-- and same here
|
||||
with cte as (
|
||||
select * from (select a as x, b as y from dist_different_order_1 limit 100) q
|
||||
)
|
||||
update dist_1 set a = cte.x from cte where a = cte.y and cte.y = b and b = cte.x;
|
||||
|
||||
---- update: errors later (in logical or physical planner) ----
|
||||
|
||||
-- setting shard key to itself --
|
||||
|
||||
UPDATE dist_1 SET a = dist_1.a FROM dist_1 foo;
|
||||
UPDATE dist_1 SET a = dist_1.a FROM dist_2 foo;
|
||||
|
||||
-- setting shard key to another var that's implied to be equal to shard key --
|
||||
|
||||
UPDATE dist_1 SET a = dist_non_colocated.a FROM dist_non_colocated WHERE dist_1.a = dist_non_colocated.a;
|
||||
UPDATE dist_1 SET a = dist_2.b FROM dist_2 WHERE dist_1.a = dist_2.b;
|
||||
|
||||
---- update: a more sophisticated example ----
|
||||
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
|
||||
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
|
||||
SELECT create_distributed_table('dist_source', 'int_col');
|
||||
SELECT create_distributed_table('dist_target', 'int_col');
|
||||
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(1001, 2000) i;
|
||||
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(901, 1000) i;
|
||||
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1501, 2000) i;
|
||||
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i-1,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1401, 1500) i;
|
||||
|
||||
INSERT INTO local_source SELECT * FROM dist_source;
|
||||
INSERT INTO local_target SELECT * FROM dist_target;
|
||||
|
||||
-- execute the query on distributed tables
|
||||
UPDATE dist_target target_alias
|
||||
SET int_col = source_alias.int_col,
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
FROM dist_source source_alias
|
||||
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
|
||||
|
||||
-- execute the same query on local tables, everything is the same except table names behind the aliases
|
||||
UPDATE local_target target_alias
|
||||
SET int_col = source_alias.int_col,
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
FROM local_source source_alias
|
||||
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
|
||||
|
||||
-- compare both targets
|
||||
|
||||
-- Symmetric difference of the two targets: the result is empty only when
-- dist_target and local_target contain the same set of rows.
-- Each EXCEPT is parenthesized because UNION and EXCEPT share the same
-- precedence and associate left-to-right; without the parentheses the query
-- reduces to (local_target EXCEPT dist_target) and rows that exist only in
-- dist_target would go unnoticed.
SELECT COUNT(*) = 0 AS targets_match
FROM (
    (SELECT * FROM dist_target
     EXCEPT
     SELECT * FROM local_target)
    UNION ALL
    (SELECT * FROM local_target
     EXCEPT
     SELECT * FROM dist_target)
) q;
|
||||
|
||||
---- merge: should work ----
|
||||
|
||||
-- setting shard key to itself --
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
|
||||
|
||||
-- We don't care about action quals when deciding if the update
|
||||
-- could change the shard key, but still add some action quals for
|
||||
-- testing. See the comments written on top of the line we call
|
||||
-- TargetEntryChangesValue() in MergeQualAndTargetListFunctionsSupported().
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED AND dist_1.a > dist_1.b AND dist_1.b > 10 THEN UPDATE SET a = dist_1.a;
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
|
||||
-- setting shard key to another var that's implied to be equal to shard key --
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = dist_1.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
|
||||
-- test with extra quals
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b AND (dist_1.b > 1000 OR (dist_1.b < 500)))
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
|
||||
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_different_order_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
|
||||
---- merge: errors in router planner ----
|
||||
|
||||
-- different column of the same relation, which is not implied to be equal to shard key --
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
|
||||
|
||||
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
|
||||
-- as in (*1), this is not supported
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b AND src.b = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (true)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a <= src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
|
||||
---- merge: a more sophisticated example ----
|
||||
DROP TABLE dist_source, dist_target, local_source, local_target;
|
||||
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
|
||||
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
|
||||
SELECT create_distributed_table('dist_source', 'tstamp_col');
|
||||
SELECT create_distributed_table('dist_target', 'int_col');
|
||||
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(1001, 2000) i;
|
||||
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(901, 1000) i;
|
||||
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1501, 2000) i;
|
||||
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i-1,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1401, 1500) i;
|
||||
|
||||
INSERT INTO local_source SELECT * FROM dist_source;
|
||||
INSERT INTO local_target SELECT * FROM dist_target;
|
||||
|
||||
-- execute the query on distributed tables
|
||||
MERGE INTO dist_target target_alias
|
||||
USING dist_source source_alias
|
||||
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||
WHEN MATCHED THEN UPDATE SET
|
||||
int_col = source_alias.int_col,
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||
|
||||
-- execute the same query on local tables, everything is the same except table names behind the aliases
|
||||
MERGE INTO local_target target_alias
|
||||
USING local_source source_alias
|
||||
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||
WHEN MATCHED THEN UPDATE SET
|
||||
int_col = source_alias.int_col,
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||
|
||||
-- compare both targets
|
||||
|
||||
-- Symmetric difference of the two targets: the result is empty only when
-- dist_target and local_target contain the same set of rows.
-- Each EXCEPT is parenthesized because UNION and EXCEPT share the same
-- precedence and associate left-to-right; without the parentheses the query
-- reduces to (local_target EXCEPT dist_target) and rows that exist only in
-- dist_target would go unnoticed.
SELECT COUNT(*) = 0 AS targets_match
FROM (
    (SELECT * FROM dist_target
     EXCEPT
     SELECT * FROM local_target)
    UNION ALL
    (SELECT * FROM local_target
     EXCEPT
     SELECT * FROM dist_target)
) q;
|
||||
|
||||
-- UPDATEs with a FROM clause are supported even with local tables
|
||||
UPDATE limit_orders SET limit_price = 0.00 FROM bidders
|
||||
WHERE limit_orders.id = 246 AND
|
||||
|
|
@ -914,20 +1294,5 @@ DELETE FROM summary_table WHERE id < (
|
|||
CREATE TABLE multi_modifications.local (a int default 1, b int);
|
||||
INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table));
|
||||
|
||||
DROP TABLE insufficient_shards;
|
||||
DROP TABLE raw_table;
|
||||
DROP TABLE summary_table;
|
||||
DROP TABLE reference_raw_table;
|
||||
DROP TABLE reference_summary_table;
|
||||
DROP TABLE limit_orders;
|
||||
DROP TABLE multiple_hash;
|
||||
DROP TABLE range_partitioned;
|
||||
DROP TABLE append_partitioned;
|
||||
DROP TABLE bidders;
|
||||
|
||||
DROP FUNCTION stable_append;
|
||||
DROP FUNCTION immutable_append;
|
||||
DROP FUNCTION temp_strict_func;
|
||||
DROP TYPE order_side;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA multi_modifications CASCADE;
|
||||
|
|
|
|||
|
|
@ -147,6 +147,9 @@ SELECT * FROM test_parameterized_sql_function(1);
|
|||
|
||||
SELECT (SELECT 1 FROM test_parameterized_sql limit 1) FROM test_parameterized_sql_function(1);
|
||||
|
||||
-- this fails on PG versions before 18;
|
||||
-- it succeeds on PG18 because of PG18 commit 0dca5d68d,
|
||||
-- in which SQL functions behave like PL/pgSQL functions and use the cache
|
||||
SELECT test_parameterized_sql_function_in_subquery_where(1);
|
||||
|
||||
-- postgres behaves slightly differently for the following
|
||||
|
|
|
|||
|
|
@ -71,15 +71,19 @@ SELECT "name" AS "Column",
|
|||
"relid"
|
||||
FROM table_attrs;
|
||||
|
||||
CREATE VIEW table_checks AS
|
||||
SELECT cc.constraint_name AS "Constraint",
|
||||
('CHECK ' || regexp_replace(check_clause, '^\((.*)\)$', '\1')) AS "Definition",
|
||||
format('%I.%I', ccu.table_schema, ccu.table_name)::regclass::oid AS relid
|
||||
FROM information_schema.check_constraints cc,
|
||||
information_schema.constraint_column_usage ccu
|
||||
WHERE cc.constraint_schema = ccu.constraint_schema AND
|
||||
cc.constraint_name = ccu.constraint_name
|
||||
ORDER BY cc.constraint_name ASC;
|
||||
-- table_checks lists every CHECK constraint attached to a table: its name,
-- its expression rendered as 'CHECK <expr>', and the oid of the owning relation
CREATE OR REPLACE VIEW table_checks AS
|
||||
SELECT
|
||||
con.conname AS "Constraint",
|
||||
-- strip one enclosing pair of parentheses when the deparsed expression carries them
|
||||
'CHECK ' || regexp_replace(pg_get_expr(con.conbin, con.conrelid, true), '^\((.*)\)$', '\1') AS "Definition",
|
||||
con.conrelid AS relid
|
||||
FROM pg_catalog.pg_constraint AS con
|
||||
WHERE con.conrelid <> 0 -- attached to a table (leaves out domain checks)
|
||||
AND con.conbin IS NOT NULL -- carries an expression, i.e. is a CHECK
|
||||
AND con.contype <> 'n' -- NOT NULL constraints are not listed
|
||||
ORDER BY "Constraint", "Definition";
|
||||
|
||||
CREATE VIEW index_attrs AS
|
||||
WITH indexoid AS (
|
||||
|
|
|
|||
|
|
@ -0,0 +1,134 @@
|
|||
--
|
||||
-- PG18
|
||||
--
|
||||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int >= 18 AS server_version_ge_18
|
||||
\gset
|
||||
|
||||
-- test invalid statistics
|
||||
-- the behavior is the same across PG versions; only the error message differs
|
||||
-- relevant PG18 commit: 3eea4dc2c7
|
||||
CREATE STATISTICS tst ON a FROM (VALUES (x)) AS foo;
|
||||
|
||||
-- stop here on servers older than PG18: server_version_ge_18 is false there,
-- so the \else branch quits psql before the PG18-specific tests below run
\if :server_version_ge_18
|
||||
\else
|
||||
\q
|
||||
\endif
|
||||
|
||||
-- PG18-specific tests go here.
|
||||
--
|
||||
|
||||
-- Purpose: Verify PG18 behavior that NOT NULL constraints are materialized
|
||||
-- as pg_constraint rows with contype = 'n' on both coordinator and
|
||||
-- worker shards. Also confirm our helper view (table_checks) does
|
||||
-- NOT surface NOT NULL entries.
|
||||
-- https://github.com/postgres/postgres/commit/14e87ffa5c543b5f30ead7413084c25f7735039f
|
||||
|
||||
CREATE SCHEMA pg18_nn;
|
||||
SET search_path TO pg18_nn;
|
||||
|
||||
-- Local control table
|
||||
DROP TABLE IF EXISTS nn_local CASCADE;
|
||||
CREATE TABLE nn_local(
|
||||
a int NOT NULL,
|
||||
b int,
|
||||
c text NOT NULL
|
||||
);
|
||||
|
||||
-- Distributed table
|
||||
DROP TABLE IF EXISTS nn_dist CASCADE;
|
||||
CREATE TABLE nn_dist(
|
||||
a int NOT NULL,
|
||||
b int,
|
||||
c text NOT NULL
|
||||
);
|
||||
|
||||
SELECT create_distributed_table('nn_dist', 'a');
|
||||
|
||||
-- Coordinator: count NOT NULL constraint rows
|
||||
SELECT 'local_n_count' AS label, contype, count(*)
|
||||
FROM pg_constraint
|
||||
WHERE conrelid = 'pg18_nn.nn_local'::regclass
|
||||
GROUP BY contype
|
||||
ORDER BY contype;
|
||||
|
||||
SELECT 'dist_n_count' AS label, contype, count(*)
|
||||
FROM pg_constraint
|
||||
WHERE conrelid = 'pg18_nn.nn_dist'::regclass
|
||||
GROUP BY contype
|
||||
ORDER BY contype;
|
||||
|
||||
-- Our helper view should exclude NOT NULL
|
||||
SELECT 'table_checks_local_count' AS label, count(*)
|
||||
FROM public.table_checks
|
||||
WHERE relid = 'pg18_nn.nn_local'::regclass;
|
||||
|
||||
SELECT 'table_checks_dist_count' AS label, count(*)
|
||||
FROM public.table_checks
|
||||
WHERE relid = 'pg18_nn.nn_dist'::regclass;
|
||||
|
||||
-- Add a real CHECK to ensure table_checks still reports real checks
|
||||
ALTER TABLE nn_dist ADD CONSTRAINT nn_dist_check CHECK (b IS DISTINCT FROM 42);
|
||||
|
||||
SELECT 'table_checks_dist_with_real_check' AS label, count(*)
|
||||
FROM public.table_checks
|
||||
WHERE relid = 'pg18_nn.nn_dist'::regclass;
|
||||
|
||||
-- === Worker checks ===
|
||||
\c - - - :worker_1_port
|
||||
SET client_min_messages TO WARNING;
|
||||
SET search_path TO pg18_nn;
|
||||
|
||||
-- Pick one heap shard of nn_dist in our schema and store its qualified name
-- in :shard_regclass; the underscores in the LIKE pattern are escaped because
-- an unescaped _ matches any single character
|
||||
SELECT format('%I.%I', n.nspname, c.relname) AS shard_regclass
|
||||
FROM pg_class c
|
||||
JOIN pg_namespace n ON n.oid = c.relnamespace
|
||||
WHERE n.nspname = 'pg18_nn'
|
||||
AND c.relname LIKE 'nn\_dist\_%'
|
||||
AND c.relkind = 'r'
|
||||
ORDER BY c.relname
|
||||
LIMIT 1
|
||||
\gset
|
||||
|
||||
-- Expect: 2 NOT NULL rows (a,c) + 1 CHECK row on the shard
|
||||
SELECT 'worker_shard_n_count' AS label, contype, count(*)
|
||||
FROM pg_constraint
|
||||
WHERE conrelid = :'shard_regclass'::regclass
|
||||
GROUP BY contype
|
||||
ORDER BY contype;
|
||||
|
||||
-- table_checks on shard should hide NOT NULL
|
||||
SELECT 'table_checks_worker_shard_count' AS label, count(*)
|
||||
FROM public.table_checks
|
||||
WHERE relid = :'shard_regclass'::regclass;
|
||||
|
||||
-- Drop one NOT NULL on coordinator; verify propagation
|
||||
\c - - - :master_port
|
||||
SET search_path TO pg18_nn;
|
||||
|
||||
ALTER TABLE nn_dist ALTER COLUMN c DROP NOT NULL;
|
||||
|
||||
-- Re-check on worker: NOT NULL count should drop to 1
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_nn;
|
||||
|
||||
SELECT 'worker_shard_n_after_drop' AS label, contype, count(*)
|
||||
FROM pg_constraint
|
||||
WHERE conrelid = :'shard_regclass'::regclass
|
||||
GROUP BY contype
|
||||
ORDER BY contype;
|
||||
|
||||
-- And on coordinator
|
||||
\c - - - :master_port
|
||||
SET search_path TO pg18_nn;
|
||||
|
||||
SELECT 'dist_n_after_drop' AS label, contype, count(*)
|
||||
FROM pg_constraint
|
||||
WHERE conrelid = 'pg18_nn.nn_dist'::regclass
|
||||
GROUP BY contype
|
||||
ORDER BY contype;
|
||||
|
||||
-- cleanup
|
||||
RESET client_min_messages;
|
||||
RESET search_path;
|
||||
DROP SCHEMA pg18_nn CASCADE;
|
||||
Loading…
Reference in New Issue