Commit Graph

75 Commits (c8aba9b6df8fd1672df0ebd872f2f182bccae390)

Author SHA1 Message Date
Onur Tirtir 3d61c4dc71
Add citus_stat_counters view and citus_stat_counters_reset() function to reset it (#7917)
DESCRIPTION: Adds citus_stat_counters view that can be used to query
stat counters that Citus collects while the feature is enabled, which is
controlled by citus.enable_stat_counters. citus_stat_counters() can be
used to query the stat counters for the provided database oid and
citus_stat_counters_reset() can be used to reset them for the provided
database oid or for the current database if nothing or 0 is provided.

Today we don't persist stat counters on server shutdown. In other words,
stat counters are automatically reset in case of a server restart.

Details on the underlying design can be found in header comment of
stat_counters.c and in the technical readme.

-------

Here are the details about what we track as of this PR:

For connection management, we have three statistics about the inter-node
connections initiated by the node itself:

* **connection_establishment_succeeded**
* **connection_establishment_failed**
* **connection_reused**

While the first two are relatively easier to understand, the third one
covers the case where a connection is reused. This can happen when a
connection was already established to the desired node, Citus decided to
cache it for some time (see citus.max_cached_conns_per_worker &
citus.max_cached_connection_lifetime), and then reused it for a new
remote operation. Here are the other important details about these
connection statistics:

1. connection_establishment_failed doesn't care about the connections
that we could establish but are lost later in the transaction. Plus, we
cannot guarantee that the connections that are counted in
connection_establishment_succeeded were not lost later.
2. connection_establishment_failed doesn't care about the optional
connections (see OPTIONAL_CONNECTION flag) that we gave up establishing
because of the connection throttling rules we follow (see
citus.max_shared_pool_size & citus.local_shared_pool_size). The reaason
for this is that we didn't even try to establish these connections.
3. For the rest of the cases where a connection failed for some reason,
we always increment connection_establishment_failed even if the caller
was okay with the failure and know how to recover from it (e.g., the
adaptive executor knows how to fall back local execution when the target
node is the local node and if it cannot establish a connection to the
local node). The reason is that even if it's likely that we can still
serve the operation, we still failed to establish the connection and we
want to track this.
4. Finally, the connection failures that we count in
connection_establishment_failed might be caused by any of the following
reasons and for now we prefer to _not_ further distinguish them for
simplicity:
a. remote node is down or cannot accept any more connections, or
overloaded such that citus.node_connection_timeout is not enough to
establish a connection
b. any internal Citus error that might result in preparing a bad
connection string so that libpq fails when parsing the connection string
even before actually trying to establish a connection via connect() call
c. broken citus.node_conninfo or such Citus configuration that was
incorrectly set by the user can also result in similar outcomes as in b
d. internal waitevent set / poll errors or OOM in local node

We also track two more statistics for query execution:

* **query_execution_single_shard**
* **query_execution_multi_shard**

And more importantly, both query_execution_single_shard and
query_execution_multi_shard are not only tracked for the top-level
queries but also for the subplans etc. The reason is that for some
queries, e.g., the ones that go through recursive planning, after Citus
performs the heavy work as part of subplans, the work that needs to be
done for the top-level query becomes quite straightforward. And for such
query types, it would be deceiving if we only incremented the query stat
counters for the top-level query. Similarly, for non-pushable INSERT ..
SELECT and MERGE queries, we perform separate counter increments for the
SELECT / source part of the query besides the final INSERT / MERGE
query.
2025-04-28 12:23:52 +00:00
Naisila Puka 6bd3474804 Rename foreach_ macros to foreach_declared_ macros (#7700)
This is prep work for successful compilation with PG17

PG17added foreach_ptr, foreach_int and foreach_oid macros
Relevant PG commit
14dd0f27d7cd56ffae9ecdbe324965073d01a9ff

14dd0f27d7

We already have these macros, but they are different with the
PG17 ones because our macros take a DECLARED variable, whereas
the PG16 macros declare a locally-scoped loop variable themselves.

Hence I am renaming our macros to foreach_declared_

I am separating this into its own PR since it touches many files. The
main compilation PR is https://github.com/citusdata/citus/pull/7699
2025-03-12 11:01:49 +03:00
eaydingol ee11492a0e
Generate qualified relation name (#7427)
This change refactors the code by using generate_qualified_relation_name
from id instead of using a sequence of functions to generate the
relation name.


Fixes #6602
2024-01-22 17:32:49 +03:00
Nils Dijk 0620c8f9a6
Sort includes (#7326)
This change adds a script to programatically group all includes in a
specific order. The script was used as a one time invocation to group
and sort all includes throught our formatted code. The grouping is as
follows:

 - System includes (eg. `#include<...>`)
 - Postgres.h (eg. `#include "postgres.h"`)
- Toplevel imports from postgres, not contained in a directory (eg.
`#include "miscadmin.h"`)
 - General postgres includes (eg . `#include "nodes/..."`)
- Toplevel citus includes, not contained in a directory (eg. `#include
"citus_verion.h"`)
 - Columnar includes (eg. `#include "columnar/..."`)
 - Distributed includes (eg. `#include "distributed/..."`)

Because it is quite hard to understand the difference between toplevel
citus includes and toplevel postgres includes it hardcodes the list of
toplevel citus includes. In the same manner it assumes anything not
prefixed with `columnar/` or `distributed/` as a postgres include.

The sorting/grouping is enforced by CI. Since we do so with our own
script there are not changes required in our uncrustify configuration.
2023-11-23 18:19:54 +01:00
Teja Mupparti 58da8771aa This pull request introduces support for nonroutable merge commands in the following scenarios:
1) For distributed tables that are not colocated.
2) When joining on a non-distribution column for colocated tables.
3) When merging into a distributed table using reference or citus-local tables as the data source.

This is accomplished primarily through the implementation of the following two strategies.

Repartition: Plan the source query independently,
execute the results into intermediate files, and repartition the files to
co-locate them with the merge-target table. Subsequently, compile a final
merge query on the target table using the intermediate results as the data
source.

Pull-to-coordinator: Execute the plan that requires evaluation at the coordinator,
run the query on the coordinator, and redistribute the resulting rows to ensure
colocation with the target shards. Direct the MERGE SQL operation to the worker
nodes' target shards, using the intermediate files colocated with the data as the
data source.
2023-06-19 12:23:40 -07:00
Teja Mupparti f6a516dab5 Refactor repartitioning code into generic format 2023-06-05 09:06:05 -07:00
Teja Mupparti ff2062e8c3 Rename insert-select redistribute code base to generic purpose 2023-06-01 09:43:43 -07:00
rajeshkt78 85b8a2c7a1
CDC implementation for Citus using Logical Replication (#6623)
Description:
Implementing CDC changes using Logical Replication to avoid
re-publishing events multiple times by setting up replication origin
session, which will add "DoNotReplicateId" to every WAL entry.
   - shard splits
   - shard moves
   - create distributed table
   - undistribute table
   - alter distributed tables (for some cases)
   - reference table operations
   

The citus decoder which will be decoding WAL events for CDC clients, 
ignores any WAL entry with replication origin that is not zero.
It also maps the shard names to distributed table names.
2023-03-28 16:00:21 +05:30
Marco Slot cff013a057 Fix issues with insert..select casts and column ordering 2022-07-28 13:23:57 +02:00
Onder Kalaci 575bb6dde9 Drop support for Inactive Shard placements
Given that we do all operations via 2PC, there is no way
for any placement to be marked as INACTIVE.
2021-10-22 18:03:35 +02:00
Marco Slot 5de3337b2f Support local execution for INSERT..SELECT with re-partitioning 2021-01-06 16:15:53 +01:00
SaitTalhaNisanci 366461ccdb
Introduce cache entry/table utilities (#4132)
Introduce table entry utility functions

Citus table cache entry utilities are introduced so that we can easily
extend existing functionality with minimum changes, specifically changes
to these functions. For example IsNonDistributedTableCacheEntry can be
extended for citus local tables without the need to scan the whole
codebase and update each relevant part.

* Introduce utility functions to find the type of tables

A table type can be a reference table, a hash/range/append distributed
table. Utility methods are created so that we don't have to worry about
how a table is considered as a reference table etc. This also makes it
easy to extend the table types.

* Add IsCitusTableType utilities

* Rename IsCacheEntryCitusTableType -> IsCitusTableTypeCacheEntry

* Change citus table types in some checks
2020-09-02 22:26:05 +03:00
Sait Talha Nisanci 1112b254a7 adapt recently added code for pg13
This commit mostly adds pg_get_triggerdef_command to our ruleutils_13.
This doesn't add anything extra for ruleutils 13 so it is basically a copy
of the change on ruleutils_12
2020-08-04 15:18:27 +03:00
Sait Talha Nisanci 1a7ccac6ef Add RangeTableEntryFromNSItem macro
addRangeTableEntryXXX methods return a ParseNamespaceItem with pg >= 13.
RangeTableEntryFromNSItem macro is added so that we return the range
table entry from the ParseNamespaceItem in pg>=13 and for pg < 13 rte
would already be returned with addRangeTableEntryXXX methods.

Commit on Postgres side:
5815696bc66b3092f6361f53e0394909647042c8
2020-08-04 15:10:22 +03:00
Sait Talha Nisanci bf831d2e59 Use table_openXXX methods in the codebase
With PG13 heap_* (heap_open, heap_close etc) are replaced with table_*
(table_open, table_close etc).

It is better to use the new table access methods in the codebase and
define the macros for the previous versions as we can easily remove the
macro without having to change the codebase when we drop the support for
the old version.

Commits that introduced this change on Postgres:
f25968c49697db673f6cd2a07b3f7626779f1827
e0c4ec07284db817e1f8d9adfb3fffc952252db0
4b21acf522d751ba5b6679df391d5121b6c4a35f

Command to see relevant commits on Postgres side:
git log --all --grep="heap_open"
2020-08-04 15:10:22 +03:00
Hadi Moshayedi 13003d8d05 Use TupleDestination API for partitioning in insert/select. 2020-07-17 09:43:46 -07:00
Hadi Moshayedi 4ed59d2db3 Move more from insert_select_executor to insert_select_planner 2020-06-26 08:08:26 -07:00
Hadi Moshayedi d34c21890f Rename CoordinatorInsertSelect... to NonPushableInsertSelect 2020-06-25 08:55:48 -07:00
Hadi Moshayedi 4e8d79998e Save INSERT/SELECT method in DistributedPlan.
This is so we don't need to calculate it twice in
insert_select_executor.c and multi_explain.c, which can
cause discrepancy if an update in one of them is not
reflected in the other site.
2020-06-25 08:55:48 -07:00
Marco Slot 2a3234ca26 Rename masterQuery to combineQuery 2020-06-17 14:14:37 +02:00
Philip Dubé 39400319e6 Defer freeing CitusTableCacheEntry, as there were memory safety issues before
Shard id to index mapping stored in cache entry as there may now be multiple entries alive for a given relation

insert_select_executor: revert copying cache entry, which was a hack added to avoid memory safety issues
2020-06-15 16:20:50 +00:00
Philip Dubé c0515dcd67 This prepares for routing modifying CTEs, where modLevel should not be used to infer whether a plan is a select or not
SELECT_TASK is renamed to READ_TASK as a SELECT with modifying CTEs will be a MODIFYING_TASK

RouterInsertJob: Assert originalQuery->commandType == CMD_INSERT
CreateModifyPlan: Assert originalQuery->commandType != CMD_SELECT

Remove unused function IsModifyDistributedPlan

DistributedExecution, ExecutionParams, DistributedPlan: Rename hasReturning to expectResults
SELECTs set expectResults to true

Rename CreateSingleTaskRouterPlan to CreateSingleTaskRouterSelectPlan
2020-05-20 17:26:12 +00:00
Philip Dubé 8e79672839 Try copying shard intervals out of cache for long lived borrow 2020-04-17 22:00:41 +00:00
Marco Slot fd8cdb92f4 Evaluate nextval in the target list on the coordinator 2020-04-02 02:53:19 +02:00
SaitTalhaNisanci 0aebd78ea7 use localExecution in ExecuteTaskListExtended
ExecuteTaskListExtended is the common method for different codepaths,
and instead of writing separate local execution logics in different
codepaths, it makes more sense to have the logic here. We still need to
do some refactoring, this is an initial step.

After this commit, we can run create shard commands locally. There is a
special case with shard creation commands. A create shard command might
have a concatenated query string, however local execution did not know
how to execute a task with multiple query strings. This is also
implemented in this commit. We go over each query in the concatenated
query string and plan/execute them one by one.

A more clean solution to this would be to make sure that each task has a
single query. We currently cannot do that because we need to ensure the
task dependencies. However, it would make sense to do that at some point
and it would simplify the code a lot.
2020-04-01 18:23:16 +03:00
SaitTalhaNisanci c796ac335d add TaskQuery struct to abstract query string related fields
We had many fields in task related to query strings. It was kind of
complex, and only of them could be set at a time. Therefore it makes
more sense to abstract this and use a union so that it is clear that
only of them should be set.

We have three fields that could have query related strings:
- queryForLocation
- queryStringLazy
- perPlacementQueryStrings

Relatively, they can be set with:
- SetTaskQueryString
- SetTaskQueryIfShouldLazyDeparse
- SetTaskPerPlacementQueryStrings

The direct usage of the query related fields are also removed.

Rename queryForLocalExecution

Currently queryForLocalExecution is only used for deparsing purposes,
therefore it makes sense to rename it to what it is doing.
2020-03-31 15:47:55 +03:00
SaitTalhaNisanci 98f95e2a5e add TaskQueryStringForPlacement
TaskQueryStringForPlacement simplifies how the executor gets the query
string for a given placement. Task will use the necessary fields to
return the correct query placement string. Executor doesn't need to know
the details for this.

rename TaskQueryString as TaskQueryStringAllPlacements

TaskQueryString returns the query string that will be the same for all
the placements. In INSERT..SELECT the query string can be different for
each placement. Adaptive executor uses TaskQueryStringForPlacement,
which returns the query string for a placement. It makes sense to rename
TaskQueryString as TaskQueryStringAllPlacements as it is returning the
query string for all placements.

rename SetTaskQuery as SetTaskQueryIfShouldLazyDeparse

SetTaskQuery does not always sets the task query. It can set the query
string as well. So it is more clear to name it
SetTaskQueryIfShouldLazyDeparse, since it will set the query not query
string only when we should deparse the query in a lazy way.
2020-03-31 15:47:55 +03:00
SaitTalhaNisanci 9d2f3c392a enable local execution in INSERT..SELECT and add more tests
We can use local copy in INSERT..SELECT, so the check that disables
local execution is removed.

Also a test for local copy where the data size >
LOCAL_COPY_FLUSH_THRESHOLD is added.

use local execution with insert..select
2020-03-18 09:34:39 +03:00
SaitTalhaNisanci 1df9601e13 not use local copy if current transaction is connected to local group
If current transaction is connected to local group we should not use
local copy, because we might not see some of the changes that are made
over the connection to the local group.
2020-03-18 09:28:59 +03:00
SaitTalhaNisanci f9c4431885 add the support to execute copy locally
A copy will be executed locally if
- Local execution is enabled and current transaction accessed a local placement
- Local execution is enabled and we are inside a transaction block.

So even if local execution is enabled but we are not in a transaction block, the copy will not be run locally.

This will not run locally:
```
COPY distributed_table FROM STDIN;
....
```

This will run locally:
```
SET citus.enable_local_execution to 'on';
BEGIN;
COPY distributed_table FROM STDIN;
COMMIT;
....
```
.
There are 3 ways to do a copy in postgres programmatically:
- from a file
- from a program
- from a callback function

I have chosen to implement it with a callback function, which means that we write the rows of copy from a callback function to the output buffer, which is used to insert tuples into the actual table.

For each shard id, we have a buffer that keeps the current rows to be written, we perform the actual copy operation either when:
- copy buffer for the given shard id reaches to a threshold, which is currently 512KB
- we reach to the end of the copy

The buffer size is debatable(512KB). At a given time, we might allocate (local placement * buffer size) memory at most.

The local copy uses the same copy format as remote copy, which means that we serialize the data in the same format as remote copy and send it locally.

There was also the option to use ExecSimpleRelationInsert to insert
slots one by one, which would avoid the extra
serialization/deserialization but doing some benchmarks it seems that
using buffers are significantly better in terms of the performance.

You can see this comment for more details: https://github.com/citusdata/citus/pull/3557#discussion_r389499054
2020-03-18 09:28:59 +03:00
Philip Dubé 7cdfa1daab Rename LookupCitusTableCacheEntry to GetCitusTableCacheEntry, LookupLookupCitusTableCacheEntry back to LookupCitusTableCacheEntry 2020-03-08 14:08:23 +00:00
Philip Dubé a7cca1bcde Rename DistTableCacheEntry to CitusTableCacheEntry 2020-03-07 14:08:03 +00:00
Philip Dubé bec58000d6 Given IsDistributedTableRTE, there's ambiguity in what DistributedTable means
Elsewhere we used DistributedTable to include reference tables
Marco suggested we use CitusTable for distributed & reference tables

So renaming:
- IsDistributedTable -> IsCitusTable
- IsDistributedTableViaCatalog -> IsCitusTableViaCatalog
- DistributedTableCacheEntry -> CitusTableCacheEntry
- DistributedTableList -> CitusTableList
- isDistributedTable -> isCitusTable
- InsertSelectIntoDistributedTable -> InsertSelectIntoCitusTable
- ExtractFirstDistributedTableId -> ExtractFirstCitusTableId
2020-03-06 18:57:55 +00:00
Philip Dubé 20abc4d2b5
Replace foreach with foreach_ptr/foreach_oid (#3544) 2020-02-27 16:54:49 +01:00
Hadi Moshayedi 9dd14fa90d Rename discarded target list items in repartitioned INSERT/SELECT 2020-02-05 11:06:44 +01:00
Jelte Fennema 246435be7e
Lazy query deparsing executable queries (#3350)
Deparsing and parsing a query can be heavy on CPU. When locally executing 
the query we don't need to do this in theory most of the time.

This PR is the first step in allowing to skip deparsing and parsing
the query in these cases, by lazily creating the query string and
storing the query in the task. Future commits will make use of this and
not deparse and parse the query anymore, but use the one from the task
directly.
2020-01-17 11:49:43 +01:00
Hadi Moshayedi 6cf1c01660 Don't use repartitioned INSERT/SELECT for repartition joins 2020-01-16 23:40:31 -08:00
Hadi Moshayedi 5eeb07124f Repartitioned INSERT/SELECT: include job id in result id prefix 2020-01-16 23:24:52 -08:00
Hadi Moshayedi a079278b0c Repartitioned INSERT/SELECT: Add a GUC to enable/disable it 2020-01-16 23:24:52 -08:00
Hadi Moshayedi ce5eea4885 INSERT/SELECT: make SELECT column names unique 2020-01-16 23:24:52 -08:00
Hadi Moshayedi 97072c9eb1 INSERT/SELECT: show method in EXPLAIN output 2020-01-16 23:24:52 -08:00
Hadi Moshayedi fe548b762f Repartitioned INSERT/SELECT: Test CTEs 2020-01-16 23:24:52 -08:00
Hadi Moshayedi 494cc383cc Repartitioned INSERT/SELECT: Enable RETURNING 2020-01-16 23:24:52 -08:00
Hadi Moshayedi 44a2aede16 Don't start a coordinated transaction on workers.
Otherwise transaction hooks of Citus kick in and might cause unwanted errors.
2020-01-16 23:24:52 -08:00
Hadi Moshayedi 42c3c03b85 Handle extra columns added in ExpandWorkerTargetEntry() in repartitioned INSERT/SELECT 2020-01-16 23:24:52 -08:00
Hadi Moshayedi 89463f9760 Repartitioned INSERT/SELECT: cast columns in SELECT targets 2020-01-16 23:24:52 -08:00
Hadi Moshayedi d67a384350 Enable repartitioned INSERT/SELECT ON CONFLICT. 2020-01-16 23:24:52 -08:00
Hadi Moshayedi b4e5f4b10a Implement INSERT ... SELECT with repartitioning 2020-01-16 23:24:52 -08:00
Hadi Moshayedi ced876358d INSERT/SELECT: Refactor out AddInsertSelectCasts 2020-01-16 23:24:52 -08:00
Hadi Moshayedi d449c1857c INSERT/SELECT: Use ExecutePlan* instead of ExecuteSelect* 2020-01-16 23:24:52 -08:00