Considering the adaptive connection management
improvements that we plan to roll soon, it makes it
very helpful to know the number of active client
backends.
We are doing this addition to simplify yhe adaptive connection
management for single node Citus. In single node Citus, both the
client backends and Citus parallel queries would compete to get
slots on Postgres' `max_connections` on the same Citus database.
With adaptive connection management, we have the counters for
Citus parallel queries. That helps us to adaptively decide
on the remote executions pool size (e.g., throttle connections
if necessary).
However, we do not have any counters for the total number of
client backends on the database. For single node Citus, we
should consider all the client backends, not only the remote
connections that Citus does.
Of course Postgres internally knows how many client
backends are active. However, to get that number Postgres
iterates over all the backends. For examaple, see [pg_stat_get_db_numbackends](8e90ec5580/src/backend/utils/adt/pgstatfuncs.c (L1240))
where Postgres iterates over all the backends.
For our purpuses, we need this information on every connection
establishment. That's why we cannot affort to do this kind of
iterattion.
CitusTableTypeIdList() function iterates on all the entries of pg_dist_partition
and loads all the metadata in to the cache. This can be quite memory intensive
especially when there are lots of distributed tables.
When partitioned tables are used, it is common to have many distributed tables
given that each partition also becomes a distributed table.
CitusTableTypeIdList() is used on every CREATE TABLE .. PARTITION OF.. command
as well. It means that, anytime a partition is created, Citus loads all the
metadata to the cache. Note that Citus typically only loads the accessed table's
metadata to the cache.
* Move local execution after the remote execution
Before this commit, when both local and remote tasks
exist, the executor was starting the execution with
local execution. There is no strict requirements on
this.
Especially considering the adaptive connection management
improvements that we plan to roll soon, moving the local
execution after to the remote execution makes more sense.
The adaptive connection management for single node Citus
would look roughly as follows:
- Try to connect back to the coordinator for running
parallel queries.
- If succeeds, go on and execute tasks in parallel
- If fails, fallback to the local execution
So, we'll use local execution as a fallback mechanism. And,
moving it after to the remote execution allows us to implement
such further scenarios.
The adaptive executor emulates the TCP's slow start algorithm.
Whenever the executor needs new connections, it doubles the number
of connections established in the previous iteration.
This approach is powerful. When the remote queries are very short
(like index lookup with < 1ms), even a single connection is sufficent
most of the time. When the remote queries are long, the executor
can quickly establish necessary number of connections.
One missing piece on our implementation seems that the executor
keeps doubling the number of connections even if the previous
connection attempts have been finalized. Instead, we should
wait until all the attempts are finalized. This is how TCP's
slow-start works. Plus, it decreases the unnecessary pressure
on the remote nodes.
I got this warning when compiling citus:
```
../columnar/write_state_management.c: In function ‘PendingWritesInUpperTransactions’:
../columnar/write_state_management.c:364:20: warning: ‘entry’ may be used uninitialized in this function [-Wmaybe-uninitialized]
if (found && entry->writeStateStack != NULL)
~~~~~^~~~~~~~~~~~~~~~
```
I fixed this by checking by always initializing entry, by using an early
return if `WriteStateMap` didn't exist. Instead of using the `found`
variable to check for existence of the key, I now simply check the
`entry` variable itself.
To quote the postgres comment on the hash_enter function:
> If foundPtr isn't NULL, then *foundPtr is set true if we found an
> existing entry in the table, false otherwise. This is needed in the
> HASH_ENTER case, but is redundant with the return value otherwise.
Before this commit, we let AdaptiveExecutorPreExecutorRun()
to be effective multiple times on every FETCH on cursors.
That does not affect the correctness of the query results,
but adds significant overhead.
It seems that we forgot to pass the revelant
flag to enable Postgres' parallel query
capabilities on the shards when user does
EXPLAIN ANALYZE on a distributed table.
TableAM API doesn't allow us to pass around a state variable along all of the tuple inserts belonging to the same command. We require this in columnar store, since we batch them, and when we have enough rows we flush them as stripes.
To do that, we keep a (relfilenode) -> stack of (subxact id, TableWriteState) global mapping.
**Inserts**
Whenever we want to insert a tuple, we look up for the relation's relfilenode in this mapping. If top of the stack matches current subtransaction, we us the existing TableWriteState. Otherwise, we allocate a new TableWriteState and push it on top of stack.
**(Sub)Transaction Commit/Aborts**
When the subtransaction or transaction is committed, we flush and pop all entries matching current SubTransactionId.
When the subtransaction or transaction is committed, we pop all entries matching current SubTransactionId and discard them without flushing.
**Reads**
Since we might have unwritten rows which needs to be read by a table scan, we flush write states on SELECTs. Since flushing the write state of upper transactions in a subtransaction will cause metadata being written in wrong subtransaction, we ERROR out if any of the upper subtransactions have unflushed rows.
**Table Drops**
We record in which subtransaction the table was dropped. When committing a subtransaction in which table was dropped, we propagate the drop to upper transaction. When aborting a subtransaction in which table was dropped, we mark table as not deleted.