UPDATEs on partitioned tables that affect only row partitions should
succeed, the rest should fail.
Also rename CStoreScan to ColumnarScan to make the error message more
relevant.
When Citus needs to parallelize queries on the local node (e.g., the node
executing the distributed query and the shards are the same), we need to
be mindful about the connection management. The reason is that the client
backends that are running distributed queries are competing with the client
backends that Citus initiates to parallelize the queries in order to get
a slot on the max_connections.
In that regard, we implemented a "failover" mechanism where if the distributed
queries cannot get a connection, the execution failovers the tasks to the local
execution.
The failover logic is follows:
- As the connection manager if it is OK to get a connection
- If yes, we are good.
- If no, we fail the workerPool and the failure triggers
the failover of the tasks to local execution queue
The decision of getting a connection is follows:
/*
* For local nodes, solely relying on citus.max_shared_pool_size or
* max_connections might not be sufficient. The former gives us
* a preview of the future (e.g., we let the new connections to establish,
* but they are not established yet). The latter gives us the close to
* precise view of the past (e.g., the active number of client backends).
*
* Overall, we want to limit both of the metrics. The former limit typically
* kics in under regular loads, where the load of the database increases in
* a reasonable pace. The latter limit typically kicks in when the database
* is issued lots of concurrent sessions at the same time, such as benchmarks.
*/
When distributing a columnar table, as well as changing options on a distributed columnar table, this patch will forward the settings from the coordinator to the workers.
For propagating options changes on an already distributed table this change is pretty straight forward. Before applying the change in options locally we will create a `DDLJob` that contains a call to `alter_columnar_table_set(...)` for every shard placement with all settings of the current table. This goes both for setting an option as well as resetting. This will reset the values to the defaults configured on the coordinator. Having the effect that the coordinator is authoritative on the settings and makes sure the shards have the same settings set as the table on the coordinator.
When a columnar table is distributed it is using the `TableDDLCommand` infra structure to create a new kind of `TableDDLCommand`. This new type, called a `TableDDLCommandFunction` contains a context and 2 function pointers to execute. One function returns the command as applied on the table, the second function will return the sql command to apply to a shard with a given shard id. The schema name is ignored as it will use the fully qualified name of the shard in the same schema as the base table.
Multi-row execution already uses sequential execution. When shards
are local, using local execution is profitable as it avoids
an extra connection establishment to the local node.
Columnar options were by accident linked to the relfilenode instead of the regclass/relation oid. This PR moves everything related to columnar options to their own catalog table.
Considering the adaptive connection management
improvements that we plan to roll soon, it makes it
very helpful to know the number of active client
backends.
We are doing this addition to simplify yhe adaptive connection
management for single node Citus. In single node Citus, both the
client backends and Citus parallel queries would compete to get
slots on Postgres' `max_connections` on the same Citus database.
With adaptive connection management, we have the counters for
Citus parallel queries. That helps us to adaptively decide
on the remote executions pool size (e.g., throttle connections
if necessary).
However, we do not have any counters for the total number of
client backends on the database. For single node Citus, we
should consider all the client backends, not only the remote
connections that Citus does.
Of course Postgres internally knows how many client
backends are active. However, to get that number Postgres
iterates over all the backends. For examaple, see [pg_stat_get_db_numbackends](8e90ec5580/src/backend/utils/adt/pgstatfuncs.c (L1240))
where Postgres iterates over all the backends.
For our purpuses, we need this information on every connection
establishment. That's why we cannot affort to do this kind of
iterattion.
* Move local execution after the remote execution
Before this commit, when both local and remote tasks
exist, the executor was starting the execution with
local execution. There is no strict requirements on
this.
Especially considering the adaptive connection management
improvements that we plan to roll soon, moving the local
execution after to the remote execution makes more sense.
The adaptive connection management for single node Citus
would look roughly as follows:
- Try to connect back to the coordinator for running
parallel queries.
- If succeeds, go on and execute tasks in parallel
- If fails, fallback to the local execution
So, we'll use local execution as a fallback mechanism. And,
moving it after to the remote execution allows us to implement
such further scenarios.
Before this commit, we let AdaptiveExecutorPreExecutorRun()
to be effective multiple times on every FETCH on cursors.
That does not affect the correctness of the query results,
but adds significant overhead.
TableAM API doesn't allow us to pass around a state variable along all of the tuple inserts belonging to the same command. We require this in columnar store, since we batch them, and when we have enough rows we flush them as stripes.
To do that, we keep a (relfilenode) -> stack of (subxact id, TableWriteState) global mapping.
**Inserts**
Whenever we want to insert a tuple, we look up for the relation's relfilenode in this mapping. If top of the stack matches current subtransaction, we us the existing TableWriteState. Otherwise, we allocate a new TableWriteState and push it on top of stack.
**(Sub)Transaction Commit/Aborts**
When the subtransaction or transaction is committed, we flush and pop all entries matching current SubTransactionId.
When the subtransaction or transaction is committed, we pop all entries matching current SubTransactionId and discard them without flushing.
**Reads**
Since we might have unwritten rows which needs to be read by a table scan, we flush write states on SELECTs. Since flushing the write state of upper transactions in a subtransaction will cause metadata being written in wrong subtransaction, we ERROR out if any of the upper subtransactions have unflushed rows.
**Table Drops**
We record in which subtransaction the table was dropped. When committing a subtransaction in which table was dropped, we propagate the drop to upper transaction. When aborting a subtransaction in which table was dropped, we mark table as not deleted.
When a relation is used on an OUTER JOIN with FALSE filters,
set_rel_pathlist_hook may not be called for the table.
There might be other cases as well, so do not rely on the hook
for classification of the tables.