From a64dc8a72b514983fe18492400b3cc9052771f3e Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Thu, 19 Nov 2020 12:23:21 +0300 Subject: [PATCH] Fixes a bug preventing INSERT SELECT .. ON CONFLICT with a constraint name on local shards Separate search relation shard function Add tests --- .../distributed/planner/deparse_shard_query.c | 148 ++++++++++++------ src/test/regress/expected/single_node.out | 137 ++++++++++++++++ src/test/regress/sql/single_node.sql | 80 ++++++++++ 3 files changed, 321 insertions(+), 44 deletions(-) diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index c5abdaf66..052a89ba6 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -13,6 +13,8 @@ #include "c.h" #include "access/heapam.h" +#include "access/htup_details.h" +#include "catalog/pg_constraint.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" #include "distributed/deparse_shard_query.h" @@ -34,10 +36,13 @@ #include "storage/lock.h" #include "utils/lsyscache.h" #include "utils/rel.h" - +#include "utils/syscache.h" static void UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE, Task *task); +static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList, + OnConflictExpr *onConflict); +static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList); static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static bool ShouldLazyDeparseQuery(Task *task); static char * DeparseTaskQuery(Task *task, Query *query); @@ -203,9 +208,6 @@ bool UpdateRelationToShardNames(Node *node, List *relationShardList) { uint64 shardId = INVALID_SHARD_ID; - Oid relationId = InvalidOid; - ListCell *relationShardCell = NULL; - RelationShard *relationShard = NULL; if (node == NULL) { @@ -238,24 +240,8 @@ UpdateRelationToShardNames(Node *node, 
List *relationShardList) return false; } - /* - * Search for the restrictions associated with the RTE. There better be - * some, otherwise this query wouldn't be eligible as a router query. - * - * FIXME: We should probably use a hashtable here, to do efficient - * lookup. - */ - foreach(relationShardCell, relationShardList) - { - relationShard = (RelationShard *) lfirst(relationShardCell); - - if (newRte->relid == relationShard->relationId) - { - break; - } - - relationShard = NULL; - } + RelationShard *relationShard = FindRelationShard(newRte->relid, + relationShardList); bool replaceRteWithNullValues = relationShard == NULL || relationShard->shardId == INVALID_SHARD_ID; @@ -266,7 +252,7 @@ UpdateRelationToShardNames(Node *node, List *relationShardList) } shardId = relationShard->shardId; - relationId = relationShard->relationId; + Oid relationId = relationShard->relationId; char *relationName = get_rel_name(relationId); AppendShardIdToName(&relationName, shardId); @@ -300,6 +286,13 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList) relationShardList, QTW_EXAMINE_RTES_BEFORE); } + if (IsA(node, OnConflictExpr)) + { + OnConflictExpr *onConflict = (OnConflictExpr *) node; + + return ReplaceRelationConstraintByShardConstraint(relationShardList, onConflict); + } + if (!IsA(node, RangeTblEntry)) { return expression_tree_walker(node, UpdateRelationsToLocalShardTables, @@ -313,27 +306,8 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList) return false; } - /* - * Search for the restrictions associated with the RTE. There better be - * some, otherwise this query wouldn't be eligible as a router query. - * - * FIXME: We should probably use a hashtable here, to do efficient - * lookup. 
- */ - ListCell *relationShardCell = NULL; - RelationShard *relationShard = NULL; - - foreach(relationShardCell, relationShardList) - { - relationShard = (RelationShard *) lfirst(relationShardCell); - - if (newRte->relid == relationShard->relationId) - { - break; - } - - relationShard = NULL; - } + RelationShard *relationShard = FindRelationShard(newRte->relid, + relationShardList); /* the function should only be called with local shards */ if (relationShard == NULL) @@ -350,6 +324,92 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList) } +/* + * ReplaceRelationConstraintByShardConstraint replaces given OnConflictExpr's + * constraint id with constraint id of the corresponding shard. + */ +static bool +ReplaceRelationConstraintByShardConstraint(List *relationShardList, + OnConflictExpr *onConflict) +{ + Oid constraintId = onConflict->constraint; + + if (!OidIsValid(constraintId)) + { + return false; + } + + Oid constraintRelationId = InvalidOid; + + HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraintId)); + if (HeapTupleIsValid(heapTuple)) + { + Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(heapTuple); + + constraintRelationId = contup->conrelid; + ReleaseSysCache(heapTuple); + } + + /* + * We can return here without calling the walker function, since we know there + * will be no possible tables or constraints after this point, by the syntax. 
 */ + if (!OidIsValid(constraintRelationId)) + { + ereport(ERROR, (errmsg("Invalid relation id (%u) for constraint: %s", + constraintRelationId, get_constraint_name(constraintId)))); + } + + RelationShard *relationShard = FindRelationShard(constraintRelationId, + relationShardList); + + if (relationShard != NULL) + { + char *constraintName = get_constraint_name(constraintId); + + AppendShardIdToName(&constraintName, relationShard->shardId); + + Oid shardOid = GetTableLocalShardOid(relationShard->relationId, + relationShard->shardId); + + Oid shardConstraintId = get_relation_constraint_oid(shardOid, constraintName, + false); + + onConflict->constraint = shardConstraintId; + + return false; + } + + return true; +} + + +/* + * FindRelationShard finds the RelationShard for the shard relation with + * the given Oid if it exists in the given relationShardList. Otherwise, returns NULL. + */ +static RelationShard * +FindRelationShard(Oid inputRelationId, List *relationShardList) +{ + RelationShard *relationShard = NULL; + + /* + * Search for the restrictions associated with the RTE. There better be + * some, otherwise this query wouldn't be eligible as a router query. + * FIXME: We should probably use a hashtable here, to do efficient lookup. + */ + foreach_ptr(relationShard, relationShardList) + { + if (inputRelationId == relationShard->relationId) + { + return relationShard; + } + } + + return NULL; +} + + /* * ConvertRteToSubqueryWithEmptyResult converts given relation RTE into * subquery RTE that returns no results. 
diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index c595983a1..0f68947c0 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -100,6 +100,143 @@ WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER 5 | 6 (5 rows) +-- Test upsert with constraint +CREATE TABLE upsert_test +( + part_key int UNIQUE, + other_col int, + third_col int +); +-- distribute the table +SELECT create_distributed_table('upsert_test', 'part_key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- do a regular insert +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *; + part_key | other_col | third_col +--------------------------------------------------------------------- + 1 | 1 | + 2 | 2 | +(2 rows) + +SET citus.log_remote_commands to true; +-- observe that there is a conflict and the following query does nothing +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *; +NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col + part_key | other_col | third_col +--------------------------------------------------------------------- +(0 rows) + +-- same as the above with different syntax +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *; +NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col + part_key | other_col | third_col +--------------------------------------------------------------------- +(0 rows) + +-- again the same query with another syntax +INSERT INTO upsert_test 
(part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; +NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630513 DO NOTHING RETURNING part_key, other_col, third_col + part_key | other_col | third_col +--------------------------------------------------------------------- +(0 rows) + +BEGIN; +-- force local execution +SELECT count(*) FROM upsert_test WHERE part_key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630513 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 1 +(1 row) + +SET citus.log_remote_commands to false; +-- multi-shard pushdown query that goes through local execution +INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; + part_key | other_col | third_col +--------------------------------------------------------------------- +(0 rows) + +-- multi-shard pull-to-coordinator query that goes through local execution +INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test LIMIT 100 ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; + part_key | other_col | third_col +--------------------------------------------------------------------- +(0 rows) + +COMMIT; +-- to test citus local tables +select undistribute_table('upsert_test'); +NOTICE: creating a new local table for single_node.upsert_test +NOTICE: Moving the data of single_node.upsert_test +NOTICE: Dropping the old single_node.upsert_test +NOTICE: Renaming the new table to single_node.upsert_test + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +-- 
create citus local table +select create_citus_local_table('upsert_test'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +-- test the constraint with local execution +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; + part_key | other_col | third_col +--------------------------------------------------------------------- +(0 rows) + +DROP TABLE upsert_test; +CREATE SCHEMA "Quoed.Schema"; +SET search_path TO "Quoed.Schema"; +CREATE TABLE "long_constraint_upsert\_test" +( + part_key int, + other_col int, + third_col int, + CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" UNIQUE (part_key) +); +NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted " +-- distribute the table and create shards +SELECT create_distributed_table('"long_constraint_upsert\_test"', 'part_key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO "long_constraint_upsert\_test" (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *; +NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted " + part_key | other_col | third_col +--------------------------------------------------------------------- + 1 | 1 | +(1 row) + +ALTER TABLE "long_constraint_upsert\_test" RENAME TO simple_table_name; +INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *; +NOTICE: 
identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted " + part_key | other_col | third_col +--------------------------------------------------------------------- +(0 rows) + +-- this is currently not supported, but once we support +-- make sure that the following query also works fine +ALTER TABLE simple_table_name RENAME CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" TO simple_constraint_name; +NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted " +ERROR: renaming constraints belonging to distributed tables is currently unsupported +--INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *; +SET search_path TO single_node; +DROP SCHEMA "Quoed.Schema" CASCADE; +NOTICE: drop cascades to 5 other objects +DETAIL: drop cascades to table "Quoed.Schema".simple_table_name +drop cascades to table "Quoed.Schema".simple_table_name_90630518 +drop cascades to table "Quoed.Schema".simple_table_name_90630519 +drop cascades to table "Quoed.Schema".simple_table_name_90630520 +drop cascades to table "Quoed.Schema".simple_table_name_90630521 -- we should be able to limit intermediate results BEGIN; SET LOCAL citus.max_intermediate_result_size TO 0; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index c081236db..e977bf06c 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -41,6 +41,86 @@ SELECT * FROM test ORDER BY x; UPDATE test SET y = y + 1 RETURNING *; WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2; +-- Test upsert with constraint +CREATE TABLE upsert_test +( + part_key int UNIQUE, + other_col 
int, + third_col int +); + +-- distribute the table +SELECT create_distributed_table('upsert_test', 'part_key'); + +-- do a regular insert +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *; + +SET citus.log_remote_commands to true; + +-- observe that there is a conflict and the following query does nothing +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *; + +-- same as the above with different syntax +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *; + +-- again the same query with another syntax +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; + +BEGIN; + +-- force local execution +SELECT count(*) FROM upsert_test WHERE part_key = 1; + +SET citus.log_remote_commands to false; + +-- multi-shard pushdown query that goes through local execution +INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; + +-- multi-shard pull-to-coordinator query that goes through local execution + +INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test LIMIT 100 ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; + +COMMIT; + +-- to test citus local tables +select undistribute_table('upsert_test'); +-- create citus local table +select create_citus_local_table('upsert_test'); +-- test the constraint with local execution +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; + +DROP TABLE upsert_test; + +CREATE SCHEMA "Quoed.Schema"; +SET search_path TO "Quoed.Schema"; + + +CREATE TABLE "long_constraint_upsert\_test" +( + part_key int, + other_col int, + third_col int, + + CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo 
oooooooo ng quoted \aconstraint" UNIQUE (part_key) +); +-- distribute the table and create shards +SELECT create_distributed_table('"long_constraint_upsert\_test"', 'part_key'); + + +INSERT INTO "long_constraint_upsert\_test" (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *; + +ALTER TABLE "long_constraint_upsert\_test" RENAME TO simple_table_name; + +INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *; + +-- this is currently not supported, but once we support +-- make sure that the following query also works fine +ALTER TABLE simple_table_name RENAME CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" TO simple_constraint_name; +--INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *; + +SET search_path TO single_node; +DROP SCHEMA "Quoed.Schema" CASCADE; + -- we should be able to limit intermediate results BEGIN; SET LOCAL citus.max_intermediate_result_size TO 0;