From d9dcba25e3c80378b9215e28625ad9f85354eaf9 Mon Sep 17 00:00:00 2001
From: Hadi Moshayedi
Date: Thu, 14 Nov 2019 12:53:24 -0800
Subject: [PATCH] Plan reference/local table joins locally

---
 .../distributed/executor/multi_executor.c     |  96 +++++++++
 .../distributed/planner/distributed_planner.c | 190 +++++++++++++++++-
 .../worker/worker_shard_visibility.c          |  30 ++-
 .../distributed/multi_logical_planner.h       |   1 -
 .../distributed/worker_shard_visibility.h     |   1 +
 ...licate_reference_tables_to_coordinator.out |  77 ++++++-
 src/test/regress/multi_mx_schedule            |   3 +-
 ...licate_reference_tables_to_coordinator.sql |  44 +++-
 8 files changed, 429 insertions(+), 13 deletions(-)

diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c
index 76ab7b7dd..19006e196 100644
--- a/src/backend/distributed/executor/multi_executor.c
+++ b/src/backend/distributed/executor/multi_executor.c
@@ -20,6 +20,7 @@
 #include "distributed/commands/utility_hook.h"
 #include "distributed/insert_select_executor.h"
 #include "distributed/insert_select_planner.h"
+#include "distributed/master_protocol.h"
 #include "distributed/multi_executor.h"
 #include "distributed/multi_master_planner.h"
 #include "distributed/distributed_planner.h"
@@ -28,6 +29,7 @@
 #include "distributed/multi_server_executor.h"
 #include "distributed/resource_lock.h"
 #include "distributed/transaction_management.h"
+#include "distributed/worker_shard_visibility.h"
 #include "distributed/worker_protocol.h"
 #include "executor/execdebug.h"
 #include "commands/copy.h"
@@ -60,6 +62,7 @@ static bool IsCitusPlan(Plan *plan);
 static bool IsCitusCustomScan(Plan *plan);
 static Relation StubRelation(TupleDesc tupleDescriptor);
 static bool AlterTableConstraintCheck(QueryDesc *queryDesc);
+static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan);
 
 /*
  * CitusExecutorStart is the ExecutorStart_hook that gets called when
@@ -70,6 +73,23 @@ CitusExecutorStart(QueryDesc *queryDesc, int eflags)
 {
 	PlannedStmt *plannedStmt = queryDesc->plannedstmt;
 
+	if (CitusHasBeenLoaded())
+	{
+		if (IsLocalReferenceTableJoinPlan(plannedStmt) &&
+			IsMultiStatementTransaction())
+		{
+			/*
+			 * Currently we don't support this, to avoid problems with tuple
+			 * visibility, locking, etc. For example, changes to the reference
+			 * table can go through a MultiConnection, which won't be visible
+			 * to the locally planned queries.
+			 */
+			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("cannot join local tables and reference tables in "
+								   "a transaction block")));
+		}
+	}
+
 	/*
 	 * We cannot modify XactReadOnly on Windows because it is not
 	 * declared with PGDLLIMPORT.
@@ -631,3 +651,79 @@ AlterTableConstraintCheck(QueryDesc *queryDesc)
 
 	return true;
 }
+
+
+/*
+ * IsLocalReferenceTableJoinPlan returns true if the given plan joins local
+ * tables with reference table shards.
+ *
+ * This should be consistent with IsLocalReferenceTableJoin() in
+ * distributed_planner.c.
+ */
+static bool
+IsLocalReferenceTableJoinPlan(PlannedStmt *plan)
+{
+	bool hasReferenceTable = false;
+	bool hasLocalTable = false;
+	ListCell *oidCell = NULL;
+	bool hasReferenceTableReplica = false;
+
+	/*
+	 * We only allow joins between reference tables and local tables on the
+	 * coordinator.
+	 */
+	if (!IsCoordinator())
+	{
+		return false;
+	}
+
+	/*
+	 * All groups that have pg_dist_node entries also have reference
+	 * table replicas.
+	 */
+	PrimaryNodeForGroup(GetLocalGroupId(), &hasReferenceTableReplica);
+
+	/*
+	 * If the reference tables don't have replicas on the coordinator, we
+	 * don't allow joins with local tables.
+	 */
+	if (!hasReferenceTableReplica)
+	{
+		return false;
+	}
+
+	/*
+	 * No need to check FOR UPDATE/SHARE or modifying subqueries, those
+	 * have already errored out in distributed_planner.c if they contain
+	 * a mix of local and distributed tables.
+	 */
+	if (plan->commandType != CMD_SELECT)
+	{
+		return false;
+	}
+
+	foreach(oidCell, plan->relationOids)
+	{
+		Oid relationId = lfirst_oid(oidCell);
+		bool onlySearchPath = false;
+
+		if (RelationIsAKnownShard(relationId, onlySearchPath))
+		{
+			/*
+			 * We don't allow joining non-reference distributed tables, so
+			 * we can skip checking whether this is a reference table shard.
+			 */
+			hasReferenceTable = true;
+		}
+		else
+		{
+			hasLocalTable = true;
+		}
+
+		if (hasReferenceTable && hasLocalTable)
+		{
+			return true;
+		}
+	}
+
+	return false;
+}
diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c
index e97ed1c3d..f592d851e 100644
--- a/src/backend/distributed/planner/distributed_planner.c
+++ b/src/backend/distributed/planner/distributed_planner.c
@@ -21,6 +21,7 @@
 #include "distributed/function_call_delegation.h"
 #include "distributed/insert_select_planner.h"
 #include "distributed/intermediate_results.h"
+#include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
 #include "distributed/distributed_planner.h"
@@ -33,6 +34,7 @@
 #include "distributed/query_utils.h"
 #include "distributed/recursive_planning.h"
 #include "distributed/shardinterval_utils.h"
+#include "distributed/version_compat.h"
 #include "distributed/worker_shard_visibility.h"
 #include "executor/executor.h"
 #include "nodes/makefuncs.h"
@@ -96,7 +98,9 @@ static void PopPlannerRestrictionContext(void);
 static void ResetPlannerRestrictionContext(
 	PlannerRestrictionContext *plannerRestrictionContext);
 static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams);
-
+static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
+static bool QueryIsNotSimpleSelect(Node *node);
+static bool UpdateReferenceTablesWithShard(Node *node, void *context);
 
 /* Distributed planner hook */
 PlannedStmt *
@@ -118,7 +122,20 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 	}
 	else if (CitusHasBeenLoaded())
 	{
-		needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
+		if (IsLocalReferenceTableJoin(parse, rangeTableList))
+		{
+			/*
+			 * For joins between reference tables and local tables, we replace
+			 * reference table names with shard table names in the query, so
+			 * we can use standard_planner for planning it locally.
+			 */
+			needsDistributedPlanning = false;
+			UpdateReferenceTablesWithShard((Node *) parse, NULL);
+		}
+		else
+		{
+			needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
+		}
 	}
 
 	if (needsDistributedPlanning)
@@ -1770,3 +1787,172 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
 										  boundParams);
 	}
 }
+
+
+/*
+ * IsLocalReferenceTableJoin returns true if the given query is a join
+ * between reference tables and local tables.
+ */
+static bool
+IsLocalReferenceTableJoin(Query *parse, List *rangeTableList)
+{
+	bool hasReferenceTable = false;
+	bool hasLocalTable = false;
+	ListCell *rangeTableCell = NULL;
+
+	bool hasReferenceTableReplica = false;
+
+	/*
+	 * We only allow joins between reference tables and local tables on the
+	 * coordinator.
+	 */
+	if (!IsCoordinator())
+	{
+		return false;
+	}
+
+	/*
+	 * All groups that have pg_dist_node entries also have reference
+	 * table replicas.
+	 */
+	PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &hasReferenceTableReplica);
+
+	/*
+	 * If the reference tables don't have replicas on the coordinator, we
+	 * don't allow joins with local tables.
+	 */
+	if (!hasReferenceTableReplica)
+	{
+		return false;
+	}
+
+	if (FindNodeCheck((Node *) parse, QueryIsNotSimpleSelect))
+	{
+		return false;
+	}
+
+	foreach(rangeTableCell, rangeTableList)
+	{
+		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
+		DistTableCacheEntry *cacheEntry = NULL;
+
+		if (rangeTableEntry->rtekind == RTE_FUNCTION)
+		{
+			return false;
+		}
+
+		if (rangeTableEntry->rtekind != RTE_RELATION)
+		{
+			continue;
+		}
+
+		if (!IsDistributedTable(rangeTableEntry->relid))
+		{
+			hasLocalTable = true;
+			continue;
+		}
+
+		cacheEntry = DistributedTableCacheEntry(rangeTableEntry->relid);
+		if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
+		{
+			hasReferenceTable = true;
+		}
+		else
+		{
+			return false;
+		}
+	}
+
+	return hasLocalTable && hasReferenceTable;
+}
+
+
+/*
+ * QueryIsNotSimpleSelect returns true if the given node is a query that
+ * modifies tables or marks rows for modification (i.e. FOR UPDATE/SHARE).
+ */
+static bool
+QueryIsNotSimpleSelect(Node *node)
+{
+	Query *query = NULL;
+
+	if (!IsA(node, Query))
+	{
+		return false;
+	}
+
+	query = (Query *) node;
+	return (query->commandType != CMD_SELECT) || (query->rowMarks != NIL);
+}
+
+
+/*
+ * UpdateReferenceTablesWithShard recursively replaces the reference table
+ * names in the given query with the shard table names.
+ */
+static bool
+UpdateReferenceTablesWithShard(Node *node, void *context)
+{
+	RangeTblEntry *newRte = NULL;
+	uint64 shardId = INVALID_SHARD_ID;
+	Oid relationId = InvalidOid;
+	Oid schemaId = InvalidOid;
+	char *relationName = NULL;
+	DistTableCacheEntry *cacheEntry = NULL;
+	ShardInterval *shardInterval = NULL;
+
+	if (node == NULL)
+	{
+		return false;
+	}
+
+	/* we want to look at all RTEs, even in subqueries, CTEs and such */
+	if (IsA(node, Query))
+	{
+		return query_tree_walker((Query *) node, UpdateReferenceTablesWithShard,
+								 NULL, QTW_EXAMINE_RTES_BEFORE);
+	}
+
+	if (!IsA(node, RangeTblEntry))
+	{
+		return expression_tree_walker(node, UpdateReferenceTablesWithShard,
+									  NULL);
+	}
+
+	newRte = (RangeTblEntry *) node;
+
+	if (newRte->rtekind != RTE_RELATION)
+	{
+		return false;
+	}
+
+	relationId = newRte->relid;
+	if (!IsDistributedTable(relationId))
+	{
+		return false;
+	}
+
+	cacheEntry = DistributedTableCacheEntry(relationId);
+	if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE)
+	{
+		return false;
+	}
+
+	shardInterval = cacheEntry->sortedShardIntervalArray[0];
+	shardId = shardInterval->shardId;
+
+	relationName = get_rel_name(relationId);
+	AppendShardIdToName(&relationName, shardId);
+
+	schemaId = get_rel_namespace(relationId);
+	newRte->relid = get_relname_relid(relationName, schemaId);
+
+	/*
+	 * The parser locks relations in addRangeTableEntry(), so we should
+	 * lock the modified ones too.
+	 */
+	LockRelationOid(newRte->relid, AccessShareLock);
+
+	return false;
+}
diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c
index c79e1ba31..5762a89f7 100644
--- a/src/backend/distributed/worker/worker_shard_visibility.c
+++ b/src/backend/distributed/worker/worker_shard_visibility.c
@@ -23,7 +23,6 @@
 /* Config variable managed via guc.c */
 bool OverrideTableVisibility = true;
 
-static bool RelationIsAKnownShard(Oid shardRelationId);
 static bool ReplaceTableVisibleFunctionWalker(Node *inputNode);
 
 PG_FUNCTION_INFO_V1(citus_table_is_visible);
@@ -39,10 +38,11 @@ Datum
 relation_is_a_known_shard(PG_FUNCTION_ARGS)
 {
 	Oid relationId = PG_GETARG_OID(0);
+	bool onlySearchPath = true;
 
 	CheckCitusVersion(ERROR);
 
-	PG_RETURN_BOOL(RelationIsAKnownShard(relationId));
+	PG_RETURN_BOOL(RelationIsAKnownShard(relationId, onlySearchPath));
 }
 
 
@@ -56,6 +56,7 @@ citus_table_is_visible(PG_FUNCTION_ARGS)
 {
 	Oid relationId = PG_GETARG_OID(0);
 	char relKind = '\0';
+	bool onlySearchPath = true;
 
 	CheckCitusVersion(ERROR);
 
@@ -68,7 +69,7 @@ citus_table_is_visible(PG_FUNCTION_ARGS)
 		PG_RETURN_NULL();
 	}
 
-	if (RelationIsAKnownShard(relationId))
+	if (RelationIsAKnownShard(relationId, onlySearchPath))
 	{
 		/*
 		 * If the input relation is an index we simply replace the
@@ -97,13 +98,14 @@ citus_table_is_visible(PG_FUNCTION_ARGS)
 
 /*
  * RelationIsAKnownShard gets a relationId, check whether it's a shard of
- * any distributed table in the current search path.
+ * any distributed table. If onlySearchPath is true, it only considers
+ * relations in the current search path.
  *
  * We can only do that in MX since both the metadata and tables are only
  * present there.
 */
-static bool
-RelationIsAKnownShard(Oid shardRelationId)
+bool
+RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath)
 {
 	int localGroupId = -1;
 	char *shardRelationName = NULL;
 	char *generatedRelationName = NULL;
 	bool shardExists = false;
 	uint64 shardId = INVALID_SHARD_ID;
 	Oid relationId = InvalidOid;
 	char relKind = '\0';
+	Relation relation = NULL;
 
 	if (!OidIsValid(shardRelationId))
 	{
 		return false;
 	}
@@ -136,8 +139,15 @@ RelationIsAKnownShard(Oid shardRelationId)
 		}
 	}
 
+	relation = try_relation_open(shardRelationId, AccessShareLock);
+	if (relation == NULL)
+	{
+		return false;
+	}
+	relation_close(relation, NoLock);
+
 	/* we're not interested in the relations that are not in the search path */
-	if (!RelationIsVisible(shardRelationId))
+	if (onlySearchPath && !RelationIsVisible(shardRelationId))
 	{
 		return false;
 	}
@@ -174,6 +184,12 @@ RelationIsAKnownShard(Oid shardRelationId)
 		return false;
 	}
 
+	/* verify that their namespaces are the same */
+	if (get_rel_namespace(shardRelationId) != get_rel_namespace(relationId))
+	{
+		return false;
+	}
+
 	/*
 	 * Now get the relation name and append the shardId to it. We need
	 * to do that because otherwise a local table with a valid shardId
diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h
index a1ede5dab..33b54bef2 100644
--- a/src/include/distributed/multi_logical_planner.h
+++ b/src/include/distributed/multi_logical_planner.h
@@ -213,7 +213,6 @@ extern List * TableEntryList(List *rangeTableList);
 extern List * UsedTableEntryList(Query *query);
 extern List * pull_var_clause_default(Node *node);
 extern bool OperatorImplementsEquality(Oid opno);
-extern bool FindNodeCheck(Node *node, bool (*check)(Node *));
 extern DeferredErrorMessage * DeferErrorIfUnsupportedClause(List *clauseList);
 extern MultiProject * MultiProjectNode(List *targetEntryList);
 extern MultiExtendedOp * MultiExtendedOpNode(Query *queryTree);
diff --git a/src/include/distributed/worker_shard_visibility.h b/src/include/distributed/worker_shard_visibility.h
index b9dddc26a..871421eb7 100644
--- a/src/include/distributed/worker_shard_visibility.h
+++ b/src/include/distributed/worker_shard_visibility.h
@@ -17,6 +17,7 @@
 extern bool OverrideTableVisibility;
 
 extern void ReplaceTableVisibleFunction(Node *inputNode);
+extern bool RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath);
 
 #endif /* WORKER_SHARD_VISIBILITY_H */
diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out
index 3f46d4189..dca81da47 100644
--- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out
+++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out
@@ -85,12 +85,87 @@ SELECT * FROM numbers ORDER BY a;
 ROLLBACK;
 -- Make sure we hide shard tables ...
- SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
+SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
  citus_table_is_visible 
 ------------------------
  f
 (1 row)
 
+-- Join between reference tables and local tables
+CREATE TABLE local_table(a int);
+INSERT INTO local_table VALUES (2), (4), (7), (20);
+EXPLAIN SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
+                                   QUERY PLAN                                   
+-------------------------------------------------------------------------------
+ Merge Join  (cost=359.57..860.00 rows=32512 width=8)
+   Merge Cond: (local_table.a = numbers_8000001.a)
+   ->  Sort  (cost=179.78..186.16 rows=2550 width=4)
+         Sort Key: local_table.a
+         ->  Seq Scan on local_table  (cost=0.00..35.50 rows=2550 width=4)
+   ->  Sort  (cost=179.78..186.16 rows=2550 width=4)
+         Sort Key: numbers_8000001.a
+         ->  Seq Scan on numbers_8000001  (cost=0.00..35.50 rows=2550 width=4)
+(8 rows)
+
+SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
+ a  | a  
+----+----
+ 20 | 20
+(1 row)
+
+-- error if in transaction block
+BEGIN;
+SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
+ERROR:  cannot join local tables and reference tables in a transaction block
+ROLLBACK;
+-- error if in a transaction block even if reference table is not in search path
+CREATE SCHEMA s1;
+CREATE TABLE s1.ref(a int);
+SELECT create_reference_table('s1.ref');
+ create_reference_table 
+------------------------
+ 
+(1 row)
+
+BEGIN;
+SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
+ERROR:  cannot join local tables and reference tables in a transaction block
+ROLLBACK;
+DROP SCHEMA s1 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to table s1.ref
+drop cascades to table s1.ref_8000002
+-- shouldn't plan locally if modifications happen in CTEs, ...
+WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table;
+ERROR:  relation local_table is not distributed
+WITH t AS (SELECT *, random() x FROM numbers FOR UPDATE) SELECT * FROM numbers, local_table
+  WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
+ERROR:  relation local_table is not distributed
+-- but this should be fine
+WITH t AS (SELECT *, random() x FROM numbers) SELECT * FROM numbers, local_table
+  WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
+ a | a 
+---+---
+(0 rows)
+
+-- shouldn't plan locally even if distributed table is in CTE or subquery
+CREATE TABLE dist(a int);
+SELECT create_distributed_table('dist', 'a');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+WITH t AS (SELECT *, random() x FROM dist) SELECT * FROM numbers, local_table
+  WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
+ERROR:  relation local_table is not distributed
+-- error if FOR UPDATE/FOR SHARE
+SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
+ERROR:  could not run distributed query with FOR UPDATE/SHARE commands
+HINT:  Consider using an equality filter on the distributed table's partition column.
+SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;
+ERROR:  could not run distributed query with FOR UPDATE/SHARE commands
+HINT:  Consider using an equality filter on the distributed table's partition column.
 -- clean-up
 SET client_min_messages TO ERROR;
 DROP SCHEMA replicate_ref_to_coordinator CASCADE;
diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule
index d59dd021c..80ccc76f7 100644
--- a/src/test/regress/multi_mx_schedule
+++ b/src/test/regress/multi_mx_schedule
@@ -28,7 +28,8 @@ test: multi_mx_copy_data multi_mx_router_planner
 test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10
 test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19
 test: multi_mx_tpch_query3 multi_mx_tpch_query6 multi_mx_tpch_query7
-test: multi_mx_tpch_query7_nested multi_mx_ddl ch_bench_having_mx
+test: multi_mx_tpch_query7_nested multi_mx_ddl
+test: ch_bench_having_mx
 test: recursive_dml_queries_mx multi_mx_truncate_from_worker
 test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
 test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
diff --git a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql
index 945cf3dfe..5a2af41cf 100644
--- a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql
+++ b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql
@@ -39,7 +39,49 @@ SELECT * FROM numbers ORDER BY a;
 ROLLBACK;
 
 -- Make sure we hide shard tables ...
- SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
+SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
+
+-- Join between reference tables and local tables
+CREATE TABLE local_table(a int);
+INSERT INTO local_table VALUES (2), (4), (7), (20);
+
+EXPLAIN SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
+SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
+
+-- error if in transaction block
+BEGIN;
+SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
+ROLLBACK;
+
+-- error if in a transaction block even if reference table is not in search path
+CREATE SCHEMA s1;
+CREATE TABLE s1.ref(a int);
+SELECT create_reference_table('s1.ref');
+
+BEGIN;
+SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
+ROLLBACK;
+
+DROP SCHEMA s1 CASCADE;
+
+-- shouldn't plan locally if modifications happen in CTEs, ...
+WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table;
+WITH t AS (SELECT *, random() x FROM numbers FOR UPDATE) SELECT * FROM numbers, local_table
+  WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
+
+-- but this should be fine
+WITH t AS (SELECT *, random() x FROM numbers) SELECT * FROM numbers, local_table
+  WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
+
+-- shouldn't plan locally even if distributed table is in CTE or subquery
+CREATE TABLE dist(a int);
+SELECT create_distributed_table('dist', 'a');
+WITH t AS (SELECT *, random() x FROM dist) SELECT * FROM numbers, local_table
+  WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
+
+-- error if FOR UPDATE/FOR SHARE
+SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
+SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;
 
 -- clean-up
 SET client_min_messages TO ERROR;
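
Note for reviewers (not part of the diff): a minimal sketch of the behavior this
patch enables, mirroring the regression tests above. The table names here are
hypothetical, and the sketch assumes a coordinator that holds reference table
replicas (i.e., the coordinator's group has a primary entry in pg_dist_node).

    -- a plain PostgreSQL table that lives only on the coordinator
    CREATE TABLE pages(page_id int, site_id int);

    -- a Citus reference table, whose single shard is also placed on the coordinator
    CREATE TABLE sites(site_id int PRIMARY KEY);
    SELECT create_reference_table('sites');

    -- Outside a transaction block this join is now planned by standard_planner:
    -- UpdateReferenceTablesWithShard rewrites the reference table RTE to its
    -- local shard (e.g. a hypothetical sites_8000003) before planning.
    SELECT count(*) FROM pages JOIN sites USING (site_id);

    -- Inside a multi-statement transaction, CitusExecutorStart rejects the plan:
    BEGIN;
    SELECT count(*) FROM pages JOIN sites USING (site_id);
    -- ERROR:  cannot join local tables and reference tables in a transaction block
    ROLLBACK;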