Merge branch 'main' into tenant-schema-isolation

tenant-schema-isolation-complete-view
Onur Tirtir 2023-10-24 14:27:45 +03:00 committed by GitHub
commit 6af8a51065
15 changed files with 99 additions and 104 deletions

View File

@@ -245,6 +245,7 @@ CREATE TABLE country_codes (
country_code VARCHAR(3) PRIMARY KEY,
country_name VARCHAR(50)
);
SELECT create_reference_table('country_codes');
-- Reference Table: Order Status
CREATE TABLE order_status (
@@ -269,14 +270,17 @@ The aim of this planner is to avoid relying on PostgreSQL's standard_planner() f
### Main C Functions Involved:
- `FastPathRouterPlan()`: The primary function for creating the fast-path query plan.
- `FastPathPlanner()`: The primary function for creating the fast-path query plan.
- `FastPathRouterQuery()`: Validates if a query is eligible for fast-path routing by checking its structure and the WHERE clause.
With `SET client_min_messages TO debug4;` you should see the following DEBUG message: "DEBUG: Distributed planning for a fast-path router query"
```sql
-- Fetches the count of users born in the same year, but only
-- for a single country
-- for a single country, with a filter on the distribution column
-- Normally we have a single user with user_id = 15 because user_id is a PRIMARY KEY
-- this is just to demonstrate that fast-path can handle complex queries
-- with EXTRACT(), COUNT(), GROUP BY, HAVING, etc.
SELECT EXTRACT(YEAR FROM date_of_birth) as birth_year, COUNT(*)
FROM users_table
WHERE country_code = 'USA' AND user_id = 15
@@ -382,11 +386,10 @@ FROM users_table u, orders_table o
WHERE u.user_id = o.user_id AND u.user_id = 42;
-- With Subqueries:
-- Fetch the username and their total order amount
-- for a specific user
SELECT u.username,
(SELECT MAX(o.product_id) FROM orders_table o
(SELECT COUNT(*) FROM orders_table o
WHERE o.user_id = 42 AND
o.user_id = u.user_id)
FROM users_table u
@@ -692,7 +695,7 @@ Assume that there are two subqueries; each subquery is individually joined on th
-- The join condition between them is: sub1.user_id != sub2.user_id, which does not preserve distribution key equality.
-- Citus qualifies sub1 as the anchor subquery and checks whether all other subqueries are joined on the distribution key.
-- In this case, sub2 is not joined on the distribution key, so Citus decides to recursively plan the whole sub2.
SELECT a.user_id, b.user_id
SELECT sub1.user_id, sub2.user_id
FROM (
SELECT u.user_id
FROM users_table u
@@ -884,7 +887,7 @@ Citus has a rules-based optimizer. The core function `MultiLogicalPlanCreate()`
For instance, one simple optimization pushes the "filter" operation below the "MultiCollect." Such rules are defined in the function `Commutative()` in `multi_logical_optimizer.c`.
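A hedged illustration of that rewrite (the shard name below is hypothetical; the real fragments are generated internally): once the filter commutes below the collect, the WHERE clause is evaluated inside each per-shard worker query instead of on rows the coordinator has already collected.
```sql
-- Before the rewrite, the filter sits above MultiCollect, so the coordinator
-- would filter rows only after pulling them from every worker. After the
-- commutativity rule fires, each worker filters its own shard locally:
SELECT user_id, country_code
FROM users_table_102008  -- hypothetical shard of users_table
WHERE country_code = 'USA';
```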
The most interesting part of the optimizer is usually in the final stage, when handling the more complex operators (GROUP BY, DISTINCT window functions, ORDER BY, aggregates). These operators are conjoined in a `MultiExtendedOpNode`. In many cases, they can only partially be pushed down into the worker nodes, which results in one `MultiExtendedOpNode` above the `MultiCollection` (which will run on the coordinator and aggregates across worker nodes), and another `MultiExtendedOpNode` below the `MultiCollect` (which will be pushed down to worker nodes). The bulk of the logic for generating the two nodes lives in `MasterExtendedOpNode()` and `WorkerExtendedOpNode()`, respectively.
The most interesting part of the optimizer is usually in the final stage, when handling the more complex operators (GROUP BY, DISTINCT window functions, ORDER BY, aggregates). These operators are conjoined in a `MultiExtendedOpNode`. In many cases, they can only partially be pushed down into the worker nodes, which results in one `MultiExtendedOpNode` above the `MultiCollect` (which will run on the coordinator and aggregates across worker nodes), and another `MultiExtendedOpNode` below the `MultiCollect` (which will be pushed down to worker nodes). The bulk of the logic for generating the two nodes lives in `MasterExtendedOpNode()` and `WorkerExtendedOpNode()`, respectively.
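As a sketch of that split (the shard name and the stand-in relation for the collected rows are hypothetical), an `avg()` is decomposed so that the worker-side node produces `sum`/`count` partials and the coordinator-side node combines them:
```sql
-- Worker-side MultiExtendedOpNode: partial aggregates computed per shard
SELECT country_code,
       sum(user_id)   AS partial_sum,
       count(user_id) AS partial_count
FROM users_table_102008  -- hypothetical shard
GROUP BY country_code;

-- Coordinator-side MultiExtendedOpNode: merge partials across workers
-- (collected_worker_rows stands in for the MultiCollect output)
SELECT country_code,
       sum(partial_sum)::numeric / sum(partial_count) AS avg
FROM collected_worker_rows
GROUP BY country_code;
```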
##### Aggregate functions
@@ -1034,8 +1037,8 @@ SELECT * FROM cte_1;
-- but as the same cte used twice
-- Citus converts the CTE to intermediate result
WITH cte_1 AS (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1 as c1 JOIN
cte_1 as c2 USING (user_id);
SELECT * FROM cte_1 as c1
JOIN cte_1 as c2 USING (user_id);
```
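Conceptually, the materialized plan executes the CTE once and reads the stored rows twice via `read_intermediate_result()`; a hedged sketch of the rewritten query (the result id `'1_1'` is hypothetical):
```sql
-- The CTE runs once and is stored as an intermediate result; both sides of
-- the join then read the same result instead of re-executing the CTE:
SELECT *
FROM read_intermediate_result('1_1', 'binary') AS c1(user_id bigint)
JOIN read_intermediate_result('1_1', 'binary') AS c2(user_id bigint)
USING (user_id);
```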
- **Citus Specific Materialization**:
@@ -1051,8 +1054,7 @@ As of writing this document, Citus does NOT support
```sql
WITH users_that_have_orders AS (SELECT users_table.* FROM users_table JOIN orders_table USING (user_id))
SELECT
max(date_of_birth)
SELECT max(date_of_birth)
FROM users_that_have_orders
GROUP BY GROUPING SETS (user_id, email);
...
@@ -1099,7 +1101,7 @@ INSERT INTO orders_table (order_id, user_id) VALUES
```
**Debug Info**:
Debug information shows how the query is rebuilt for different user_ids.
Debug information shows how the query is rebuilt for different user_ids. Here, the shard_count is 4.
```sql
-- for user_id: 1
DEBUG: query after rebuilding: INSERT INTO public.orders_table_102041 AS citus_table_alias (order_id, user_id) VALUES ('1'::bigint,'1'::bigint), ('3'::bigint,'1'::bigint)
@@ -1133,7 +1135,7 @@ DEBUG: query after rebuilding: INSERT INTO public.orders_table_102064 AS citus
**Examples**:
The following section will delve into examples, starting with simple ones and moving to more complex scenarios.
### INSERT.. SELECT Advanced Scenarios
### INSERT.. SELECT Query Planning
**Overview**:
The `INSERT .. SELECT` pushdown logic builds upon the pushdown planning for `SELECT` commands. The key requirements include colocated tables and matching distribution columns. Relevant C functions are `CreateDistributedInsertSelectPlan`, `DistributedInsertSelectSupported()`, and `AllDistributionKeysInQueryAreEqual`.
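A hedged example of a command that meets those requirements, assuming a hypothetical `orders_summary` table that is distributed on `user_id` and colocated with `orders_table`:
```sql
-- The SELECT groups by the distribution column and feeds it into the target's
-- distribution column, so the whole INSERT .. SELECT is pushed down to the
-- colocated shard pairs:
INSERT INTO orders_summary (user_id, order_count)
SELECT user_id, count(*)
FROM orders_table
GROUP BY user_id;
```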
@@ -1267,7 +1269,7 @@ WHERE user_id IN (SELECT user_id FROM high_value_users);
Used for more complex queries, like those with subqueries or joins that can't be pushed down. The queries are planned recursively.
```sql
DELETE FROM users_table WHERE user_id
IN (SELECT user_id FROM orders_table WHERE total > 100 ORDER BY total DESC LIMIT 5);
IN (SELECT user_id FROM orders_table WHERE order_date < '2023-01-01' ORDER BY order_date LIMIT 5);
```
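Conceptually (a hedged sketch; the shard name and result id are hypothetical), the subquery is executed first and its rows are shipped to the workers as an intermediate result, after which each shard runs a rewritten DELETE along these lines:
```sql
DELETE FROM users_table_102008  -- hypothetical shard of users_table
WHERE user_id IN (
    SELECT user_id
    FROM read_intermediate_result('42_1', 'binary') AS res(user_id bigint)
);
```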
### Correlated/Lateral Subqueries in Planning
@@ -1279,8 +1281,7 @@ Correlated or LATERAL subqueries have special behavior in Citus. They can often
**Key Code Details**:
For more information on the code, check the following functions:
`DeferErrorIfCannotPushdownSubquery()` ->
`ContainsReferencesToOuterQuery()` ->
`DeferErrorIfSubqueryRequiresMerge()`.
`ContainsReferencesToOuterQuery()`, `DeferErrorIfSubqueryRequiresMerge()`, `DeferredErrorIfUnsupportedLateralSubquery()`. LATERAL queries are different/unique: even if the subquery requires a merge step such as a `LIMIT`, if the correlation is on the distribution column, we can push it down. See [#4385](https://github.com/citusdata/citus/pull/4385).
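A hedged example of the pushable case: the lateral subquery carries a `LIMIT`, which would normally require a merge step, but because it correlates on the distribution column `user_id` it can still be pushed down:
```sql
-- Fetch each user's three most recent orders; the correlation on user_id
-- lets Citus evaluate the LATERAL subquery shard-locally despite the LIMIT.
SELECT u.user_id, recent.order_id
FROM users_table u,
     LATERAL (
         SELECT o.order_id
         FROM orders_table o
         WHERE o.user_id = u.user_id  -- correlation on the distribution column
         ORDER BY o.order_date DESC
         LIMIT 3
     ) AS recent;
```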
@@ -1409,7 +1410,7 @@ WITH recent_orders AS (
)
SELECT u.*
FROM users_table u
JOIN recent_orders o ON u.user_id = o.product_id;
JOIN recent_orders o ON u.user_id = o.product_id
JOIN orders_table o2 ON o2.product_id = o.product_id;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
```
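For contrast, a hedged sketch of the same query shape that should plan successfully, since every join is on the distribution column:
```sql
WITH recent_orders AS (
    SELECT * FROM orders_table
    WHERE order_date > '2023-01-01'
)
SELECT u.*
FROM users_table u
JOIN recent_orders o ON u.user_id = o.user_id
JOIN orders_table o2 ON o2.user_id = o.user_id;
```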

View File

@@ -938,7 +938,7 @@ CreateIndexTaskList(IndexStmt *indexStmt)
task->dependentTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = ActiveShardPlacementList(shardId);
task->cannotBeExecutedInTransction = indexStmt->concurrent;
task->cannotBeExecutedInTransaction = indexStmt->concurrent;
taskList = lappend(taskList, task);
@@ -983,7 +983,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt)
task->dependentTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = ActiveShardPlacementList(shardId);
task->cannotBeExecutedInTransction =
task->cannotBeExecutedInTransaction =
IsReindexWithParam_compat(reindexStmt, "concurrently");
taskList = lappend(taskList, task);
@@ -1309,7 +1309,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
task->dependentTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = ActiveShardPlacementList(shardId);
task->cannotBeExecutedInTransction = dropStmt->concurrent;
task->cannotBeExecutedInTransaction = dropStmt->concurrent;
taskList = lappend(taskList, task);

View File

@@ -95,13 +95,13 @@ int UtilityHookLevel = 0;
/* Local functions forward declarations for helper functions */
static void ProcessUtilityInternal(PlannedStmt *pstmt,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
struct QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *completionTag);
static void citus_ProcessUtilityInternal(PlannedStmt *pstmt,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
struct QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *completionTag);
static void set_indexsafe_procflags(void);
static char * CurrentSearchPath(void);
static void IncrementUtilityHookCountersIfNecessary(Node *parsetree);
@@ -130,7 +130,7 @@ ProcessUtilityParseTree(Node *node, const char *queryString, ProcessUtilityConte
/*
* multi_ProcessUtility is the main entry hook for implementing Citus-specific
* citus_ProcessUtility is the main entry hook for implementing Citus-specific
* utility behavior. Its primary responsibilities are intercepting COPY and DDL
* commands and augmenting the coordinator's command with corresponding tasks
* to be run on worker nodes, after suitably ensuring said commands' options
@@ -139,7 +139,7 @@ ProcessUtilityParseTree(Node *node, const char *queryString, ProcessUtilityConte
* TRUNCATE and VACUUM are also supported.
*/
void
multi_ProcessUtility(PlannedStmt *pstmt,
citus_ProcessUtility(PlannedStmt *pstmt,
const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
@@ -329,8 +329,8 @@ multi_ProcessUtility(PlannedStmt *pstmt,
PG_TRY();
{
ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest,
completionTag);
citus_ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest,
completionTag);
if (UtilityHookLevel == 1)
{
@@ -404,7 +404,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
/*
* ProcessUtilityInternal is a helper function for multi_ProcessUtility where majority
* citus_ProcessUtilityInternal is a helper function for citus_ProcessUtility where majority
* of the Citus specific utility statements are handled here. The distinction between
* both functions is that Citus_ProcessUtility does not handle CALL and DO statements.
* The reason for the distinction is implemented to be able to find the "top-level" DDL
@@ -412,13 +412,13 @@ multi_ProcessUtility(PlannedStmt *pstmt,
* this goal.
*/
static void
ProcessUtilityInternal(PlannedStmt *pstmt,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
struct QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *completionTag)
citus_ProcessUtilityInternal(PlannedStmt *pstmt,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
struct QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *completionTag)
{
Node *parsetree = pstmt->utilityStmt;
List *ddlJobs = NIL;
@@ -1386,7 +1386,7 @@ PostStandardProcessUtility(Node *parsetree)
* on the local table first. However, in order to decide whether the
* command leads to an invalidation, we need to check before the command
* is being executed since we read pg_constraint table. Thus, we maintain a
* local flag and do the invalidation after multi_ProcessUtility,
* local flag and do the invalidation after citus_ProcessUtility,
* before ExecuteDistributedDDLJob().
*/
InvalidateForeignKeyGraphForDDL();

View File

@@ -279,7 +279,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;
task->taskPlacementList = ActiveShardPlacementList(shardId);
task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM);
task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM);
taskList = lappend(taskList, task);
}
@@ -719,7 +719,7 @@ ExecuteUnqualifiedVacuumTasks(VacuumStmt *vacuumStmt, CitusVacuumParams vacuumPa
SetTaskQueryStringList(task, unqualifiedVacuumCommands);
task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM);
task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM);
bool hasPeerWorker = false;

View File

@@ -61,7 +61,7 @@ TaskListRequiresRollback(List *taskList)
}
Task *task = (Task *) linitial(taskList);
if (task->cannotBeExecutedInTransction)
if (task->cannotBeExecutedInTransaction)
{
/* vacuum, create index concurrently etc. */
return false;
@@ -164,7 +164,7 @@ TaskListCannotBeExecutedInTransaction(List *taskList)
Task *task = NULL;
foreach_ptr(task, taskList)
{
if (task->cannotBeExecutedInTransction)
if (task->cannotBeExecutedInTransaction)
{
return true;
}

View File

@@ -521,8 +521,7 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl
/*
* HasDistributionKey returs true if given Citus table doesn't have a
* distribution key.
* HasDistributionKey returns true if given Citus table has a distribution key.
*/
bool
HasDistributionKey(Oid relationId)
@@ -538,8 +537,8 @@ HasDistributionKey(Oid relationId)
/*
* HasDistributionKey returs true if given cache entry identifies a Citus
* table that doesn't have a distribution key.
* HasDistributionKeyCacheEntry returns true if given cache entry identifies a
* Citus table that has a distribution key.
*/
bool
HasDistributionKeyCacheEntry(CitusTableCacheEntry *tableEntry)

View File

@@ -154,7 +154,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
* being a fast path router query.
* The requirements for the fast path query can be listed below:
*
* - SELECT query without CTES, sublinks-subqueries, set operations
* - SELECT/UPDATE/DELETE query without CTES, sublinks-subqueries, set operations
* - The query should touch only a single hash distributed or reference table
* - The distribution with equality operator should be in the WHERE clause
* and it should be ANDed with any other filters. Also, the distribution

View File

@@ -2324,27 +2324,11 @@ PlanRouterQuery(Query *originalQuery,
TargetShardIntervalForFastPathQuery(originalQuery, &isMultiShardQuery,
distributionKeyValue,
partitionValueConst);
/*
* This could only happen when there is a parameter on the distribution key.
* We defer error here, later the planner is forced to use a generic plan
* by assigning arbitrarily high cost to the plan.
*/
if (UpdateOrDeleteOrMergeQuery(originalQuery) && isMultiShardQuery)
{
planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Router planner cannot handle multi-shard "
"modify queries", NULL, NULL);
return planningError;
}
Assert(!isMultiShardQuery);
*prunedShardIntervalListList = shardIntervalList;
if (!isMultiShardQuery)
{
ereport(DEBUG2, (errmsg("Distributed planning for a fast-path router "
"query")));
}
ereport(DEBUG2, (errmsg("Distributed planning for a fast-path router "
"query")));
}
else
{

View File

@@ -543,7 +543,7 @@ _PG_init(void)
*/
PrevProcessUtility = (ProcessUtility_hook != NULL) ?
ProcessUtility_hook : standard_ProcessUtility;
ProcessUtility_hook = multi_ProcessUtility;
ProcessUtility_hook = citus_ProcessUtility;
/*
* Acquire symbols for columnar functions that citus calls.

View File

@@ -326,7 +326,7 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_STRING_FIELD(fetchedExplainAnalyzePlan);
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
COPY_SCALAR_FIELD(isLocalTableModification);
COPY_SCALAR_FIELD(cannotBeExecutedInTransction);
COPY_SCALAR_FIELD(cannotBeExecutedInTransaction);
}

View File

@@ -535,7 +535,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
WRITE_BOOL_FIELD(isLocalTableModification);
WRITE_BOOL_FIELD(cannotBeExecutedInTransction);
WRITE_BOOL_FIELD(cannotBeExecutedInTransaction);
}

View File

@@ -78,7 +78,7 @@ typedef struct DDLJob
extern ProcessUtility_hook_type PrevProcessUtility;
extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
extern void citus_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context, ParamListInfo params,
struct QueryEnvironment *queryEnv, DestReceiver *dest,

View File

@@ -329,7 +329,7 @@ typedef struct Task
/*
* Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
*/
bool cannotBeExecutedInTransction;
bool cannotBeExecutedInTransaction;
Const *partitionKeyValue;
int colocationId;

View File

@@ -178,32 +178,31 @@ SELECT columnar_test_helpers.columnar_metadata_has_storage_id(:columnar_table_1_
CREATE TEMPORARY TABLE columnar_temp(i int) USING columnar;
-- reserve some chunks and a stripe
INSERT INTO columnar_temp SELECT i FROM generate_series(1,5) i;
SELECT columnar.get_storage_id(oid) AS columnar_temp_storage_id
FROM pg_class WHERE relname='columnar_temp' \gset
SELECT pg_backend_pid() AS val INTO old_backend_pid;
SELECT columnar.get_storage_id(oid) as oid INTO columnar_temp_storage_id
FROM pg_class WHERE relname='columnar_temp';
\c - - - :master_port
SET search_path TO columnar_create;
-- wait until old backend to expire to make sure that temp table cleanup is complete
SELECT columnar_test_helpers.pg_waitpid(val) FROM old_backend_pid;
pg_waitpid
---------------------------------------------------------------------
-- wait until temporary table and its metadata is removed
DO $$
DECLARE
loop_wait_count integer := 0;
BEGIN
WHILE (
(SELECT COUNT(*) > 0 FROM pg_class WHERE relname='columnar_temp') OR
(SELECT columnar_test_helpers.columnar_metadata_has_storage_id(oid) FROM columnar_temp_storage_id)
)
LOOP
IF loop_wait_count > 1000 THEN
RAISE EXCEPTION 'Timeout while waiting for temporary table to be dropped';
END IF;
(1 row)
DROP TABLE old_backend_pid;
-- show that temporary table itself and its metadata is removed
SELECT COUNT(*)=0 FROM pg_class WHERE relname='columnar_temp';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT columnar_test_helpers.columnar_metadata_has_storage_id(:columnar_temp_storage_id);
columnar_metadata_has_storage_id
---------------------------------------------------------------------
f
(1 row)
PERFORM pg_sleep(0.001);
loop_wait_count := loop_wait_count + 1;
END LOOP;
END;
$$ language plpgsql;
DROP TABLE columnar_temp_storage_id;
-- connect to another session and create a temp table with same name
CREATE TEMPORARY TABLE columnar_temp(i int) USING columnar;
-- reserve some chunks and a stripe

View File

@@ -136,22 +136,34 @@ CREATE TEMPORARY TABLE columnar_temp(i int) USING columnar;
-- reserve some chunks and a stripe
INSERT INTO columnar_temp SELECT i FROM generate_series(1,5) i;
SELECT columnar.get_storage_id(oid) AS columnar_temp_storage_id
FROM pg_class WHERE relname='columnar_temp' \gset
SELECT pg_backend_pid() AS val INTO old_backend_pid;
SELECT columnar.get_storage_id(oid) as oid INTO columnar_temp_storage_id
FROM pg_class WHERE relname='columnar_temp';
\c - - - :master_port
SET search_path TO columnar_create;
-- wait until old backend to expire to make sure that temp table cleanup is complete
SELECT columnar_test_helpers.pg_waitpid(val) FROM old_backend_pid;
-- wait until temporary table and its metadata is removed
DO $$
DECLARE
loop_wait_count integer := 0;
BEGIN
WHILE (
(SELECT COUNT(*) > 0 FROM pg_class WHERE relname='columnar_temp') OR
(SELECT columnar_test_helpers.columnar_metadata_has_storage_id(oid) FROM columnar_temp_storage_id)
)
LOOP
IF loop_wait_count > 1000 THEN
RAISE EXCEPTION 'Timeout while waiting for temporary table to be dropped';
END IF;
DROP TABLE old_backend_pid;
PERFORM pg_sleep(0.001);
-- show that temporary table itself and its metadata is removed
SELECT COUNT(*)=0 FROM pg_class WHERE relname='columnar_temp';
SELECT columnar_test_helpers.columnar_metadata_has_storage_id(:columnar_temp_storage_id);
loop_wait_count := loop_wait_count + 1;
END LOOP;
END;
$$ language plpgsql;
DROP TABLE columnar_temp_storage_id;
-- connect to another session and create a temp table with same name
CREATE TEMPORARY TABLE columnar_temp(i int) USING columnar;