This GUC has two settings, 'always' and 'never'. When it's set to
'never', all behavior stays exactly as it was prior to this commit. When
it's set to 'always', only SELECT queries are allowed to run, and only
secondary nodes are used when processing those queries.
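A minimal sketch of how the new setting might be used in a session, assuming
the GUC is exposed as citus.use_secondary_nodes (the table name is a placeholder):
```
-- route read-only work to secondary nodes for this session
SET citus.use_secondary_nodes TO 'always';
SELECT count(*) FROM events;   -- only SELECTs are allowed, served by secondaries
-- restore the default behavior
SET citus.use_secondary_nodes TO 'never';
```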
Add some helper functions:
- WorkerNodeIsSecondary(), checks the noderole of the worker node
- WorkerNodeIsReadable(), returns whether we're currently allowed to
read from this node
- ActiveReadableNodeList(), some functions (namely, the ones on the
SELECT path) don't require working with Primary Nodes. They should call
this function instead of ActivePrimaryNodeList(), because the latter
will error out in contexts where we're not allowed to write to nodes.
- ActiveReadableNodeCount(), like the above, replaces
ActivePrimaryNodeCount().
- EnsureModificationsCanRun(), error out if we're not currently allowed
to run queries which modify data. (Either we're in read-only mode or
use_secondary_nodes is set)
Some parts of the code were switched over to use readable nodes instead
of primary nodes:
- Deadlock detection
- DistributedTableSize
- the router, real-time, and task tracker executors
- ShardPlacement resolution
This change introduces two new functions:
`master_update_table_statistics` updates the statistics of the shards
belonging to the given table as well as its colocated tables.
`get_colocated_shard_array` returns the ids of colocated shards of a
given shard.
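Hypothetical usage, with a placeholder table name and shard id:
```
-- refresh shard statistics for a table and its colocated tables
SELECT master_update_table_statistics('github_events');
-- list the colocated shards of shard 102008
SELECT get_colocated_shard_array(102008);
```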
This is a pretty substantial refactoring of the existing modify path
within the router executor and planner. In particular, we now hunt for
all VALUES range table entries in INSERT statements and group the rows
contained therein by shard identifier. These rows are stashed away for
later in "ModifyRoute" elements. During deparse, the appropriate RTE
is extracted from the Query and its values list is replaced by these
rows before any SQL is generated.
In this way, we can create multiple Tasks, but only one per shard, to
execute a multi-row INSERT piecemeal. The execution of jobs containing
such tasks now goes exclusively through the "multi-router executor",
which was previously used for e.g. INSERT INTO ... SELECT.
By piggybacking onto that executor, we participate in ongoing
transactions, get rollback-ability, etc. In short order, the only
remaining use of the "single modify" router executor will be for bare
single-row INSERT statements (i.e. those not in a transaction).
This change appropriately handles deferred pruning as well as
master-evaluated functions.
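For illustration (table and values are placeholders), a multi-row INSERT of
this shape, whose VALUES rows may hash to different shards, is now split into
one task per shard, with each task carrying only the rows for its shard, and
executed through the multi-router executor inside the surrounding transaction:
```
BEGIN;
INSERT INTO items (key, value)
VALUES (1, 'a'), (2, 'b'), (42, 'c'), (43, 'd');
COMMIT;
```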
With this PR, Citus starts to support all possible ways to create
distributed partitioned tables. These are:
- Distributing an already created partitioning hierarchy
- CREATE TABLE ... PARTITION OF a distributed_table
- ALTER TABLE distributed_table ATTACH PARTITION non_distributed_table
- ALTER TABLE distributed_table ATTACH PARTITION distributed_table
We also support DETACHing partitions from partitioned tables and propagating
TRUNCATE and DDL commands to distributed partitioned tables (see the sketch below).
This PR also refactors some parts of distributed table creation logic.
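A sketch of the newly supported paths, using illustrative table names and
assuming the usual create_distributed_table() UDF:
```
-- distribute an already created partitioning hierarchy
CREATE TABLE events (device_id bigint, event_time timestamptz, payload jsonb)
    PARTITION BY RANGE (event_time);
SELECT create_distributed_table('events', 'device_id');

-- CREATE TABLE ... PARTITION OF a distributed table
CREATE TABLE events_2017 PARTITION OF events
    FOR VALUES FROM ('2017-01-01') TO ('2018-01-01');

-- ALTER TABLE ... ATTACH PARTITION (and DETACH to undo it)
CREATE TABLE events_2018 (device_id bigint, event_time timestamptz, payload jsonb);
ALTER TABLE events ATTACH PARTITION events_2018
    FOR VALUES FROM ('2018-01-01') TO ('2019-01-01');
ALTER TABLE events DETACH PARTITION events_2018;
```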
- master_activate_node and master_disable_node correctly toggle
isActive, without crashing
- master_add_node rejects duplicate nodes, even if they're in different
clusters
- master_remove_node allows removing nodes in different clusters
This commit is preparation for introducing distributed partitioned
table support. We want to clean up and refactor some code in the
distributed table creation logic so that we can handle partitioned
tables in a more robust way.
- Never release locks
- AddNodeMetadata takes ShareRowExclusiveLock so it'll conflict with the
trigger which prevents multiple primary nodes.
- ActivateNode and SetNodeState used to take AccessShareLock, but they
modify the table so they should take RowExclusiveLock.
- DeleteNodeRow and InsertNodeRow used to take AccessExclusiveLock but
only need RowExclusiveLock.
- master_add_node enforces that there is only one primary per group
- there's also a trigger on pg_dist_node to prevent multiple primaries
per group
- functions in metadata cache only return primary nodes
- Rename ActiveWorkerNodeList -> ActivePrimaryNodeList
- Rename WorkerGetLive{Node->Group}Count()
- Refactor WorkerGetRandomCandidateNode
- master_remove_node only complains about active shard placements if the
node being removed is a primary.
- master_remove_node only deletes all reference table placements in the
group if the node being removed is the primary.
- Rename {Node->NodeGroup}HasShardPlacements, this reflects the behavior it
already had.
- Rename DeleteAllReferenceTablePlacementsFrom{Node->NodeGroup}. This also
reflects the behavior it already had, but the new signature forces the
caller to pass in a groupId
- Rename {WorkerGetLiveGroup->ActivePrimaryNode}Count
GCC 7 added `-Wimplicit-fallthrough` to warn about switch/case fall-throughs that are not explicitly marked.
According to https://gcc.gnu.org/gcc-7/changes.html, to suppress that warning we could either use `__attribute__((fallthrough))`, which didn't seem to work for earlier GCC versions, or a `/* fallthrough */` comment just before the following `case`.
Previously the Citus code had the fall-through comments inside the braces, which didn't seem to suppress the warning. Putting a `/* fallthrough */` comment outside the braces and right before the `case` fixes the problem.
Comes with a few changes:
- Change the signature of some functions to accept groupid
- InsertShardPlacementRow
- DeleteShardPlacementRow
- UpdateShardPlacementState
- NodeHasActiveShardPlacements returns true if the group the node is a
part of has any active shard placements
- TupleToShardPlacement now returns ShardPlacements which have NULL
nodeName and nodePort.
- Populate (nodeName, nodePort) when creating ShardPlacements
- Disallow removing a node if it contains any shard placements
- DeleteAllReferenceTablePlacementsFromNode matches based on group. This
doesn't change behavior for now (while there is only one node per
group), but it means that in the future callers should be careful about
calling it on a secondary node, as it will delete placements on the primary.
- Create concept of a GroupShardPlacement, which represents an actual
tuple in pg_dist_placement and is distinct from a ShardPlacement,
which has been resolved to a specific node. In the future
ShardPlacement should be renamed to NodeShardPlacement.
- Create some triggers which allow existing code to continue to insert
into and update pg_dist_shard_placement as if it still existed.
This commit is intended to be a base for supporting declarative partitioning
on distributed tables. Here we add the following utility functions and their
unit tests:
* Very basic functions, including differentiating partitioned tables and
partitions, and listing the partitions
* Generating the PARTITION BY (expr) and adding this to the DDL events
of partitioned tables
* Ability to generate text representations of the ranges for partitions
* Ability to generate the `ALTER TABLE parent_table ATTACH PARTITION
partition_table FOR VALUES value_range`
* Ability to add shard ids to the above command using
`worker_apply_inter_shard_ddl_command()`
* Ability to generate `ALTER TABLE parent_table DETACH PARTITION`
Adds support for PostgreSQL 10 by copying in the requisite ruleutils
and updating all API usages to conform with changes in PostgreSQL 10.
Most changes are fairly minor but they are numerous. One particular
obstacle was the change in \d behavior in PostgreSQL 10's psql; I had
to add SQL implementations (views, mostly) to mimic the pre-10 output.
Add a second implementation of INSERT INTO distributed_table SELECT ... that is used if
the query cannot be pushed down. The basic idea is to execute the SELECT query separately
and pass the results into the distributed table using a CopyDestReceiver, which is also
used for COPY and create_distributed_table. When planning the SELECT, we go through
planner hooks again, which means the SELECT can also be a distributed query.
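As an illustration (table names are placeholders; events is assumed to be
distributed on a column other than country_code), a query of this shape cannot
be pushed down because the SELECT does not group by the target's distribution
column, so the SELECT runs as its own distributed query and its result is
copied into the target table:
```
INSERT INTO country_rollup (country_code, event_count)
SELECT country_code, count(*)
FROM events
GROUP BY country_code;
```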
EXPLAIN is supported, but EXPLAIN ANALYZE is not because preventing double execution was
a lot more complicated in this case.
With this commit we start to register the InvalidateDistRelationCacheCallback
function as a cache invalidation callback before the version checks, because
during version checks we use the cache to look up the relation ids of some
relations like pg_dist_partition or pg_dist_partition_logical_relid_index,
and we want to know about cache invalidations before accessing them.
During a version update, we indirectly called CheckInstalledVersion via
CheckCitusVersion. This obviously fails because during a version update a
mismatch between the installed version and the binary version is expected.
Thus, we remove that CheckCitusVersion call and now only call CheckAvailableVersion.
Before this commit, we were erroring out on almost all queries if there was a
version mismatch. With this commit, we start to error out only when the
requested operation touches distributed tables.
Normally we would need to use the distributed table cache to understand whether
a table is distributed or not. However, it is not safe to read our metadata
tables when there is a version mismatch, and thus it is not safe to build the
distributed table cache. Therefore, for this specific occasion, we read
directly from the pg_dist_partition table. However, reading from the catalog
is costly and we should avoid using this method elsewhere as much as possible.
This commit fixes a problem where we incorrectly tried to access the
distributed table cache when the extension was not completely loaded. We
accessed the cache because we wanted to get reference table information to
activate the node. However, it is actually not necessary to explicitly
activate the nodes that come from master_initialize_node_metadata, because it
only runs during extension creation, at which point there are no reference
tables and all nodes are considered active.
* Accept invalidation messages before accessing the metadata cache
This commit is crucial to prevent stale metadata reads from the
cache. Without this commit, some of the operations may use stale
metadata which could end up with various bugs such as crashes,
inconsistent/lost data etc.
As an example, consider that a COPY operation is blocked on shard
metadata lock. Another concurrent session updates the metadata and
invalidates the cache. However, since Citus doesn't accept invalidations,
COPY continues with the stale metadata once it acquires the lock.
With this commit, we make sure that invalidation messages are accepted
just before accessing the metadata cache, preventing any operation from
using stale metadata.
* Add isolation tests for placement changes and concurrent operations
- add node with reference table vs COPY/insert/update/DDL
- repair shard vs COPY/insert/update/DDL
- repair shard vs repair shard
- There was a crash when the table a shardid belonged to changed during
a session. Instead of crashing (a failed assert) we now throw an error
- Update the isolation test which was crashing to no longer exercise
that code path
- Add a regression test to check that the error is thrown
* Enabling physical planner for subquery pushdown changes
This commit applies the logic that exists in INSERT .. SELECT
planning to the subquery pushdown changes.
The main algorithm is as follows:
- pick an anchor relation (i.e., the target relation)
- for each target shard interval:
  - add the target shard interval's shard range
    as a restriction to the relations (if all relations are
    joined on the partition keys)
  - check whether the query is router plannable for the
    target shard interval
  - if router plannable, create a task
* Add union support within JOINs
This commit adds support for UNION/UNION ALL subqueries that are
in the following form:
.... (Q1 UNION Q2 UNION ...) as union_query JOIN (QN) ...
In other words, we currently do NOT support queries of the following
form, where the union query is not JOINed with other
relations/subqueries:
.... (Q1 UNION Q2 UNION ...) as union_query ....
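For contrast, here is a concrete sketch of the supported shape shown earlier
(a union of subqueries JOINed with another subquery); all table and column
names are placeholders and the join is on the distribution column:
```
SELECT union_query.user_id, recent.value
FROM (
    SELECT user_id FROM events_a
    UNION
    SELECT user_id FROM events_b
) AS union_query
JOIN (
    SELECT user_id, value FROM recent_events
) AS recent ON (union_query.user_id = recent.user_id);
```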
* Subquery pushdown planner uses original query
With this commit, we change the input to the logical planner for
subquery pushdown. Before this commit, the planner was relying
on the query tree that is transformed by the postgresql planner.
After this commit, the planner uses the original query. The main
motivation behind this change is to simplify the deparsing of
subqueries.
* Enable top level subquery join queries
This work enables
- Top level subquery joins
- Joins between subqueries and relations
- Joins involving more than 2 range table entries
A new regression test file is added to reflect enabled test cases
* Add top level union support
This commit adds support for UNION/UNION ALL subqueries that are
in the following form:
.... (Q1 UNION Q2 UNION ...) as union_query ....
In other words, Citus now allows top-level
unions to be wrapped in aggregation queries
and/or simple projection queries that select only
some fields from the lower-level queries.
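For example, a top-level UNION ALL wrapped in an aggregation of this shape is
now supported (table and column names are placeholders):
```
SELECT user_id, count(*) AS event_count
FROM (
    SELECT user_id FROM clicks
    UNION ALL
    SELECT user_id FROM views
) AS union_query
GROUP BY user_id;
```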
* Disallow subqueries without a relation in the range table list for subquery pushdown
This commit disallows subqueries without a relation in the range table
list. It only applies to subquery pushdown; in other words,
we do not add this limitation for single-table re-partition subqueries.
The reasoning behind this limitation is that if we allowed pushing down
such queries, the result would include (shardCount * expectedResults)
rows, whereas in a non-distributed setting the result would be
(expectedResult) only.
* Disallow subqueries without a relation in the range table list for INSERT .. SELECT
This commit disallows subqueries without a relation in the range table
list. It only applies to INSERT .. SELECT queries.
The reasoning behind this limitation is that if we allowed pushing down
such queries, the result would include (shardCount * expectedResults)
rows, whereas in a non-distributed setting the result would be
(expectedResult) only.
* Change behaviour of subquery pushdown flag (#1315)
This commit changes the behaviour of the citus.subquery_pushdown flag.
Before this commit, the flag was used to enable the subquery pushdown logic.
With this commit, that behaviour is enabled by default; in other words, the
flag is now effectively a no-op. We prefer to keep the flag since we don't
want to break backward compatibility. Also, we may consider using the flag
for other purposes in upcoming commits.
* Require subquery_pushdown when limit is used in subquery
Using LIMIT in subqueries may return incorrect
results. Therefore we allow limits in subqueries only
if the user explicitly sets the subquery_pushdown flag.
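A hypothetical opt-in using the citus.subquery_pushdown flag mentioned above
(all names are placeholders):
```
SET citus.subquery_pushdown TO on;
SELECT user_id, event_count
FROM (
    SELECT user_id, count(*) AS event_count
    FROM events
    GROUP BY user_id
    ORDER BY event_count DESC
    LIMIT 100
) AS top_users;
```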
* Evaluate expressions on the LIMIT clause (#1333)
Since subquery pushdown uses the original query, the LIMIT and OFFSET
clauses are not evaluated. However, the logical optimizer expects these
expressions to have already been evaluated by the standard planner. This
commit manually evaluates those functions in the logical planner for
subquery pushdown.
* Better format subquery regression tests (#1340)
* Style fix for subquery pushdown regression tests
With this commit we adopt a more consistent style for the
regression tests added in:
- multi_subquery_union.sql
- multi_subquery_complex_queries.sql
- multi_subquery_behavioral_analytics.sql
* Enable the tests that are temporarily commented
This commit enables some of the regression tests that were commented
out until all the development is done.
* Fix merge conflicts (#1347)
- Update regression tests to match changes in the regression
test output.
- Replace Ifs with Asserts given that the check is already done
- Update shard pruning outputs
* Add view regression tests for increased subquery coverage (#1348)
- joins between views and tables
- joins between views
- union/union all queries involving views
- views with limit
- explain queries with view
* Improve btree operators for the subquery tests
This commit adds the missing comparison function for the subquery
composite-key btree comparator.
So far Citus used PostgreSQL's predicate proofing logic for shard
pruning, except for INSERT and COPY, which were already optimized for
speed. That turns out to be too slow:
* Shard pruning for SELECTs is currently O(#shards), because
PruneShardList calls predicate_refuted_by() for every
shard. Obviously using an O(N) type algorithm for general pruning
isn't good.
* predicate_refuted_by() is quite expensive in its own right. That's
primarily because it's optimized for doing a single refutation
proof, rather than performing the same proof over and over.
* predicate_refuted_by() does not keep persistent state (see the previous point) for
function calls, which means that a lot of syscache lookups will be
performed. That's particularly bad if the partitioning key is a
composite key, because without a persistent FunctionCallInfo
record_cmp() has to repeatedly look up the type definition of the
composite key. That's quite expensive.
Thus replace this with custom code that works in two phases:
1) Search restrictions for constraints that can be pruned upon
2) Use those restrictions to search for matching shards in the most
efficient manner available:
a) Binary search / Hash Lookup in case of hash partitioned tables
b) Binary search for equal clauses in case of range or append
tables without overlapping shards.
c) Binary search for inequality clauses, searching for both lower
and upper boundaries, again in case of range or append
tables without overlapping shards.
d) exhaustive search testing each ShardInterval
My measurements suggest that we are considerably, often orders of
magnitude, faster than the previous solution, even if we have to fall
back to exhaustive pruning.
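For intuition, filters of these shapes (placeholder tables and columns) can
now be resolved without calling predicate_refuted_by() once per shard:
```
-- hash-partitioned table: hash lookup / binary search on the sorted intervals
SELECT * FROM orders WHERE customer_id = 42;

-- range-partitioned table without overlapping shards: binary search for the
-- lower and upper boundaries
SELECT * FROM events
WHERE event_time >= '2017-01-01' AND event_time < '2017-02-01';
```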
This determines whether it's possible to perform binary search on
sortedShardIntervalArray or not. If e.g. two shards have overlapping
ranges, that'd be prohibitive.
That'll be useful in a later commit introducing faster shard pruning.
That's useful when comparing values a hash-partitioned table is
filtered by. The existing shardIntervalCompareFunction is about
comparing hashed values, not unhashed ones.
The added btree opclass function is so we can get a comparator
back. This should be changed much more widely, but is not necessary so
far.
Previously we unnecessarily used the first shard's type
information to look up the comparison function. But that
information is already available, so use it. That's helpful because
we sometimes want to access the comparator function even if there
are no shards.
All callers fetch a cache entry and extract/compute arguments for the
eventual FindShardInterval call, so it makes more sense to refactor
into that function itself; this solves the use-after-free bug, too.
With this change we add an option to add a node without replicating all reference
tables to that node. If a node is added with this option, we mark the node as
inactive and no queries will be sent to that node.
We also added two new UDFs:
- master_activate_node(host, port):
- marks node as active and replicates all reference tables to that node
- master_add_inactive_node(host, port):
- only adds node to pg_dist_node
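Hypothetical usage (host names and port are placeholders):
```
-- add the node without replicating reference tables; it stays inactive
SELECT master_add_inactive_node('worker-2', 5432);
-- later, replicate reference tables to it and mark it active
SELECT master_activate_node('worker-2', 5432);
```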
With this change, we start to error out if the loaded Citus binary does not
match the available major version or the installed Citus extension version.
In this case we force the user to restart the server or run ALTER EXTENSION,
depending on the situation.
Custom Scan is a node in the planned statement which helps external providers
to abstract data scans, not just for foreign data wrappers but also for regular
relations, so you can benefit from your own caching or hardware optimizations.
This sounds like only an abstraction on the data scan layer, but we can use it
as an abstraction for our distributed queries. The only thing we need to do is
to find the distributable parts of the query, plan for them and replace them with
a Citus Custom Scan. Then, whenever PostgreSQL hits this custom scan node in
its Volcano-style execution, it will call our callback functions, which run the
distributed plan and provide tuples to the upper node as if it were scanning a
regular relation. This means fewer code changes, fewer bugs and more supported
features for us!
First, in the distributed query planner phase, we create a Custom Scan which
wraps the distributed plan. For the real-time and task-tracker executors, we add
this custom plan under the master query plan. For the router executor, we directly
pass the custom plan because there is no master query. Then, we simply let
the PostgreSQL executor run this plan. When it hits the custom scan node, we
call the related executor parts for the distributed plan, fill the tuple store in
the custom scan and return results to the PostgreSQL executor in Volcano style,
one tuple per XXX_ExecScan() call.
* Modify planner to utilize Custom Scan node.
* Create different scan methods for different executors.
* Use native PostgreSQL Explain for master part of queries.
This UDF returns a shard placement from the cache given a shard id and placement
id. At the moment it iterates over all shard placements of the given shard via
ShardPlacementList and searches for the given placement id in that list, which is
not a good solution performance-wise. However, this function is currently used
only when there is a failed transaction. If the need arises we can optimize it
in the future.
So far the router planner had encapsulated different functionality in
MultiRouterPlanCreate. Modifications always go through the router,
SELECTs only sometimes. Modifications always error out if the query is
unsupported; SELECTs return NULL. The error handling in particular is a
problem for the upcoming extension of prepared statement support.
Split MultiRouterPlanCreate into CreateRouterPlan and
CreateModifyPlan, and change them to not throw errors.
Instead, errors are now reported by setting the new
MultiPlan->planningError.
Callers of router planner functionality now have to throw errors
themselves if desired, but can also skip doing so.
This is a pre-requisite for expanding prepared statement support.
While touching all those lines, improve a number of error messages by
getting them closer to the postgres error message guidelines.
It can be useful, e.g. in the upcoming prepared statement support, to
be able to return an error from a function that is not raised
immediately, but can be thrown later. That allows us, e.g., to attempt to
plan a statement using different methods and to create good error
messages in each planner, but to only error out after all planners
have been run.
To enable that, create support for deferred error messages that can be
created (supporting errorcode, message, detail, hint) in one function
and then thrown in a different place.
This change adds support for serial columns to be used with MX tables.
Prior to this change, sequences of serial columns were created in all
workers (for being able to create shards) but never used. With MX, we
need to set the sequences so that sequences in each worker create
unique values. This is done by setting the MINVALUE, MAXVALUE and
START values of the sequence.
With this change, we start to delete the placements of reference tables on the given
worker node after a master_remove_node UDF call. We remove the placement metadata on
the master node but we do not drop the actual shard from the worker node. There are
two reasons for that decision: first, it is not critical to DROP the shards on the
workers because Citus will ignore them as long as the node is removed from the
cluster, and if we add that node back to the cluster we will DROP and recreate all
reference tables. Second, if the node is unreachable, it becomes complicated to
cover failure cases and provide transaction support.
Enables using views within distributed queries.
Users can create and use a view on distributed tables/queries
just as they would with regular queries.
After this change router queries have full support for views;
INSERT INTO ... SELECT queries support reading from views, but not
writing into them. Outer joins have limited support and
error out in certain cases, such as when a view is on the inner side
of the outer join.
Although PostgreSQL supports writing into views under certain circumstances,
we disallow that for distributed views.
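A small illustration with placeholder names (events_archive is assumed to be a
compatible distributed table):
```
CREATE VIEW recent_events AS
    SELECT user_id, event_time
    FROM events
    WHERE event_time > now() - interval '7 days';

-- views can now be read from in distributed queries
SELECT count(*) FROM recent_events;
-- and INSERT INTO ... SELECT can read from a view
INSERT INTO events_archive SELECT user_id, event_time FROM recent_events;
```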
So far we've reloaded them frequently. Besides avoiding that cost -
noticeable for some workloads with large shard counts - it makes it
easier to add information to ShardPlacements that helps us make
placement_connection.c colocation aware.
Doing so requires adding a mapping from shardId to the cache
entries. For that metadata_cache.c now maintains an additional
hashtable. That hashtable only references shard intervals in the
dist table cache.
Previously the function was getting too large. Thus this splits the
function into separate parts for looking up the cache entry and
building the cache contents.
With this change, we start to replicate all reference tables to the new node when a
new node is added to the cluster with the master_add_node command. We also update
the replication factor of the reference tables' colocation group.
With this change we introduce a new UDF, upgrade_to_reference_table, which can be
used to upgrade existing broadcast tables to reference tables. For upgrading, we
require that the given table contains only one shard.
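Hypothetical usage, assuming nation is an existing single-shard broadcast table:
```
SELECT upgrade_to_reference_table('nation');
```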
Renamed FindShardIntervalIndex() to ShardIndex() and added binary search
capability. It used to assume that hash-partitioned tables are always
uniformly distributed, which is not true if the upcoming tenant isolation
feature is applied. This commit also reduces code duplication.
* Add get_distribution_value_shardid UDF
With this UDF users can now map a given distribution value to a shard id. We mostly
hide shard ids from users to prevent unnecessary complexity, but some power users
might need to know which entry/value is stored in which shard for maintenance
purposes.
The signature of this UDF is as follows:
bigint get_distribution_value_shardid(table_name regclass, distribution_value anyelement)
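Using the signature above, a hypothetical call might look like this (placeholder
table; the value must match the distribution column's type):
```
-- find the shard that stores distribution value 42 of the placeholder table
SELECT get_distribution_value_shardid('github_events', 42);
```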
With this commit, we implemented some basic features of reference tables.
To start with, a reference table is
* a distributed table without a distribution column defined on it,
* single sharded,
* with its single shard replicated to all nodes.
Reference tables follow the same code path as single-sharded
tables. Thus, broadcast JOINs are applicable to reference tables.
But, since the table is replicated to all nodes, table fetching is
not required any more.
Reference tables support uniqueness constraints on any column.
Reference tables can be used in INSERT INTO .. SELECT queries with
the following rules:
* If a reference table is in the SELECT part of the query, it is
safe to join it with another reference table and/or hash-partitioned
tables.
* If a reference table is in the INSERT part of the query, all
other participating tables should be reference tables.
Reference tables follow the regular co-location structure. Since
all reference tables are single sharded and replicated to all nodes,
they are always co-located with each other.
Queries involving only reference tables always follow the router planner
and executor.
Reference tables can have composite typed columns and there is no need
to create/define the necessary support functions.
All modification queries, master_* UDFs, EXPLAIN, DDLs, TRUNCATE,
sequences, transactions, COPY and schema support work on reference
tables as expected. Plus, all the prerequisites associated with
distribution columns are waived.
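A minimal sketch, assuming the creation UDF is create_reference_table() and all
table names are placeholders:
```
CREATE TABLE countries (code char(2) PRIMARY KEY, name text);
SELECT create_reference_table('countries');

-- a reference table can be joined with a hash-partitioned table on any column
SELECT e.user_id, c.name
FROM events e
JOIN countries c ON (e.country_code = c.code);
```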
This includes basic infrastructure for logging of commands sent to
remote/worker nodes. Note that this has no effect as of yet, since no
callers are converted to the new infrastructure.
Connections are tracked and released by integrating into postgres'
transaction handling. That allows connections to be used without having
to disable interrupts or use PG_TRY/CATCH blocks
to avoid leaking connections.
This is intended to eventually replace multi_client_executor.c and
connection_cache.c, and to provide the basis of a centralized
transaction management.
The newly introduced transaction hook should, in the future, be the only
one in citus, to allow for proper ordering between operations. For now
this central handler is responsible for releasing connections and
resetting XactModificationLevel after a transaction.
In ErrorIfShardPlacementsNotColocated(), while checking if shards are colocated,
error out if matching shard intervals have a different number of shard placements.
Added a new UDF, mark_tables_colocated(), to colocate tables with the same
configuration (shard count, shard replication count and distribution column type).
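Hypothetical usage, assuming the UDF takes a source table and an array of target
tables that share the same configuration (all table names are placeholders):
```
SELECT mark_tables_colocated('events_2016', ARRAY['events_2017', 'events_2018']);
```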
This forces prepared statements to be re-planned after changes of the
placement metadata. There are some locking issues remaining, but that's
a separate task.
Also add regression tests verifying that invalidations take effect on
prepared statements.
This commit adds INSERT INTO ... SELECT feature for distributed tables.
We implement INSERT INTO ... SELECT by pushing down the SELECT to
each shard. To compute that we use the router planner, by adding
an "uninstantiated" constraint that the partition column be equal to a
certain value. standard_planner() distributes that constraint to all
the tables where it knows how to push the restriction safely. An example
is tables that are connected via equi-joins.
The router planner then iterates over the target table's shards;
for each one, we replace the "uninstantiated" restriction with one that
PruneShardList() handles. We do so by replacing the partitioning qual
parameter added in multi_planner() with the current shard's
actual boundary values. We also add the current shard's boundary values to the
top-level subquery to ensure that even if the partitioning qual is
not distributed to all the tables, we never run the queries on the shards
that don't match the current shard boundaries. Finally, we perform the
normal shard pruning to decide whether or not to push the query to the
current shard.
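As an illustration (placeholder tables, both assumed to be distributed and
colocated on user_id), a query of this shape can be pushed down and executed
shard-by-shard:
```
INSERT INTO daily_rollup (user_id, day, event_count)
SELECT user_id, date_trunc('day', event_time), count(*)
FROM events
GROUP BY user_id, date_trunc('day', event_time);
```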
We do not support certain SQL constructs in the subquery; these are
described/commented in ErrorIfInsertSelectQueryNotSupported().
We also added some locking on the router executor. When an INSERT/SELECT command
runs on a distributed table with replication factor >1, we need to ensure that
it sees the same result on each placement of a shard. So we made the router
executor take exclusive locks on the shards from which the SELECT in an
INSERT/SELECT reads, in order to prevent concurrent changes. This is not a
very optimal solution, but it's simple and correct. The
citus.all_modifications_commutative setting can be used to avoid this aggressive locking.
An INSERT/SELECT whose filters are known to exclude any ongoing writes can be
marked as commutative. See RequiresConsistentSnapshot() for the details.
We also moved the decision of whether the multiPlan should be executed on
the router executor or not to the planning phase. This allowed us to
integrate multi-task router executor tasks into the router executor smoothly.
Between restart (running the new code) and ALTER EXTENSION citus
UPGRADE there was an inconsistency where we assumed that
pg_dist_partition had the repmodel column set. Now we give it a default
value if the column doesn't exist yet.
With this change, we now push down foreign key constraints created during CREATE TABLE
statements. We also start to send foreign key constraints during shard moves
along with other DDL statements.
Adds support for PostgreSQL 9.6 by copying in the requisite ruleutils
file and refactoring the out/readfuncs code to flexibly support the
old-style copy/pasted out/readfuncs (prior to 9.6) or use extensible
node APIs (in 9.6 and higher).
Most version-specific code within this change is only needed to set new
fields in the AggRef nodes we build for aggregations. Version-specific
test output files were added in certain cases, though in most they were
not necessary. Each such file begins by e.g. printing the major version
in order to clarify its purpose.
The comment atop citus_nodes.h details how to add support for new nodes
for when that becomes necessary.
This commit completes HAVING support in Citus by adding HAVING support for the
real-time and task-tracker executors. Multiple tests are added to the regression
tests to cover newly supported queries with HAVING.
This change adds the required infrastructure about metadata snapshot from MX
codebase into Citus, mainly metadata_sync.c file and master_metadata_snapshot UDF.
So far placements were assigned an Oid, but that was just used to track
insertion order. It also did so incompletely, as it was not preserved
across changes of the shard state. The behaviour around oid wraparound
was also not entirely as intended.
The newly introduced, explicitly assigned, IDs are preserved across
shard-state changes.
The prime goal of this change is not to improve ordering of task
assignment policies, but to make it easier to reference shards. The
newly introduced UpdateShardPlacementState() makes use of that, and so
will the in-progress connection and transaction management changes.
Related to #786
This change adds the `pg_dist_node` table that contains the information
about the workers in the cluster, replacing the previously used
`pg_worker_list.conf` file (or the one specified with `citus.worker_list_file`).
Upon update, the `pg_worker_list.conf` file is read and the `pg_dist_node` table is
populated with the file's content. After that, the `pg_worker_list.conf` file
is renamed to `pg_worker_list.conf.obsolete`.
For adding and removing nodes, the change also includes two new UDFs:
`master_add_node` and `master_remove_node`, which require superuser
permissions.
The 'citus.worker_list_file' GUC is kept for update purposes but is not used after
the update is finished.
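Hypothetical usage of the new UDFs (host name and port are placeholders):
```
SELECT master_add_node('worker-1', 5432);
SELECT master_remove_node('worker-1', 5432);
SELECT * FROM pg_dist_node;
```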
An interaction between ReraiseRemoteError and DML transaction support
causes segfaults:
* ReraiseRemoteError calls PurgeConnection, freeing a connection...
* That connection is still in the xactParticipantHash
At transaction end, the memory in the freed connection might happen to
pass the "is this connection OK?" check, causing us to try to send an
ABORT over that connection. By removing it from the transaction hash
before calling ReraiseRemoteError, we avoid this possibility.
Three changes here to get to true multi-statement, multi-relation DDL
transactions (same functionality pre-5.2, with benefits of atomicity):
1. Changed the multi-shard utility hook to always run (consistency
with router executor hook, removes ad-hoc "installed" boolean)
2. Change the global connection list in multi_shard_transaction to
instead be a hash; update related functions to operate on global
hash instead of local hash/global list
3. Remove check within DDL code to prevent subsequent DDL commands;
place unset/reset guard around call to ConnectToNode to permit
connecting to additional nodes after DDL transaction has begun
In addition, code has been added to raise an error if a ROLLBACK TO
SAVEPOINT is attempted (similar to router executor), and comprehensive
tests execute all multi-DDL scenarios (full success, user ROLLBACK, any
actual errors (say, duplicate index), partial failure (duplicate index
on one node but not others), partial COMMIT (one node fails), and 2PC
partial PREPARE (one node fails)). Interleavings with other commands
(DML, \copy) are similarly all covered.
To permit use with ZomboDB (https://github.com/zombodb/zombodb), two
changes were necessary:
1. Permit use of `tableoid` system column in queries
2. Extend relation names appearing in index expressions
The first is accomplished by simply changing the deparse logic to allow
system columns in queries destined for distributed tables. The latter
was slightly more complex, given that DDL extension currently occurs on
workers. But since indexes cannot reference tables other than the one
being indexed, it is safe to look for any relation reference ending in
a '*' character and extend their penultimate segments with a shard id.
This change also adds an error to prevent users from distributing any
relations using the WITH (OIDS) feature, which is unsupported.
Recent changes to DDL and transaction logic resulted in a "regression"
from the viewpoint of users. Previously, DDL commands were allowed in
multi-command transaction blocks, though they were not processed in any
actual transactional manner. We improved the atomicity of our DDL code,
but added a restriction that DDL commands themselves must not occur in
any BEGIN/END transaction block.
To give users back the original functionality (and improved atomicity)
we now keep track of whether a multi-command transaction has modified
data (DML) or schema (DDL). Interleaving the two modification types in
a single transaction is disallowed.
This first step simply permits a single DDL command in such a block,
admittedly an incomplete solution, but one which will permit us to add
full multi-DDL command support in a subsequent commit.
Text datums can't be directly accessed via the struct equivalence trick
used to access catalogs. That's because, as an optimization, they're
sometimes aligned to 1 byte ("text"'s alignment), and sometimes to 4
bytes. That depends on it being a short
varlena (cf. VARATT_NOT_PAD_BYTE) or not.
In the case at hand here, partkey became longer than 127 characters -
the boundary for short varlenas (cf. VARATT_CAN_MAKE_SHORT()). Thus it
became 4 byte/int aligned, which led to the direct struct access
accessing the wrong data.
The fix is simply to never access partkey that way - to enforce that,
hide partkey behind the usual ifdef.
Fixes: #674
This adds support for SERIAL/BIGSERIAL column types. Because we now can
evaluate functions on the master (during execution), adding this is a
matter of ensuring the table creation step works properly.
To accomplish this, I've added some logic to detect sequences owned by
a table (i.e. those related to its columns). Simply creating a sequence
and using it in a default value is insufficient; users who do so must
ensure the sequence is owned by the column using it.
Fortunately, this is exactly what SERIAL and BIGSERIAL do, which is the
use case we're targeting with this feature. While testing this, I found
that worker_apply_shard_ddl_command actually adds shard identifiers to
sequence names, though I found no places that use or test this path. I
removed that code so that sequence names are not mutated and will match
those used by a SERIAL default value expression.
Our use of the new-to-9.5 CREATE SEQUENCE IF NOT EXISTS syntax means we
are dropping support for 9.4 (which is being done regardless, but makes
this change simpler). I've removed 9.4 from the Travis build matrix.
Some edge cases are possible in ALTER SEQUENCE, COPY FROM (on workers),
and CREATE SEQUENCE OWNED BY. I've added errors for each so that users
understand when and why certain operations are prohibited.
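A minimal sketch of the supported pattern, with placeholder names and assuming a
hash-distributed table created via create_distributed_table():
```
CREATE TABLE items (id bigserial, key text, value jsonb);
SELECT create_distributed_table('items', 'key');
-- the id column is filled in by evaluating nextval() on the master
INSERT INTO items (key, value) VALUES ('a', '{}');
```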
Allows the use of modification commands (INSERT/UPDATE/DELETE) within
transaction blocks (delimited by BEGIN and ROLLBACK/COMMIT), so long as
all modifications hit a subset of nodes involved in the first such
command in the transaction. This does not circumvent the requirement that
each individual modification command must still target a single shard.
For instance, after sending BEGIN, a user might INSERT some rows to a
shard replicated on two nodes. Subsequent modifications can hit other
shards, so long as they are on one or both of these nodes.
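Illustration of the supported pattern (placeholder table; each statement still
targets a single shard):
```
BEGIN;
INSERT INTO orders (order_id, status) VALUES (1, 'new');
UPDATE orders SET status = 'shipped' WHERE order_id = 1;
-- a different shard is fine as long as it lives on the same nodes
INSERT INTO orders (order_id, status) VALUES (7, 'new');
COMMIT;
```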
SAVEPOINTs are supported, though if the user actually attempts to send
a ROLLBACK command that specifies a SAVEPOINT they will receive an
ERROR at the end of the topmost transaction.
Placements are only marked inactive if at least one replica succeeds
in a transaction where others fail. Non-atomic behavior is possible if
the shard targeted by the initial modification within a transaction has
a higher replication factor than another shard within the same block
and a node with the latter shard has a failure during the COMMIT phase.
Other methods of denoting transaction blocks (multi-statement commands
sent all at once and functions written in e.g. PL/pgSQL or other such
languages) are not presently supported; their treatment remains the
same as before.
Fixes #555
Before this change, we were resolving the HLL function and type Oids without
qualified names. Now we find the schema where the HLL objects are stored and
generate qualified names for each object.
A similar fix is also applied for the cstore_table_size function call.
Fixes #215 Fixes #267 Fixes #502 Fixes #556 Fixes #557 Fixes #560 Fixes #568 Fixes #623 Fixes #624
With this change we schema-prefix table names, operator names and composite types.
Fixes #513
This change modifies the DDL Propagation logic so that DDL queries
are propagated via 2-Phase Commit protocol. This way, failures during
the execution of distributed DDL commands will not leave the table in
an intermediate state and the pending prepared transactions can be
committed manually.
DDL commands are not allowed inside other transaction blocks or functions.
DDL commands are performed with 2PC regardless of the value of
`citus.multi_shard_commit_protocol` parameter.
The workflow of the successful case is this:
1. Open individual connections to all shard placements and send `BEGIN`
2. Send `SELECT worker_apply_shard_ddl_command(<shardId>, <DDL Command>)`
to all connections, one by one, in a serial manner.
3. Send `PREPARE TRANSACTION <transaction_id>` to all connections.
4. Send `COMMIT PREPARED <transaction_id>` to all connections.
Failure cases:
- If a worker problem occurs before sending of all DDL commands is finished, then
all changes are rolled back.
- If a worker problem occurs after all DDL commands are sent but before the
`PREPARE TRANSACTION` commands are finished, then all changes are rolled back.
However, if a worker node has failed, then the prepared transactions on that worker
should be rolled back manually.
- If a worker problem occurs while `COMMIT PREPARED` statements are being sent,
then the prepared transactions on the failed workers should be committed manually.
- If the master fails before the first `PREPARE TRANSACTION` is sent, then nothing is
changed on the workers.
- If the master fails while `PREPARE TRANSACTION` commands are being sent, then the
prepared transactions on the workers should be rolled back manually.
- If the master fails while `COMMIT PREPARED` or `ROLLBACK PREPARED` commands are being
sent, then the remaining prepared transactions on the workers should be handled manually.
This change also helps with #480, since failed DDL changes no longer mark
failed placements as inactive.
- Enables using VOLATILE functions (like nextval()) in INSERT queries
- Enables using STABLE functions (like now()) in targetLists and joinTrees
UPDATE and INSERT can now contain non-immutable functions. INSERT can contain any kind of
expression, while UPDATE can contain any STABLE function, so long as a Var is not passed
into the STABLE function, even indirectly. UPDATE TargetEntries can now also include Vars.
There's an exception: CASE/COALESCE statements may not contain mutable functions.
Function calls in master_modify_multiple_shards are also evaluated.
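For example, with placeholder tables and a placeholder sequence, statements of
these shapes are now accepted; the volatile/stable calls are evaluated on the
master before the statement is routed:
```
INSERT INTO events (id, user_id, created_at)
VALUES (nextval('events_id_seq'), 42, now());

UPDATE events SET updated_at = now() WHERE user_id = 42;
```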
The only way we re-raise an error is if the raiseError flag is true, so
we might as well purge the connection in that block rather than independently
checking errorLevel.
There's not a ton of documentation about what CONTEXT lines should look
like, but this seems like the most dominant pattern. Similarly, users
should expect lowercase, non-period strings.
Fixes #10
This change creates a new UDF: master_modify_multiple_shards
Parameters:
modify_query: A simple DELETE or UPDATE query as a string.
The UDF is similar to the existing master_apply_delete_command UDF.
Basically, given the modify query, it prunes the shard list, re-constructs
the query for each shard and sends the query to the placements.
Depending on the value of citus.multi_shard_commit_protocol, the commit
can be done in a one-phase or two-phase manner.
Limitations:
* It cannot be called inside a transaction block
* It can only be called with simple operator expressions (like a single-shard modify)
Sample Usage:
```
SELECT master_modify_multiple_shards(
'DELETE FROM customer_delete_protocol WHERE c_custkey > 500 AND c_custkey < 500');
```
This change renames the distributed transaction manager parameter from
citus.copy_transaction_manager to citus.multi_shard_commit_protocol.
The distributed transaction manager has been used only by COPY on
hash-partitioned tables, but it can be used by upcoming features, so we
needed to rename it so that its name does not contain a reference to COPY.
The change also includes renames like transaction_manager_options to
commit_protocol_options and TRANSACTION_MANAGER_1PC to COMMIT_PROTOCOL_1PC.
With this change, declaration of MultiShardCommitProtocol (was
CopyTransactionManager) is moved from multi_copy.c to multi_transaction.c.
Previously several commands, amongst them commands like
master_create_distributed_table(), were allowed for everyone. That's not
good: Even though citus currently requires superuser permissions, we
shouldn't allow non-superusers to perform actions as sensitive as making
a table distributed.
There are no checks on the worker_* functions, as these usually just punt
the action to the underlying postgres functionality, which then performs the
necessary checks.
So far we've always used libpq defaults when connecting to workers;
barring special environment variables being set, that will always be the
user that started the server. That's not desirable because it prevents
using users with fewer privileges.
Thus change the various APIs creating connections to workers to always
use usernames. That means:
1) MultiClientConnect() needs to, optionally, accept a username
2) GetOrEstablishConnection(), including the underlying cache, need to
use the current user as part of the connection cache key. That way
connections for separate users are distinct, and we always use one
with the correct authorization.
3) The task tracker needs to keep track of the username associated with
a task, so it can use it when establishing connections outside the
originating session.
This commit adds a fast shard pruning path for INSERTs on
hash-partitioned tables. The rationale behind this change is
that if there exists a sorted shard interval array, a single
index lookup on the array allows us to find the corresponding
shard interval. As mentioned above, we need a sorted
(wrt shardminvalue) shard interval array. Thus, this commit
updates shardIntervalArray to sortedShardIntervalArray in the
metadata cache. Then it uses the low-level API defined in
multi_copy to handle the fast shard pruning.
The performance impact of this change is more apparent as more
shards exist for a distributed table. The previous implementation
relied on a linear search through the shard intervals. However,
this commit relies on constant lookup time into the shard interval
array. Thus, the shard pruning becomes less dependent on the
shard count.
When we notice that pg_dist_partition is being invalidated we assume
that the citus extension is being dropped and drop state such as
extensionLoaded and the cached oids of all the metadata tables.
This frees the user from needing to reconnect after running DROP
EXTENSION, so we also no longer send a warning message.
Prior to this change, performing a SELECT query without a target
list caused the backend to crash.
Sample Query: SELECT FROM github_events; (without any * before FROM)
PostgreSQL:
```
--
(39599 rows)
```
Citus:
```
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
!>
```
The problem was an unnecessary Assert on the column list in
SetRangeTblExtraData (citus_nodefuncs.c).
Though Citus' Task struct has a shardId field, it doesn't have the same
semantics as the one previously used in pg_shard code. The analogous
field in the Citus Task is anchorShardId. I've also added an argument
check to the relevant locking function to catch future locking attempts
which pass an invalid argument.
All citusdb references in
- extension, binary names
- file headers
- all configuration name prefixes
- error/warning messages
- some functions names
- regression tests
are changed to be citus.
The postgres_fdw extension has an extern function with an identical
signature, which can cause problems when both extensions are loaded.
A simple rename can fix this for now (this is the only function with
such a conflict).