mirror of https://github.com/citusdata/citus.git
Fixes a bug preventing INSERT SELECT .. ON CONFLICT with a constraint name on local shards
Separate search relation shard function Add testspull/4326/head
parent
46be63d76b
commit
a64dc8a72b
|
@ -13,6 +13,8 @@
|
|||
#include "c.h"
|
||||
|
||||
#include "access/heapam.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "catalog/pg_constraint.h"
|
||||
#include "distributed/citus_nodefuncs.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/deparse_shard_query.h"
|
||||
|
@ -34,10 +36,13 @@
|
|||
#include "storage/lock.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/rel.h"
|
||||
|
||||
#include "utils/syscache.h"
|
||||
|
||||
static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
|
||||
RangeTblEntry *valuesRTE, Task *task);
|
||||
static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList,
|
||||
OnConflictExpr *onConflict);
|
||||
static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList);
|
||||
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
|
||||
static bool ShouldLazyDeparseQuery(Task *task);
|
||||
static char * DeparseTaskQuery(Task *task, Query *query);
|
||||
|
@ -203,9 +208,6 @@ bool
|
|||
UpdateRelationToShardNames(Node *node, List *relationShardList)
|
||||
{
|
||||
uint64 shardId = INVALID_SHARD_ID;
|
||||
Oid relationId = InvalidOid;
|
||||
ListCell *relationShardCell = NULL;
|
||||
RelationShard *relationShard = NULL;
|
||||
|
||||
if (node == NULL)
|
||||
{
|
||||
|
@ -238,24 +240,8 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
|
|||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Search for the restrictions associated with the RTE. There better be
|
||||
* some, otherwise this query wouldn't be eligible as a router query.
|
||||
*
|
||||
* FIXME: We should probably use a hashtable here, to do efficient
|
||||
* lookup.
|
||||
*/
|
||||
foreach(relationShardCell, relationShardList)
|
||||
{
|
||||
relationShard = (RelationShard *) lfirst(relationShardCell);
|
||||
|
||||
if (newRte->relid == relationShard->relationId)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
relationShard = NULL;
|
||||
}
|
||||
RelationShard *relationShard = FindRelationShard(newRte->relid,
|
||||
relationShardList);
|
||||
|
||||
bool replaceRteWithNullValues = relationShard == NULL ||
|
||||
relationShard->shardId == INVALID_SHARD_ID;
|
||||
|
@ -266,7 +252,7 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
|
|||
}
|
||||
|
||||
shardId = relationShard->shardId;
|
||||
relationId = relationShard->relationId;
|
||||
Oid relationId = relationShard->relationId;
|
||||
|
||||
char *relationName = get_rel_name(relationId);
|
||||
AppendShardIdToName(&relationName, shardId);
|
||||
|
@ -300,6 +286,13 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList)
|
|||
relationShardList, QTW_EXAMINE_RTES_BEFORE);
|
||||
}
|
||||
|
||||
if (IsA(node, OnConflictExpr))
|
||||
{
|
||||
OnConflictExpr *onConflict = (OnConflictExpr *) node;
|
||||
|
||||
return ReplaceRelationConstraintByShardConstraint(relationShardList, onConflict);
|
||||
}
|
||||
|
||||
if (!IsA(node, RangeTblEntry))
|
||||
{
|
||||
return expression_tree_walker(node, UpdateRelationsToLocalShardTables,
|
||||
|
@ -313,27 +306,8 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList)
|
|||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Search for the restrictions associated with the RTE. There better be
|
||||
* some, otherwise this query wouldn't be eligible as a router query.
|
||||
*
|
||||
* FIXME: We should probably use a hashtable here, to do efficient
|
||||
* lookup.
|
||||
*/
|
||||
ListCell *relationShardCell = NULL;
|
||||
RelationShard *relationShard = NULL;
|
||||
|
||||
foreach(relationShardCell, relationShardList)
|
||||
{
|
||||
relationShard = (RelationShard *) lfirst(relationShardCell);
|
||||
|
||||
if (newRte->relid == relationShard->relationId)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
relationShard = NULL;
|
||||
}
|
||||
RelationShard *relationShard = FindRelationShard(newRte->relid,
|
||||
relationShardList);
|
||||
|
||||
/* the function should only be called with local shards */
|
||||
if (relationShard == NULL)
|
||||
|
@ -350,6 +324,92 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReplaceRelationConstraintByShardConstraint replaces given OnConflictExpr's
|
||||
* constraint id with constraint id of the corresponding shard.
|
||||
*/
|
||||
static bool
|
||||
ReplaceRelationConstraintByShardConstraint(List *relationShardList,
|
||||
OnConflictExpr *onConflict)
|
||||
{
|
||||
Oid constraintId = onConflict->constraint;
|
||||
|
||||
if (!OidIsValid(constraintId))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
Oid constraintRelationId = InvalidOid;
|
||||
|
||||
HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraintId));
|
||||
if (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
|
||||
constraintRelationId = contup->conrelid;
|
||||
ReleaseSysCache(heapTuple);
|
||||
}
|
||||
|
||||
/*
|
||||
* We can return here without calling the walker function, since we know there
|
||||
* will be no possible tables or constraints after this point, by the syntax.
|
||||
*/
|
||||
if (!OidIsValid(constraintRelationId))
|
||||
{
|
||||
ereport(ERROR, (errmsg("Invalid relation id (%u) for constraint: %s",
|
||||
constraintRelationId, get_constraint_name(constraintId))));
|
||||
}
|
||||
|
||||
RelationShard *relationShard = FindRelationShard(constraintRelationId,
|
||||
relationShardList);
|
||||
|
||||
if (relationShard != NULL)
|
||||
{
|
||||
char *constraintName = get_constraint_name(constraintId);
|
||||
|
||||
AppendShardIdToName(&constraintName, relationShard->shardId);
|
||||
|
||||
Oid shardOid = GetTableLocalShardOid(relationShard->relationId,
|
||||
relationShard->shardId);
|
||||
|
||||
Oid shardConstraintId = get_relation_constraint_oid(shardOid, constraintName,
|
||||
false);
|
||||
|
||||
onConflict->constraint = shardConstraintId;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindRelationShard finds the RelationShard for shard relation with
|
||||
* given Oid if exists in given relationShardList. Otherwise, returns NULL.
|
||||
*/
|
||||
static RelationShard *
|
||||
FindRelationShard(Oid inputRelationId, List *relationShardList)
|
||||
{
|
||||
RelationShard *relationShard = NULL;
|
||||
|
||||
/*
|
||||
* Search for the restrictions associated with the RTE. There better be
|
||||
* some, otherwise this query wouldn't be eligible as a router query.
|
||||
* FIXME: We should probably use a hashtable here, to do efficient lookup.
|
||||
*/
|
||||
foreach_ptr(relationShard, relationShardList)
|
||||
{
|
||||
if (inputRelationId == relationShard->relationId)
|
||||
{
|
||||
return relationShard;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ConvertRteToSubqueryWithEmptyResult converts given relation RTE into
|
||||
* subquery RTE that returns no results.
|
||||
|
|
|
@ -100,6 +100,143 @@ WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER
|
|||
5 | 6
|
||||
(5 rows)
|
||||
|
||||
-- Test upsert with constraint
|
||||
CREATE TABLE upsert_test
|
||||
(
|
||||
part_key int UNIQUE,
|
||||
other_col int,
|
||||
third_col int
|
||||
);
|
||||
-- distribute the table
|
||||
SELECT create_distributed_table('upsert_test', 'part_key');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- do a regular insert
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *;
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 |
|
||||
2 | 2 |
|
||||
(2 rows)
|
||||
|
||||
SET citus.log_remote_commands to true;
|
||||
-- observe that there is a conflict and the following query does nothing
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *;
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- same as the above with different syntax
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *;
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- again the same query with another syntax
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630513 DO NOTHING RETURNING part_key, other_col, third_col
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
BEGIN;
|
||||
-- force local execution
|
||||
SELECT count(*) FROM upsert_test WHERE part_key = 1;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630513 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
-- multi-shard pushdown query that goes through local execution
|
||||
INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- multi-shard pull-to-coordinator query that goes through local execution
|
||||
INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test LIMIT 100 ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
COMMIT;
|
||||
-- to test citus local tables
|
||||
select undistribute_table('upsert_test');
|
||||
NOTICE: creating a new local table for single_node.upsert_test
|
||||
NOTICE: Moving the data of single_node.upsert_test
|
||||
NOTICE: Dropping the old single_node.upsert_test
|
||||
NOTICE: Renaming the new table to single_node.upsert_test
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- create citus local table
|
||||
select create_citus_local_table('upsert_test');
|
||||
create_citus_local_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- test the constraint with local execution
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
DROP TABLE upsert_test;
|
||||
CREATE SCHEMA "Quoed.Schema";
|
||||
SET search_path TO "Quoed.Schema";
|
||||
CREATE TABLE "long_constraint_upsert\_test"
|
||||
(
|
||||
part_key int,
|
||||
other_col int,
|
||||
third_col int,
|
||||
CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" UNIQUE (part_key)
|
||||
);
|
||||
NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted "
|
||||
-- distribute the table and create shards
|
||||
SELECT create_distributed_table('"long_constraint_upsert\_test"', 'part_key');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO "long_constraint_upsert\_test" (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *;
|
||||
NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted "
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
1 | 1 |
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE "long_constraint_upsert\_test" RENAME TO simple_table_name;
|
||||
INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *;
|
||||
NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted "
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- this is currently not supported, but once we support
|
||||
-- make sure that the following query also works fine
|
||||
ALTER TABLE simple_table_name RENAME CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" TO simple_constraint_name;
|
||||
NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted "
|
||||
ERROR: renaming constraints belonging to distributed tables is currently unsupported
|
||||
--INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *;
|
||||
SET search_path TO single_node;
|
||||
DROP SCHEMA "Quoed.Schema" CASCADE;
|
||||
NOTICE: drop cascades to 5 other objects
|
||||
DETAIL: drop cascades to table "Quoed.Schema".simple_table_name
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630518
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630519
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630520
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630521
|
||||
-- we should be able to limit intermediate results
|
||||
BEGIN;
|
||||
SET LOCAL citus.max_intermediate_result_size TO 0;
|
||||
|
|
|
@ -41,6 +41,86 @@ SELECT * FROM test ORDER BY x;
|
|||
UPDATE test SET y = y + 1 RETURNING *;
|
||||
WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2;
|
||||
|
||||
-- Test upsert with constraint
|
||||
CREATE TABLE upsert_test
|
||||
(
|
||||
part_key int UNIQUE,
|
||||
other_col int,
|
||||
third_col int
|
||||
);
|
||||
|
||||
-- distribute the table
|
||||
SELECT create_distributed_table('upsert_test', 'part_key');
|
||||
|
||||
-- do a regular insert
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *;
|
||||
|
||||
SET citus.log_remote_commands to true;
|
||||
|
||||
-- observe that there is a conflict and the following query does nothing
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *;
|
||||
|
||||
-- same as the above with different syntax
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *;
|
||||
|
||||
-- again the same query with another syntax
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
||||
|
||||
BEGIN;
|
||||
|
||||
-- force local execution
|
||||
SELECT count(*) FROM upsert_test WHERE part_key = 1;
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
|
||||
-- multi-shard pushdown query that goes through local execution
|
||||
INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
||||
|
||||
-- multi-shard pull-to-coordinator query that goes through local execution
|
||||
|
||||
INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test LIMIT 100 ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
||||
|
||||
COMMIT;
|
||||
|
||||
-- to test citus local tables
|
||||
select undistribute_table('upsert_test');
|
||||
-- create citus local table
|
||||
select create_citus_local_table('upsert_test');
|
||||
-- test the constraint with local execution
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
||||
|
||||
DROP TABLE upsert_test;
|
||||
|
||||
CREATE SCHEMA "Quoed.Schema";
|
||||
SET search_path TO "Quoed.Schema";
|
||||
|
||||
|
||||
CREATE TABLE "long_constraint_upsert\_test"
|
||||
(
|
||||
part_key int,
|
||||
other_col int,
|
||||
third_col int,
|
||||
|
||||
CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" UNIQUE (part_key)
|
||||
);
|
||||
-- distribute the table and create shards
|
||||
SELECT create_distributed_table('"long_constraint_upsert\_test"', 'part_key');
|
||||
|
||||
|
||||
INSERT INTO "long_constraint_upsert\_test" (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *;
|
||||
|
||||
ALTER TABLE "long_constraint_upsert\_test" RENAME TO simple_table_name;
|
||||
|
||||
INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *;
|
||||
|
||||
-- this is currently not supported, but once we support
|
||||
-- make sure that the following query also works fine
|
||||
ALTER TABLE simple_table_name RENAME CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" TO simple_constraint_name;
|
||||
--INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *;
|
||||
|
||||
SET search_path TO single_node;
|
||||
DROP SCHEMA "Quoed.Schema" CASCADE;
|
||||
|
||||
-- we should be able to limit intermediate results
|
||||
BEGIN;
|
||||
SET LOCAL citus.max_intermediate_result_size TO 0;
|
||||
|
|
Loading…
Reference in New Issue