Before this commit, in certain cases router planner allowed pushing
down JOINs that are not on the partition keys.
With @anarazel's suggestion, we change the logic to use uninstantiated
parameter. Previously, the planner was traversing on the restriction
information and once it finds the parameter, it was replacing it with
the shard range. With this commit, instead of traversing the restrict
infos, the planner explicitly checks for the equivalence of the relation
partition key with the uninstantiated parameter. If finds an equivalence,
it adds the restrictions. In this way, we have more control over the
queries that are pushed down.
With this commit, we add the range table list of the original query to our
custom plan. Therefore, PostgreSQL can check relations in the original query
for access permissions and error out if the proper access is not granted.
Custom Scan is a node in the planned statement which helps external providers
to abstract data scan not just for foreign data wrappers but also for regular
relations so you can benefit your version of caching or hardware optimizations.
This sounds like only an abstraction on the data scan layer, but we can use it
as an abstraction for our distributed queries. The only thing we need to do is
to find distributable parts of the query, plan for them and replace them with
a Citus Custom Scan. Then, whenever PostgreSQL hits this custom scan node in
its Vulcano style execution, it will call our callback functions which run
distributed plan and provides tuples to the upper node as it scans a regular
relation. This means fewer code changes, fewer bugs and more supported features
for us!
First, in the distributed query planner phase, we create a Custom Scan which
wraps the distributed plan. For real-time and task-tracker executors, we add
this custom plan under the master query plan. For router executor, we directly
pass the custom plan because there is not any master query. Then, we simply let
the PostgreSQL executor run this plan. When it hits the custom scan node, we
call the related executor parts for distributed plan, fill the tuple store in
the custom scan and return results to PostgreSQL executor in Vulcano style,
a tuple per XXX_ExecScan() call.
* Modify planner to utilize Custom Scan node.
* Create different scan methods for different executors.
* Use native PostgreSQL Explain for master part of queries.
During later work the transaction debug output will change (as it will
in postgres 10), which makes it hard to see actual changes in the
INSERT ... SELECT ... test. Reduce to DEBUG2 after changing a debug
message to that log level.
All router, real-time, task-tracker plannable queries should now have
full prepared statement support (and even use router when possible),
unless they don't go through the custom plan interface (which
basically just affects LANGUAGE SQL (not plpgsql) functions).
This is achieved by forcing postgres' planner to always choose a
custom plan, by assigning very low costs to plans with bound
parameters (i.e. ones were the postgres planner replanned the query
upon EXECUTE with all parameter values provided), instead of the
generic one.
This requires some trickery, because for custom plans to work the
costs for a non-custom plan have to be known, which means we can't
error out when planning the generic plan. Instead we have to return a
"faux" plan, that'd trigger an error message if executed. But due to
the custom plan logic that plan will likely (unless called by an SQL
function, or because we can't support that query for some reason) not
be executed; instead the custom plan will be chosen.
So far router planner had encapsulated different functionality in
MultiRouterPlanCreate. Modifications always go through router, selects
sometimes. Modifications always error out if the query is unsupported,
selects return NULL. Especially the error handling is a problem for
the upcoming extension of prepared statement support.
Split MultiRouterPlanCreate into CreateRouterPlan and
CreateModifyPlan, and change them to not throw errors.
Instead errors are now reported by setting the new
MultiPlan->plannigError.
Callers of router planner functionality now have to throw errors
themselves if desired, but also can skip doing so.
This is a pre-requisite for expanding prepared statement support.
While touching all those lines, improve a number of error messages by
getting them closer to the postgres error message guidelines.
The name CreatePhysicalPlan() hasn't been accurate for a while, and
the split of work between multi_planner() and CreatePhysicalPlan()
doesn't seem perfect. So rename to CreateDistributedPlan() and move a
bit more logic in there.
This adds a replication_model GUC which is used as the replication
model for any new distributed table that is not a reference table.
With this change, tables with replication factor 1 are no longer
implicitly MX tables.
The GUC is similarly respected during empty shard creation for e.g.
existing append-partitioned tables. If the model is set to streaming
while replication factor is greater than one, table and shard creation
routines will error until this invalid combination is corrected.
Changing this parameter requires superuser permissions.
We changed error message which appears when user tries to execute outer join command and
that command requires repartitioning. Old error message mentioned about 1-to-1 shard
partitioning which may not be clear to user.
Instead use pg_plan_query() like the normal explain does, and use that
to explain the query. That's important because it allows to remove
the duplicated planner logic from multi_explain - and that logic is
about to get more complicated.
They make fixing explain for prepared statement harder, and they don't
really fit into EXPLAIN in the first place. Additionally they're
currently not exercised in any tests.
This commit is intended to improve the error messages while planning
INSERT INTO .. SELECT queries. The main motivation for this change is
that we used to map multiple cases into a single message. With this change,
we added explicit error messages for many cases.
Enables use views within distributed queries.
User can create and use a view on distributed tables/queries
as he/she would use with regular queries.
After this change router queries will have full support for views,
insert into select queries will support reading from views, not
writing into. Outer joins would have a limited support, and would
error out at certain cases such as when a view is in the inner side
of the outer join.
Although PostgreSQL supports writing into views under certain circumstances.
We disallowed that for distributed views.
With this change we start to error out on router planner queries where a common table
expression with data-modifying statement is present. We already do not support if
there is a data-modifying statement using result of the CTE, now we also error out
if CTE itself is data-modifying statement.
With this commit, we implemented some basic features of reference tables.
To start with, a reference table is
* a distributed table whithout a distribution column defined on it
* the distributed table is single sharded
* and the shard is replicated to all nodes
Reference tables follows the same code-path with a single sharded
tables. Thus, broadcast JOINs are applicable to reference tables.
But, since the table is replicated to all nodes, table fetching is
not required any more.
Reference tables support the uniqueness constraints for any column.
Reference tables can be used in INSERT INTO .. SELECT queries with
the following rules:
* If a reference table is in the SELECT part of the query, it is
safe join with another reference table and/or hash partitioned
tables.
* If a reference table is in the INSERT part of the query, all
other participating tables should be reference tables.
Reference tables follow the regular co-location structure. Since
all reference tables are single sharded and replicated to all nodes,
they are always co-located with each other.
Queries involving only reference tables always follows router planner
and executor.
Reference tables can have composite typed columns and there is no need
to create/define the necessary support functions.
All modification queries, master_* UDFs, EXPLAIN, DDLs, TRUNCATE,
sequences, transactions, COPY, schema support works on reference
tables as expected. Plus, all the pre-requisites associated with
distribution columns are dismissed.
We used to disable router planner and executor
when task executor is set to task-tracker.
This change enables router planning and execution
at all times regardless of task execution mode.
We are introducing a hidden flag enable_router_execution
to enable/disable router execution. Its default value is
true. User may disable router planning by setting it to false.
This commit fixes a bug when the SELECT target list includes a constant
value.
Previous behaviour of target list re-ordering:
* Iterate over the INSERT target list
* If it includes a Var, find the corresponding SELECT entry
and update its resno accordingly
* If it does not include a Var (which we only considered to be
DEFAULTs), generate a new SELECT target entry
* If the processed target entry count in SELECT target list is less
than the original SELECT target list (GROUP BY elements not included in
the SELECT target entry), add them in the SELECT target list and
update the resnos accordingly.
* However, this step was leading to add the CONST SELECT target entries
twice. The reason is that when CONST target list entries appear in the
SELECT target list, the INSERT target list doesn't include a Var. Instead,
it includes CONST as it does for DEFAULTs.
New behaviour of target list re-ordering:
* Iterate over the INSERT target list
* If it includes a Var, find the corresponding SELECT entry
and update its resno accordingly
* If it does not include a Var (which we consider to be
DEFAULTs and CONSTs on the SELECT), generate a new SELECT
target entry
* If any target entry remains on the SELECT target list which are resjunk,
(GROUP BY elements not included in the SELECT target entry), keep them
in the SELECT target list by updating the resnos.
Fixcitusdata/citus#886
The way postgres' explain hook is designed means that our hook is never
called during EXPLAIN EXECUTE. So, we special-case EXPLAIN EXECUTE by
catching it in the utility hook. We then replace the EXECUTE with the
original query and pass it back to Citus.
This commit adds INSERT INTO ... SELECT feature for distributed tables.
We implement INSERT INTO ... SELECT by pushing down the SELECT to
each shard. To compute that we use the router planner, by adding
an "uninstantiated" constraint that the partition column be equal to a
certain value. standard_planner() distributes that constraint to all
the tables where it knows how to push the restriction safely. An example
is that the tables that are connected via equi joins.
The router planner then iterates over the target table's shards,
for each we replace the "uninstantiated" restriction, with one that
PruneShardList() handles. Do so by replacing the partitioning qual
parameter added in multi_planner() with the current shard's
actual boundary values. Also, add the current shard's boundary values to the
top level subquery to ensure that even if the partitioning qual is
not distributed to all the tables, we never run the queries on the shards
that don't match with the current shard boundaries. Finally, perform the
normal shard pruning to decide on whether to push the query to the
current shard or not.
We do not support certain SQLs on the subquery, which are described/commented
on ErrorIfInsertSelectQueryNotSupported().
We also added some locking on the router executor. When an INSERT/SELECT command
runs on a distributed table with replication factor >1, we need to ensure that
it sees the same result on each placement of a shard. So we added the ability
such that router executor takes exclusive locks on shards from which the SELECT
in an INSERT/SELECT reads in order to prevent concurrent changes. This is not a
very optimal solution, but it's simple and correct. The
citus.all_modifications_commutative can be used to avoid aggressive locking.
An INSERT/SELECT whose filters are known to exclude any ongoing writes can be
marked as commutative. See RequiresConsistentSnapshot() for the details.
We also moved the decison of whether the multiPlan should be executed on
the router executor or not to the planning phase. This allowed us to
integrate multi task router executor tasks to the router executor smoothly.
The necessity for this functionality comes from the fact that ruleutils.c is not supposed to be
used on "rewritten" queries (i.e. ones that have been passed through QueryRewrite()).
Query rewriting is the process in which views and such are expanded,
and, INSERT/UPDATE targetlists are reordered to match the physical order,
defaults etc. For the details of reordeing, see transformInsertRow().
Adds support for PostgreSQL 9.6 by copying in the requisite ruleutils
file and refactoring the out/readfuncs code to flexibly support the
old-style copy/pasted out/readfuncs (prior to 9.6) or use extensible
node APIs (in 9.6 and higher).
Most version-specific code within this change is only needed to set new
fields in the AggRef nodes we build for aggregations. Version-specific
test output files were added in certain cases, though in most they were
not necessary. Each such file begins by e.g. printing the major version
in order to clarify its purpose.
The comment atop citus_nodes.h details how to add support for new nodes
for when that becomes necessary.
This commit completes having support in Citus by adding having support for
real-time and task-tracker executors. Multiple tests are added to regression
tests to cover new supported queries with having support.
So far placements were assigned an Oid, but that was just used to track
insertion order. It also did so incompletely, as it was not preserved
across changes of the shard state. The behaviour around oid wraparound
was also not entirely as intended.
The newly introduced, explicitly assigned, IDs are preserved across
shard-state changes.
The prime goal of this change is not to improve ordering of task
assignment policies, but to make it easier to reference shards. The
newly introduced UpdateShardPlacementState() makes use of that, and so
will the in-progress connection and transaction management changes.
Related to #786
This change adds the `pg_dist_node` table that contains the information
about the workers in the cluster, replacing the previously used
`pg_worker_list.conf` file (or the one specified with `citus.worker_list_file`).
Upon update, `pg_worker_list.conf` file is read and `pg_dist_node` table is
populated with the file's content. After that, `pg_worker_list.conf` file
is renamed to `pg_worker_list.conf.obsolete`
For adding and removing nodes, the change also includes two new UDFs:
`master_add_node` and `master_remove_node`, which require superuser
permissions.
'citus.worker_list_file' guc is kept for update purposes but not used after the
update is finished.
count_agg_clause *adds* the cost of the aggregates to the state
variable, it doesn't reinitialize it. That is intentional, as it is used
to incrementally add costs in some places.
is now a `::regtype` using the qualified name of the column type,
not the column type OID which may differ between master/worker nodes.
Test coverage of a hash reparitition using a UDT as the join column.
Note that the UDFs `worker_hash_partition_table` and `worker_range_partition_table`
are unchanged, and rightly expect an OID for the column type; but the
planner code building the commands now allows for `::regtype` casting
to do its magic.
Fixescitusdata/citus#111.
This commit enables to create different worker and master temporary folders.
This change is important for citus-mx on task-tracker execution. In simple words,
on citus-mx, the worker could actually be reponsible for the master tasks as well.
Prior to this change, both master and worker logic on task-tracker executor was
accessing and using the same files for different purposes which was dangerous on
certain cases (i.e., when task_tracker_delay is low).
Before this change, count on a distributed returned NULL if all shards
were pruned away, because on the master we replace with count(..) call
with a sum(..) call to sum the counts from the shards. However, sum
returns NULL when there are no rows, whereas count is expected to return
0.