Written by Vladimir Ozerov
June 6, 2023
Most optimizations in analytical engines follow the same principle: answer the query while spending as few resources as possible, be it CPU, RAM, disk, or network. Dynamic filtering is an important optimization that creates an additional predicate on one side of a join at runtime, based on data from the other side. Because analytical queries often contain multiple Joins and scan large tables, this optimization can reduce the amount of data being scanned and significantly improve performance. This blog post discusses the core idea of dynamic filtering and how it is implemented in Trino, a massively parallel distributed SQL engine for big data.
Analytical queries often join a large fact table with one or more dimension tables. The following query returns sales for one month grouped by day:
Notice that there are no predicates on the fact table `store_sales`. Instead, we have a relatively selective predicate on a dimension table `date_dim` and then join it with the fact table using a surrogate key. If we execute this query as is, the engine will most likely have to scan the whole `store_sales` table, and most scanned data will be discarded because there are no matching records on the right side of the Join.
Can we do something more clever than a full scan? Yes, and this is when dynamic filtering comes into play.
The main idea is dead simple: try to deduce new predicates at runtime and use them to filter data in the query plan pipeline as early as possible. In most cases, dynamic filtering derives new predicates from the equijoin condition of a Join operator and tries to push them to upstream Scan operators. This allows the engine to find better access paths for certain scans.
The predicate from our query returns thirty records from the `date_dim` table on the right side of the join, one record for each day in June 1998:
We also have an equijoin condition `ss_sold_date_sk = d_date_sk`. If we somehow convert these thirty values into a predicate on the `ss_sold_date_sk` attribute, we can try using it to scan only part of the fact table.
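The idea can be sketched in a few lines of Python. This is only a toy model under stated assumptions, not how Trino implements it: the rows, the key values, the `d_year`, `d_moy`, and `ss_net_paid` columns, and the helper names `collect_dynamic_filter` and `scan_with_filter` are all chosen for illustration.

```python
# Toy rows; the surrogate key values are made up for illustration.
date_dim = [
    {"d_date_sk": 101, "d_year": 1998, "d_moy": 6},
    {"d_date_sk": 102, "d_year": 1998, "d_moy": 6},
    {"d_date_sk": 201, "d_year": 1998, "d_moy": 7},
]
store_sales = [
    {"ss_sold_date_sk": 101, "ss_net_paid": 10.0},
    {"ss_sold_date_sk": 201, "ss_net_paid": 20.0},
    {"ss_sold_date_sk": 102, "ss_net_paid": 30.0},
    {"ss_sold_date_sk": 999, "ss_net_paid": 40.0},
]


def collect_dynamic_filter(build_rows, key):
    """Collect the distinct join-key values seen on the build (right) side."""
    return {row[key] for row in build_rows}


def scan_with_filter(probe_rows, key, allowed_values):
    """Scan the probe (left) side, keeping only rows that can possibly match."""
    return [row for row in probe_rows if row[key] in allowed_values]


# Static predicate on the dimension table: June 1998.
june_1998 = [r for r in date_dim if r["d_year"] == 1998 and r["d_moy"] == 6]

# Runtime-derived predicate, equivalent to ss_sold_date_sk IN (101, 102).
dynamic_filter = collect_dynamic_filter(june_1998, "d_date_sk")
filtered_sales = scan_with_filter(store_sales, "ss_sold_date_sk", dynamic_filter)
```

Only two of the four fact rows survive the scan, and the join then sees exactly the rows it would have matched anyway.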
For instance, in OLTP systems, there could be a sorted index on the `ss_sold_date_sk` attribute. Many production-grade optimizers can use indexes for `IN` predicates (see our blog post about searchable arguments), so the table scan could be replaced with an index lookup.
In analytical systems, be it a data warehouse or a data lake, fact tables are often partitioned by some attribute. In this case, the additional predicate could allow for various optimizations, such as partition pruning.
While dynamic filtering is usually used for table scan optimization, it can also improve the performance of Join operators. For example, suppose that in a given engine, evaluating the predicate is cheaper than matching records in the Join (think of a hash join that does a hash table lookup for each record from the left side). In that case, we can apply the predicate before passing records to the Join operator. However, the benefit of this approach is usually less impressive than eliminating a full scan.
We consider two query processing phases — planning and execution.
During planning, Trino determines where to place the dynamic filtering operators in the plan. The source of dynamic filters is Join operators with equijoin conditions. For such operators, it is possible to collect the concrete values of one part of the equijoin condition on the right side and transfer them to the left. Not all equijoins can be used for dynamic filtering. For example, we cannot use dynamic filtering for `LEFT OUTER` and `FULL OUTER` joins because all records from the left side must be returned at least once, and an additional predicate may produce incorrect results. There are some additional restrictions in the Trino engine that we omit for brevity.
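To see why outer joins that preserve the left side are excluded, consider a small Python sketch. It is a toy model with a hypothetical `left_outer_join` helper over bare join keys, not Trino code.

```python
def left_outer_join(left_keys, right_keys):
    """Naive left outer join on equal keys; unmatched left keys pair with None."""
    result = []
    for left_key in left_keys:
        matches = [right_key for right_key in right_keys if right_key == left_key]
        if matches:
            result.extend((left_key, right_key) for right_key in matches)
        else:
            result.append((left_key, None))
    return result


left_keys = [1, 2, 3]
right_keys = [2, 3]

# Correct result: key 1 has no match but must still be returned.
correct = left_outer_join(left_keys, right_keys)

# Filtering the left side with values from the right side drops key 1 too early.
dynamic_filter = set(right_keys)
pruned_left = [key for key in left_keys if key in dynamic_filter]
wrong = left_outer_join(pruned_left, right_keys)
```

Here `correct` contains the row `(1, None)`, while `wrong` does not, so the filtered plan would silently lose a row.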
Example of a query with two Join operators eligible for dynamic filtering optimization:
Trino planner applies several transformations related to dynamic filtering: PredicatePushDown, RemoveUnsupportedDynamicFilters, and AddDynamicFilterSource. From the bird's eye view, these transformations do the following:
Query plan before optimization:
Query plan after optimization:
Dynamic filters could be either local or distributed. Trino uses local dynamic filters when both producer and consumer are in the same query fragment. This happens when both sides of the Join are partitioned on the respective equijoin attributes.
Otherwise, Trino uses distributed dynamic filters. In this case, the coordinator node first registers all the dynamic filters for the query. Then instances of the `DynamicFilterSource` operator on worker nodes create partial predicates as data passes through them. Worker nodes send the collected partial predicates to the coordinator, where the final predicate is created. Finally, the coordinator sends the combined predicate back to the workers, where it is used in table scan operators.
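The coordinator-side merge can be pictured as a union of the partial value sets. The sketch below assumes filters are plain Python sets and uses a hypothetical `merge_partial_filters` function; the real engine works with richer domain objects and a more involved protocol.

```python
def merge_partial_filters(partials):
    """Union the partial value sets reported by workers into one filter."""
    merged = set()
    for partial in partials:
        merged |= partial
    return merged


# Each worker saw a different slice of the right side of the Join.
# A worker that saw no rows contributes an empty set.
worker_partials = [{1, 3}, {3, 100}, set()]

final_filter = merge_partial_filters(worker_partials)
```

The final filter is only complete once every worker has reported, which is why a scan that wants to use it may have to wait.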
A filter is a set of values observed when processing rows from the right side of the Join. It could be concrete values, ranges, or both. See Domain and TupleDomain classes for more detail. For example, the filter could contain several concrete values, such as `a := (1, 3, 100)`. If there are too many distinct values, it could negatively impact both memory consumption and predicate evaluation speed. In this case, Trino can optionally compact distinct values into more coarse-grained ranges, e.g., `a := [1:100]`.
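A minimal sketch of such compaction, assuming a hypothetical threshold parameter `max_distinct_values` and a simple tagged-tuple representation rather than Trino's actual classes:

```python
def compact_filter(values, max_distinct_values):
    """Keep exact values while they are few; otherwise fall back to one range."""
    if len(values) <= max_distinct_values:
        return ("values", sorted(values))
    return ("range", (min(values), max(values)))


def filter_matches(compacted, value):
    """Evaluate a compacted filter against a single value."""
    kind, payload = compacted
    if kind == "values":
        return value in payload
    low, high = payload
    return low <= value <= high


exact = compact_filter({1, 3, 100}, max_distinct_values=10)
coarse = compact_filter({1, 3, 100}, max_distinct_values=2)
```

The range form is cheaper to store and evaluate but admits false positives: `filter_matches(coarse, 50)` is true even though 50 never appeared on the right side. That is safe, because the Join itself still performs the exact match; the filter merely prunes less.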
Table scan in Trino consists of two steps:
When Trino emits splits for processing, dynamic filters could help prune some splits that do not contain data. Suppose our table `fact` is a set of Apache Parquet files partitioned by the attribute `a` and our dynamic filter is `a := (1, 3)`. In that case, we can scan only files related to partitions `a=1` and `a=3` and ignore the rest. Dynamic filter construction may take some time as the right-hand side of the Join is processed. Whether we want to wait for the dynamic filter construction before starting the scan or it is better to start scanning right away is a trade-off. In Trino, you can control the maximum scan delay if needed.
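Split pruning by partition value might look like the following sketch. The split layout, file paths, and the `prune_splits` helper are assumptions made for illustration only.

```python
# Hypothetical splits of the `fact` table, one file per partition of `a`.
splits = [
    {"path": "fact/a=1/part-0.parquet", "partition": {"a": 1}},
    {"path": "fact/a=2/part-0.parquet", "partition": {"a": 2}},
    {"path": "fact/a=3/part-0.parquet", "partition": {"a": 3}},
    {"path": "fact/a=4/part-0.parquet", "partition": {"a": 4}},
]


def prune_splits(splits, column, allowed_values):
    """Drop splits whose partition value cannot match the dynamic filter."""
    kept = []
    for split in splits:
        partition = split["partition"]
        # If the table is not partitioned by this column, we cannot prune.
        if column not in partition or partition[column] in allowed_values:
            kept.append(split)
    return kept


kept_for_a = prune_splits(splits, "a", {1, 3})
kept_for_b = prune_splits(splits, "b", {1, 3})
```

A filter on `a` keeps two of the four splits, while a filter on a non-partition column such as `b` keeps all of them.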
In some cases, we cannot use dynamic filters for split pruning. For example, if our Apache Parquet table is partitioned by `a` and we constructed a dynamic filter on `b`, we cannot prune any split. Still, dynamic filters could help us scan individual splits more efficiently. For example, we can use dynamic filters to prune certain Apache Parquet row groups, comparing row group stats to filter values.
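Row group pruning can be sketched as a min/max overlap check. The statistics layout and the `row_group_may_match` helper below are hypothetical and only model the comparison, not the Parquet reader itself.

```python
def row_group_may_match(stats, filter_values):
    """A row group can be skipped if no filter value falls within [min, max]."""
    return any(stats["min"] <= value <= stats["max"] for value in filter_values)


# Hypothetical per-row-group statistics for column `b`.
row_groups = [
    {"id": 0, "b": {"min": 0, "max": 9}},
    {"id": 1, "b": {"min": 10, "max": 19}},
    {"id": 2, "b": {"min": 20, "max": 29}},
]

dynamic_filter_b = {3, 25}

row_groups_to_read = [
    group["id"]
    for group in row_groups
    if row_group_may_match(group["b"], dynamic_filter_b)
]
```

The middle row group is skipped because neither filter value falls inside its range, so its pages never need to be decoded.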
In Trino, you can enable and disable dynamic filtering optimization via the config property `enable-dynamic-filtering` or the session property `enable_dynamic_filtering`. Some connectors provide additional properties to delay split processing until the dynamic filter is ready. In the Hive connector, which we use to query files in data lakes, the respective config and session properties are `hive.dynamic-filtering.wait-timeout` and `<catalog_name>.dynamic_filtering_wait_timeout`.
Consider our original query executed against the TPC-DS schema, scale factor 1000. Our data resides in S3-compatible storage in Apache Parquet format, and the `store_sales` table is partitioned by the `ss_sold_date_sk` attribute.
Without dynamic filtering, we have to scan the whole fact table containing `2 879 987 999` records (~24 GB of compressed data), which takes 10 seconds on our test cluster.
With dynamic filtering, we create an additional predicate on the `ss_sold_date_sk` attribute, which helps us prune almost all partitions. In this case, we read only `26 090 827` records (~250 MB), which takes only 2.5 seconds. That is, dynamic filtering gave us a 4x improvement in speed and a roughly 100x reduction in the amount of scanned data. Impressive!
Dynamic filtering is a powerful optimization that improves performance by creating additional runtime filters on one side of the Join based on data from the other. If the engine finds a way to use the filter during a table scan, it reduces the amount of data being scanned and improves query performance. Trino is one of many analytical systems that support dynamic filtering for a broad set of queries. In future posts, we will discuss how join order planning is implemented in Trino. Stay tuned!
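As a quick sanity check of these ratios, the arithmetic can be reproduced from the figures quoted above. The sizes are assumed to be decimal gigabytes and megabytes.

```python
records_without = 2_879_987_999
records_with = 26_090_827
seconds_without = 10.0
seconds_with = 2.5

# Wall-clock speedup.
speedup = seconds_without / seconds_with

# Reduction in scanned records.
record_reduction = records_without / records_with

# Reduction in scanned bytes: ~24 GB versus ~250 MB (decimal units assumed).
size_reduction = (24 * 1000) / 250
```

The speedup is exactly 4x, while the scanned data shrinks by a factor of about 110 by record count and about 96 by size, consistent with the rounded 100x figure.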
We are always ready to help you with query engine design or Trino integration. Just let us know.