Plan reference/local table joins locally

mirror of https://github.com/citusdata/citus.git
commit d9dcba25e3, parent 4230b96247
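
In brief: with this change, a SELECT that joins a coordinator-local table
with a reference table is no longer routed through the distributed planner.
Instead, the reference table RTEs are rewritten to their local shard
placements and the query is handed to standard_planner. A minimal sketch of
the behavior, assuming a reference table "numbers" replicated to the
coordinator as in the regression tests below:

	CREATE TABLE local_table(a int);
	CREATE TABLE numbers(a int);
	SELECT create_reference_table('numbers');

	-- now planned and executed locally on the coordinator
	SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;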

@@ -20,6 +20,7 @@
#include "distributed/commands/utility_hook.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_master_planner.h"
#include "distributed/distributed_planner.h"

@@ -28,6 +29,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_protocol.h"
#include "executor/execdebug.h"
#include "commands/copy.h"

@@ -60,6 +62,7 @@ static bool IsCitusPlan(Plan *plan);
static bool IsCitusCustomScan(Plan *plan);
static Relation StubRelation(TupleDesc tupleDescriptor);
static bool AlterTableConstraintCheck(QueryDesc *queryDesc);
+static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan);

/*
 * CitusExecutorStart is the ExecutorStart_hook that gets called when

@@ -70,6 +73,23 @@ CitusExecutorStart(QueryDesc *queryDesc, int eflags)
{
	PlannedStmt *plannedStmt = queryDesc->plannedstmt;

+	if (CitusHasBeenLoaded())
+	{
+		if (IsLocalReferenceTableJoinPlan(plannedStmt) &&
+			IsMultiStatementTransaction())
+		{
+			/*
+			 * Currently we don't support this to avoid problems with tuple
+			 * visibility, locking, etc. For example, changes to the reference
+			 * table can go through a MultiConnection, which won't be visible
+			 * to the locally planned queries.
+			 */
+			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("cannot join local tables and reference tables in "
+								   "a transaction block")));
+		}
+	}
+
	/*
	 * We cannot modify XactReadOnly on Windows because it is not
	 * declared with PGDLLIMPORT.

@@ -631,3 +651,79 @@ AlterTableConstraintCheck(QueryDesc *queryDesc)

	return true;
}


/*
 * IsLocalReferenceTableJoinPlan returns true if the given plan joins local
 * tables with reference table shards.
 *
 * This should be consistent with IsLocalReferenceTableJoin() in
 * distributed_planner.c.
 */
static bool
IsLocalReferenceTableJoinPlan(PlannedStmt *plan)
{
	bool hasReferenceTable = false;
	bool hasLocalTable = false;
	ListCell *oidCell = NULL;
	bool hasReferenceTableReplica = false;

	/*
	 * We only allow joins between reference tables and local tables on the
	 * coordinator.
	 */
	if (!IsCoordinator())
	{
		return false;
	}

	/*
	 * All groups that have pg_dist_node entries also have reference
	 * table replicas.
	 */
	PrimaryNodeForGroup(GetLocalGroupId(), &hasReferenceTableReplica);

	/*
	 * If the reference table doesn't have replicas on the coordinator, we
	 * don't allow joins with local tables.
	 */
	if (!hasReferenceTableReplica)
	{
		return false;
	}

	/*
	 * No need to check FOR UPDATE/SHARE or modifying subqueries; those have
	 * already errored out in distributed_planner.c if they contain a mix of
	 * local and distributed tables.
	 */
	if (plan->commandType != CMD_SELECT)
	{
		return false;
	}

	foreach(oidCell, plan->relationOids)
	{
		Oid relationId = lfirst_oid(oidCell);
		bool onlySearchPath = false;

		if (RelationIsAKnownShard(relationId, onlySearchPath))
		{
			/*
			 * We don't allow joining non-reference distributed tables, so we
			 * can skip checking whether this is a reference table shard.
			 */
			hasReferenceTable = true;
		}
		else
		{
			hasLocalTable = true;
		}

		if (hasReferenceTable && hasLocalTable)
		{
			return true;
		}
	}

	return false;
}
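
The executor-side check above is what produces the new error for
multi-statement transactions. A sketch of the resulting behavior, matching
the regression test output further below:

	BEGIN;
	SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
	-- ERROR:  cannot join local tables and reference tables in a transaction block
	ROLLBACK;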

@@ -21,6 +21,7 @@
#include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h"
#include "distributed/intermediate_results.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/distributed_planner.h"

@@ -33,6 +34,7 @@
#include "distributed/query_utils.h"
#include "distributed/recursive_planning.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/version_compat.h"
#include "distributed/worker_shard_visibility.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"

@@ -96,7 +98,9 @@ static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext(
	PlannerRestrictionContext *plannerRestrictionContext);
static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams);

static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
static bool QueryIsNotSimpleSelect(Node *node);
static bool UpdateReferenceTablesWithShard(Node *node, void *context);

/* Distributed planner hook */
PlannedStmt *

@@ -118,7 +122,20 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
	}
	else if (CitusHasBeenLoaded())
	{
-		needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
+		if (IsLocalReferenceTableJoin(parse, rangeTableList))
+		{
+			/*
+			 * For joins between reference tables and local tables, we replace
+			 * reference table names with shard table names in the query, so
+			 * we can use standard_planner for planning it locally.
+			 */
+			needsDistributedPlanning = false;
+			UpdateReferenceTablesWithShard((Node *) parse, NULL);
+		}
+		else
+		{
+			needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
+		}
	}

	if (needsDistributedPlanning)

@@ -1770,3 +1787,172 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
										  boundParams);
	}
}


/*
 * IsLocalReferenceTableJoin returns true if the given query is a join between
 * reference tables and local tables.
 */
static bool
IsLocalReferenceTableJoin(Query *parse, List *rangeTableList)
{
	bool hasReferenceTable = false;
	bool hasLocalTable = false;
	ListCell *rangeTableCell = NULL;

	bool hasReferenceTableReplica = false;

	/*
	 * We only allow joins between reference tables and local tables on the
	 * coordinator.
	 */
	if (!IsCoordinator())
	{
		return false;
	}

	/*
	 * All groups that have pg_dist_node entries also have reference
	 * table replicas.
	 */
	PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &hasReferenceTableReplica);

	/*
	 * If the reference table doesn't have replicas on the coordinator, we
	 * don't allow joins with local tables.
	 */
	if (!hasReferenceTableReplica)
	{
		return false;
	}

	if (FindNodeCheck((Node *) parse, QueryIsNotSimpleSelect))
	{
		return false;
	}

	foreach(rangeTableCell, rangeTableList)
	{
		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
		DistTableCacheEntry *cacheEntry = NULL;

		if (rangeTableEntry->rtekind == RTE_FUNCTION)
		{
			return false;
		}

		if (rangeTableEntry->rtekind != RTE_RELATION)
		{
			continue;
		}

		if (!IsDistributedTable(rangeTableEntry->relid))
		{
			hasLocalTable = true;
			continue;
		}

		cacheEntry = DistributedTableCacheEntry(rangeTableEntry->relid);
		if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
		{
			hasReferenceTable = true;
		}
		else
		{
			return false;
		}
	}

	return hasLocalTable && hasReferenceTable;
}


/*
 * QueryIsNotSimpleSelect returns true if the given node is a query that
 * modifies data or marks rows for modification (e.g. SELECT ... FOR UPDATE).
 */
static bool
QueryIsNotSimpleSelect(Node *node)
{
	Query *query = NULL;

	if (!IsA(node, Query))
	{
		return false;
	}

	query = (Query *) node;
	return (query->commandType != CMD_SELECT) || (query->rowMarks != NIL);
}


/*
 * UpdateReferenceTablesWithShard recursively replaces the reference table
 * names in the given query with the shard table names.
 */
static bool
UpdateReferenceTablesWithShard(Node *node, void *context)
{
	RangeTblEntry *newRte = NULL;
	uint64 shardId = INVALID_SHARD_ID;
	Oid relationId = InvalidOid;
	Oid schemaId = InvalidOid;
	char *relationName = NULL;
	DistTableCacheEntry *cacheEntry = NULL;
	ShardInterval *shardInterval = NULL;

	if (node == NULL)
	{
		return false;
	}

	/* want to look at all RTEs, even in subqueries, CTEs and such */
	if (IsA(node, Query))
	{
		return query_tree_walker((Query *) node, UpdateReferenceTablesWithShard,
								 NULL, QTW_EXAMINE_RTES_BEFORE);
	}

	if (!IsA(node, RangeTblEntry))
	{
		return expression_tree_walker(node, UpdateReferenceTablesWithShard,
									  NULL);
	}

	newRte = (RangeTblEntry *) node;

	if (newRte->rtekind != RTE_RELATION)
	{
		return false;
	}

	relationId = newRte->relid;
	if (!IsDistributedTable(relationId))
	{
		return false;
	}

	cacheEntry = DistributedTableCacheEntry(relationId);
	if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE)
	{
		return false;
	}

	shardInterval = cacheEntry->sortedShardIntervalArray[0];
	shardId = shardInterval->shardId;

	relationName = get_rel_name(relationId);
	AppendShardIdToName(&relationName, shardId);

	schemaId = get_rel_namespace(relationId);
	newRte->relid = get_relname_relid(relationName, schemaId);

	/*
	 * The parser locks relations in addRangeTableEntry(), so we should lock
	 * the modified ones too.
	 */
	LockRelationOid(newRte->relid, AccessShareLock);

	return false;
}
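
The effect of UpdateReferenceTablesWithShard() is visible in EXPLAIN: the
reference table RTE ends up pointing at its single local shard placement,
whose name is the table name with the shard ID appended. A sketch (plan shape
as in the expected test output below; the 8000001 shard ID and the costs are
cluster-dependent):

	EXPLAIN SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
	-- Merge Join
	--   Merge Cond: (local_table.a = numbers_8000001.a)
	--   ... plain PostgreSQL operators, no Citus custom scan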

@@ -23,7 +23,6 @@
/* Config variable managed via guc.c */
bool OverrideTableVisibility = true;

-static bool RelationIsAKnownShard(Oid shardRelationId);
static bool ReplaceTableVisibleFunctionWalker(Node *inputNode);

PG_FUNCTION_INFO_V1(citus_table_is_visible);

@@ -39,10 +38,11 @@ Datum
relation_is_a_known_shard(PG_FUNCTION_ARGS)
{
	Oid relationId = PG_GETARG_OID(0);
+	bool onlySearchPath = true;

	CheckCitusVersion(ERROR);

-	PG_RETURN_BOOL(RelationIsAKnownShard(relationId));
+	PG_RETURN_BOOL(RelationIsAKnownShard(relationId, onlySearchPath));
}


@@ -56,6 +56,7 @@ citus_table_is_visible(PG_FUNCTION_ARGS)
{
	Oid relationId = PG_GETARG_OID(0);
	char relKind = '\0';
+	bool onlySearchPath = true;

	CheckCitusVersion(ERROR);

@@ -68,7 +69,7 @@ citus_table_is_visible(PG_FUNCTION_ARGS)
		PG_RETURN_NULL();
	}

-	if (RelationIsAKnownShard(relationId))
+	if (RelationIsAKnownShard(relationId, onlySearchPath))
	{
		/*
		 * If the input relation is an index we simply replace the

@@ -97,13 +98,14 @@ citus_table_is_visible(PG_FUNCTION_ARGS)

/*
 * RelationIsAKnownShard gets a relationId and checks whether it's a shard of
- * any distributed table in the current search path.
+ * any distributed table. If onlySearchPath is true, only relations in the
+ * current search path are considered.
 *
 * We can only do that in MX since both the metadata and tables are only
 * present there.
 */
-static bool
-RelationIsAKnownShard(Oid shardRelationId)
+bool
+RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath)
{
	int localGroupId = -1;
	char *shardRelationName = NULL;

@@ -112,6 +114,7 @@ RelationIsAKnownShard(Oid shardRelationId)
	uint64 shardId = INVALID_SHARD_ID;
	Oid relationId = InvalidOid;
	char relKind = '\0';
+	Relation relation = NULL;

	if (!OidIsValid(shardRelationId))
	{

@@ -136,8 +139,15 @@ RelationIsAKnownShard(Oid shardRelationId)
		}
	}

+	relation = try_relation_open(shardRelationId, AccessShareLock);
+	if (relation == NULL)
+	{
+		return false;
+	}
+	relation_close(relation, NoLock);
+
	/* we're not interested in the relations that are not in the search path */
-	if (!RelationIsVisible(shardRelationId))
+	if (!RelationIsVisible(shardRelationId) && onlySearchPath)
	{
		return false;
	}

@@ -174,6 +184,12 @@ RelationIsAKnownShard(Oid shardRelationId)
		return false;
	}

+	/* verify that their namespaces are the same */
+	if (get_rel_namespace(shardRelationId) != get_rel_namespace(relationId))
+	{
+		return false;
+	}
+
	/*
	 * Now get the relation name and append the shardId to it. We need
	 * to do that because otherwise a local table with a valid shardId
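
With the new onlySearchPath parameter, the SQL-callable entry points keep
their previous search-path-restricted behavior, while internal callers such
as IsLocalReferenceTableJoinPlan() pass false so that shards are recognized
in any schema. Illustrative calls to the existing UDFs (the shard name is
cluster-dependent, and the results assume the shard is on the search path):

	SELECT relation_is_a_known_shard('numbers_8000001'::regclass);   -- t
	SELECT citus_table_is_visible('numbers_8000001'::regclass::oid); -- f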

@@ -213,7 +213,6 @@ extern List * TableEntryList(List *rangeTableList);
extern List * UsedTableEntryList(Query *query);
extern List * pull_var_clause_default(Node *node);
extern bool OperatorImplementsEquality(Oid opno);
-extern bool FindNodeCheck(Node *node, bool (*check)(Node *));
extern DeferredErrorMessage * DeferErrorIfUnsupportedClause(List *clauseList);
extern MultiProject * MultiProjectNode(List *targetEntryList);
extern MultiExtendedOp * MultiExtendedOpNode(Query *queryTree);

@@ -17,6 +17,7 @@ extern bool OverrideTableVisibility;


extern void ReplaceTableVisibleFunction(Node *inputNode);
+extern bool RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath);


#endif /* WORKER_SHARD_VISIBILITY_H */

@@ -85,12 +85,87 @@ SELECT * FROM numbers ORDER BY a;

ROLLBACK;
-- Make sure we hide shard tables ...
SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
 citus_table_is_visible
------------------------
 f
(1 row)

-- Join between reference tables and local tables
CREATE TABLE local_table(a int);
INSERT INTO local_table VALUES (2), (4), (7), (20);
EXPLAIN SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
                                   QUERY PLAN
-------------------------------------------------------------------------------
 Merge Join  (cost=359.57..860.00 rows=32512 width=8)
   Merge Cond: (local_table.a = numbers_8000001.a)
   ->  Sort  (cost=179.78..186.16 rows=2550 width=4)
         Sort Key: local_table.a
         ->  Seq Scan on local_table  (cost=0.00..35.50 rows=2550 width=4)
   ->  Sort  (cost=179.78..186.16 rows=2550 width=4)
         Sort Key: numbers_8000001.a
         ->  Seq Scan on numbers_8000001  (cost=0.00..35.50 rows=2550 width=4)
(8 rows)

SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
 a  | a
----+----
 20 | 20
(1 row)

-- error if in transaction block
BEGIN;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
ERROR:  cannot join local tables and reference tables in a transaction block
ROLLBACK;
-- error if in a transaction block even if reference table is not in search path
CREATE SCHEMA s1;
CREATE TABLE s1.ref(a int);
SELECT create_reference_table('s1.ref');
 create_reference_table
------------------------

(1 row)

BEGIN;
SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
ERROR:  cannot join local tables and reference tables in a transaction block
ROLLBACK;
DROP SCHEMA s1 CASCADE;
NOTICE:  drop cascades to 2 other objects
DETAIL:  drop cascades to table s1.ref
drop cascades to table s1.ref_8000002
-- shouldn't plan locally if modifications happen in CTEs, ...
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table;
ERROR:  relation local_table is not distributed
WITH t AS (SELECT *, random() x FROM numbers FOR UPDATE) SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
ERROR:  relation local_table is not distributed
-- but this should be fine
WITH t AS (SELECT *, random() x FROM numbers) SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
 a | a
---+---
(0 rows)

-- shouldn't plan locally even if distributed table is in CTE or subquery
CREATE TABLE dist(a int);
SELECT create_distributed_table('dist', 'a');
 create_distributed_table
--------------------------

(1 row)

WITH t AS (SELECT *, random() x FROM dist) SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
ERROR:  relation local_table is not distributed
-- error if FOR UPDATE/FOR SHARE
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
ERROR:  could not run distributed query with FOR UPDATE/SHARE commands
HINT:  Consider using an equality filter on the distributed table's partition column.
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;
ERROR:  could not run distributed query with FOR UPDATE/SHARE commands
HINT:  Consider using an equality filter on the distributed table's partition column.
-- clean-up
SET client_min_messages TO ERROR;
DROP SCHEMA replicate_ref_to_coordinator CASCADE;

@@ -28,7 +28,8 @@ test: multi_mx_copy_data multi_mx_router_planner
test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10
test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19
test: multi_mx_tpch_query3 multi_mx_tpch_query6 multi_mx_tpch_query7
-test: multi_mx_tpch_query7_nested multi_mx_ddl ch_bench_having_mx
+test: multi_mx_tpch_query7_nested multi_mx_ddl
+test: ch_bench_having_mx
test: recursive_dml_queries_mx multi_mx_truncate_from_worker
test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2

@@ -39,7 +39,49 @@ SELECT * FROM numbers ORDER BY a;
ROLLBACK;

-- Make sure we hide shard tables ...
SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);

-- Join between reference tables and local tables
CREATE TABLE local_table(a int);
INSERT INTO local_table VALUES (2), (4), (7), (20);

EXPLAIN SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;

-- error if in transaction block
BEGIN;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
ROLLBACK;

-- error if in a transaction block even if reference table is not in search path
CREATE SCHEMA s1;
CREATE TABLE s1.ref(a int);
SELECT create_reference_table('s1.ref');

BEGIN;
SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
ROLLBACK;

DROP SCHEMA s1 CASCADE;

-- shouldn't plan locally if modifications happen in CTEs, ...
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table;
WITH t AS (SELECT *, random() x FROM numbers FOR UPDATE) SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);

-- but this should be fine
WITH t AS (SELECT *, random() x FROM numbers) SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);

-- shouldn't plan locally even if distributed table is in CTE or subquery
CREATE TABLE dist(a int);
SELECT create_distributed_table('dist', 'a');
WITH t AS (SELECT *, random() x FROM dist) SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);

-- error if FOR UPDATE/FOR SHARE
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;

-- clean-up
SET client_min_messages TO ERROR;