Dynamic Filtering: a Critical Performance Optimization in Analytical Engines

Written by

Vladimir Ozerov

June 6, 2023

Most optimizations in analytical engines follow the same principle: answer the query while spending as few resources as possible, be it CPU, RAM, disk, or network. Dynamic filtering is an important optimization that creates, at runtime, an additional predicate on one side of a join based on data from the other side. Because analytical queries often contain multiple Joins and scan large tables, this optimization reduces the amount of data being scanned and can significantly improve performance. This blog post discusses the core idea of dynamic filtering and how it is implemented in Trino, a massively parallel distributed SQL engine for big data.

Motivation

Analytical queries often join a large fact table with one or more dimension tables. The following query returns sales for one month grouped by day:


SELECT d_date, SUM(ss_net_paid)
FROM store_sales JOIN date_dim 
  ON ss_sold_date_sk = d_date_sk
WHERE d_year = 1998 AND d_moy = 6
GROUP BY d_date

Notice that there are no predicates on the fact table `store_sales`. Instead, we have a relatively selective predicate on a dimension table `date_dim` and then join it with the fact table using a surrogate key. If we execute this query as is, the engine will most likely have to scan the whole `store_sales`, and most scanned data will be discarded as there will be no matching records on the right side of the Join.

Can we do something more clever than a full scan? Yes, and this is when dynamic filtering comes into play.

Dynamic Filtering

The main idea is simple: deduce new predicates at runtime and use them to filter data as early as possible in the query plan pipeline. In most cases, dynamic filtering derives new predicates from the equijoin condition of a Join operator and tries to push them down to upstream Scan operators. This allows the engine to find better access paths for certain scans.

The predicate from our query returns thirty records from the `date_dim` table on the right side of the join, one record for each day in June 1998:


d_year=1998, d_moy=6, d_date_sk=2415051 // 1 Jun 1998
d_year=1998, d_moy=6, d_date_sk=2415052 // 2 Jun 1998
...
d_year=1998, d_moy=6, d_date_sk=2415080 // 30 Jun 1998

We also have an equijoin condition `ss_sold_date_sk = d_date_sk`. If we somehow convert these thirty values into a predicate on the `ss_sold_date_sk` attribute, we can try using it to scan only part of the fact table.


ss_sold_date_sk IN (2415051, 2415052, ..., 2415080)

For instance, in OLTP systems, there could be a sorted index on the `ss_sold_date_sk` attribute. Many production-grade optimizers can use indexes for `IN` predicates (see our blog post about searchable arguments), so the table scan could be replaced with an index lookup.

In analytical systems, be it a data warehouse or a data lake, fact tables are often partitioned by some attribute. In this case, the additional predicate could allow for various optimizations, such as partition pruning.

Fig. 1 — Scan optimization with dynamic filtering
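
The derivation of the `IN` predicate can be illustrated with a minimal sketch. It is not how any particular engine implements the optimization: the in-memory tables, the sample rows, and the helper `build_dynamic_filter` are all hypothetical and exist only to show the data flow from the dimension side to the fact side.

```python
# Toy in-memory tables; the rows are illustrative only.
date_dim = [
    {"d_date_sk": 2415050, "d_year": 1998, "d_moy": 5},
    {"d_date_sk": 2415051, "d_year": 1998, "d_moy": 6},
    {"d_date_sk": 2415052, "d_year": 1998, "d_moy": 6},
    {"d_date_sk": 2415081, "d_year": 1998, "d_moy": 7},
]
store_sales = [
    {"ss_sold_date_sk": 2415050, "ss_net_paid": 10.0},
    {"ss_sold_date_sk": 2415051, "ss_net_paid": 20.0},
    {"ss_sold_date_sk": 2415052, "ss_net_paid": 30.0},
    {"ss_sold_date_sk": 2415081, "ss_net_paid": 40.0},
]


def build_dynamic_filter(rows, key):
    """Collect the distinct join-key values seen on the filtered (right) side."""
    return {row[key] for row in rows}


# Static predicate from the WHERE clause, applied to the dimension table.
june_1998 = [r for r in date_dim if r["d_year"] == 1998 and r["d_moy"] == 6]

# Runtime predicate: ss_sold_date_sk IN (<values collected above>).
dynamic_filter = build_dynamic_filter(june_1998, "d_date_sk")

# The fact-table scan can now skip rows that cannot possibly join.
filtered_sales = [r for r in store_sales if r["ss_sold_date_sk"] in dynamic_filter]
```

In a real engine the filter would be handed to the storage layer so that non-matching data is never read, rather than being applied row by row after the read as in this sketch.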

While dynamic filtering is usually used for table scan optimization, it can also improve the performance of Join operators. For example, suppose in the given engine, the computational complexity of predicate evaluation is lower than Join record matching (think of a hash join that does a hash table lookup for each record from the left side). In that case, we can apply the predicate before passing records to the Join operator. However, the benefit of this approach is usually less impressive than full scan elimination.

Fig. 2 — Scan and Join optimization with dynamic filtering
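
The Join-side benefit can be sketched as follows. This is a toy inner hash join, not an engine implementation; `hash_join`, its `prefilter` parameter, and the sample rows are hypothetical. The number of hash table lookups serves as a stand-in for probe cost, and the dynamic filter is assumed to be a cheap range check derived from the build side.

```python
def hash_join(probe_rows, build_rows, probe_key, build_key, prefilter=None):
    """Inner hash join that counts hash-table lookups on the probe side."""
    table = {}
    for row in build_rows:
        table.setdefault(row[build_key], []).append(row)

    lookups, joined = 0, []
    for row in probe_rows:
        # Cheap dynamic predicate evaluated before the more expensive probe.
        if prefilter is not None and not prefilter(row[probe_key]):
            continue
        lookups += 1
        for match in table.get(row[probe_key], []):
            joined.append({**row, **match})
    return joined, lookups


build = [{"id": 3}, {"id": 5}]
probe = [{"a": i} for i in range(10)]

# Dynamic filter expressed as the range [3:5] observed on the build side.
lo = min(r["id"] for r in build)
hi = max(r["id"] for r in build)

plain, plain_lookups = hash_join(probe, build, "a", "id")
filtered, filtered_lookups = hash_join(
    probe, build, "a", "id", prefilter=lambda v: lo <= v <= hi
)
```

Both calls produce the same join result; only the number of probe-side lookups differs.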

Implementation in Trino

We consider two query processing phases — planning and execution.

During planning, Trino determines where to place the dynamic filtering operators in the plan. The source of dynamic filters is Join operators with equijoin conditions. For such operators, it is possible to collect the concrete values of one part of the equijoin condition on the right side and transfer them to the left. Not all equijoins can be used for dynamic filtering. For example, we cannot use dynamic filtering for `LEFT OUTER` and `FULL OUTER` joins because all records from the left side must be returned at least once, and an additional predicate on the left side may produce incorrect results. There are some additional restrictions in the Trino engine that we omit for brevity.
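
A small worked example shows why outer joins are a problem. The sketch below models rows as bare join-key values and uses two hypothetical helpers, `inner_join` and `left_outer_join`; it only illustrates the semantics and says nothing about Trino's actual eligibility checks.

```python
def inner_join(left, right_keys):
    """Keep only left values that have a match on the right."""
    return [(v, v) for v in left if v in right_keys]


def left_outer_join(left, right_keys):
    """Keep every left value; unmatched ones are padded with None."""
    return [(v, v if v in right_keys else None) for v in left]


left = [1, 2, 3]
right_keys = {2}

# Dynamic filter built from the right side and applied to the left side.
dynamic_filter = right_keys
prefiltered_left = [v for v in left if v in dynamic_filter]

# INNER JOIN: the filter removes only rows that would be dropped anyway.
inner_ok = inner_join(prefiltered_left, right_keys) == inner_join(left, right_keys)

# LEFT OUTER JOIN: the filter wrongly removes the unmatched rows 1 and 3.
outer_expected = left_outer_join(left, right_keys)
outer_with_filter = left_outer_join(prefiltered_left, right_keys)
```

Here `outer_expected` contains three rows, while `outer_with_filter` contains only the matched one, so the filtered plan would return an incorrect result.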

Example of a query with two Join operators eligible for dynamic filtering optimization:


SELECT * FROM fact 
  INNER JOIN dim1 ON fact.a = dim1.id
  INNER JOIN dim2 ON fact.b = dim2.id

The Trino planner applies several transformations related to dynamic filtering: PredicatePushDown, RemoveUnsupportedDynamicFilters, and AddDynamicFilterSource. From a bird's-eye view, these transformations do the following:

  1. Traverse the plan top-down.
  2. When a Join operator is observed, Trino creates two additional operators: `DynamicFilterSource`, which constructs the dynamic predicate at runtime on the right side, and `Filter`, which is applied on the left side.
  3. The optimizer tries to push the newly created `Filter` down as far as possible, ideally placing it directly on top of the `TableScan` operator. During execution, Trino merges adjacent `Filter` and `TableScan` operators into a single operator, allowing for a more optimal scan strategy.

Query plan before optimization:

Fig. 3 — Query plan before dynamic filter placement

Query plan after optimization:

Fig. 4 — Query plan after dynamic filter placement
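
The placement steps can be approximated with a toy plan rewriter. This is a heavily simplified sketch and not Trino's planner code: the classes `TableScan`, `Join`, and `DynamicFilterSource` below are hypothetical stand-ins, columns are assumed to be qualified as `table.column`, only inner joins are modeled, and the pushed-down filter is recorded directly on the scan instead of going through a separate `Filter` node.

```python
from dataclasses import dataclass, field
from itertools import count


@dataclass
class TableScan:
    table: str
    dynamic_filters: list = field(default_factory=list)  # (column, filter id)


@dataclass
class DynamicFilterSource:
    filter_id: str
    child: object


@dataclass
class Join:
    left: object
    right: object
    left_col: str   # qualified name, e.g. "fact.a"
    right_col: str


def push_filter(node, column, filter_id):
    """Push a dynamic filter on `column` down to the scan that produces it."""
    if isinstance(node, TableScan):
        if column.split(".")[0] != node.table:
            return False
        node.dynamic_filters.append((column, filter_id))
        return True
    if isinstance(node, DynamicFilterSource):
        return push_filter(node.child, column, filter_id)
    if isinstance(node, Join):
        return (push_filter(node.left, column, filter_id)
                or push_filter(node.right, column, filter_id))
    return False


def add_dynamic_filters(node, ids=None):
    """Traverse the plan top-down and place a filter for every Join."""
    ids = ids if ids is not None else count()
    if isinstance(node, Join):
        filter_id = f"df_{next(ids)}"
        # Wrap the right side only if the filter found a scan to land on.
        if push_filter(node.left, node.left_col, filter_id):
            node.right = DynamicFilterSource(filter_id, node.right)
        add_dynamic_filters(node.left, ids)
        right = node.right
        if isinstance(right, DynamicFilterSource):
            right = right.child
        add_dynamic_filters(right, ids)
    return node


# SELECT * FROM fact JOIN dim1 ON fact.a = dim1.id JOIN dim2 ON fact.b = dim2.id
fact = TableScan("fact")
plan = Join(
    Join(fact, TableScan("dim1"), "fact.a", "dim1.id"),
    TableScan("dim2"), "fact.b", "dim2.id",
)
add_dynamic_filters(plan)
```

After the rewrite, the scan of `fact` carries one dynamic filter per Join, and the right input of each Join is wrapped in a `DynamicFilterSource` that would populate the corresponding filter at runtime.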

Dynamic filters can be either local or distributed. Trino uses local dynamic filters when both producer and consumer are in the same query fragment. This happens when both sides of the Join are partitioned on the respective equijoin attributes.

Otherwise, Trino uses distributed dynamic filters. In this case, the coordinator node first registers all the dynamic filters for the query. Then instances of the `DynamicFilterSource` operator on worker nodes create partial predicates as data passes through them. Worker nodes send the collected partial predicates to the coordinator, where the final predicate is created. Finally, the coordinator sends the combined predicate back to the workers, where it is used in table scan operators.

Fig. 5 — Coordinator and worker nodes interaction when constructing a distributed dynamic filter
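
The exchange between workers and the coordinator can be sketched as a simple collect-and-merge protocol. The `Coordinator` class and its `register` and `report` methods are hypothetical names for illustration, and the sketch assumes that partial predicates are plain sets of values and that the final predicate is their union. Networking, failures, and size limits are ignored.

```python
class Coordinator:
    def __init__(self):
        self.expected = {}  # filter id -> number of worker tasks that will report
        self.partials = {}  # filter id -> partial value sets received so far

    def register(self, filter_id, num_tasks):
        """Register a dynamic filter before the query starts executing."""
        self.expected[filter_id] = num_tasks
        self.partials[filter_id] = []

    def report(self, filter_id, partial):
        """Accept a partial filter; return the final one once all tasks reported."""
        self.partials[filter_id].append(set(partial))
        if len(self.partials[filter_id]) < self.expected[filter_id]:
            return None  # still waiting for other workers
        return set().union(*self.partials[filter_id])


coordinator = Coordinator()
coordinator.register("df_0", num_tasks=2)

# Each worker reports the join-key values it observed on the right side.
after_first = coordinator.report("df_0", {1, 3})
after_second = coordinator.report("df_0", {3, 100})
```

The final filter becomes available only after the last worker has reported, which is why a scan may have to decide between waiting for it and starting without it.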

A filter is a set of values observed when processing rows from the right side of the Join. It can consist of concrete values, ranges, or both. See the `Domain` and `TupleDomain` classes for more details. For example, the filter could contain several concrete values, such as `a := (1, 3, 100)`. If there are too many distinct values, it could negatively impact both memory consumption and predicate evaluation speed. In this case, Trino can optionally compact distinct values into more coarse-grained ranges, e.g., `a := [1:100]`.
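
The compaction trade-off can be shown with a minimal sketch. The functions `compact` and `matches` and the `max_distinct` threshold are hypothetical and do not mirror the `Domain` or `TupleDomain` API; the sketch assumes the simplest possible strategy of collapsing all values into a single min/max range.

```python
def compact(values, max_distinct):
    """Keep exact values while they are few; otherwise fall back to one range."""
    values = set(values)
    if len(values) <= max_distinct:
        return ("values", sorted(values))
    return ("range", (min(values), max(values)))


def matches(dynamic_filter, value):
    """Evaluate the filter against a single value from the left side."""
    kind, payload = dynamic_filter
    if kind == "values":
        return value in payload
    low, high = payload
    return low <= value <= high


exact = compact({1, 3, 100}, max_distinct=10)   # a := (1, 3, 100)
coarse = compact({1, 3, 100}, max_distinct=2)   # a := [1:100]
```

The range form is cheaper to store and evaluate, but it lets through values such as `50` that the exact form would reject. This does not affect correctness, because the Join still checks the equijoin condition; it only reduces how much data the filter eliminates.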

Table scan in Trino consists of two steps:

  1. The coordinator determines parts of the table, called splits, that could be scanned independently. For example, when the table is a set of Apache Parquet files in a data lake, a single split could be a file or a part of a file.
  2. The coordinator sends splits to workers where actual processing happens.

When Trino emits splits for processing, dynamic filters could help prune some splits that do not contain data. Suppose our table `fact` is a set of Apache Parquet files partitioned by the attribute `a` and our dynamic filter is `a := (1, 3)`. In that case, we can scan only files related to partitions `a=1` and `a=3` and ignore the rest. Dynamic filter construction may take some time as the right-hand side of the Join is processed. Whether we want to wait for the dynamic filter construction before starting the scan or it is better to start scanning right away is a trade-off. In Trino, you can control the maximum scan delay if needed.
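
Split pruning on a partition column can be sketched as follows. The split representation (a dictionary with a `path` and the `partition` values encoded in it) and the helper `prune_splits` are hypothetical; real connectors track this metadata differently.

```python
def prune_splits(splits, column, allowed_values):
    """Drop splits whose partition value cannot satisfy the dynamic filter."""
    kept = []
    for split in splits:
        partition = split["partition"]
        # Prune only when the filter column is a partition column of the split.
        if column in partition and partition[column] not in allowed_values:
            continue
        kept.append(split)
    return kept


splits = [
    {"path": "fact/a=1/part-0.parquet", "partition": {"a": 1}},
    {"path": "fact/a=2/part-0.parquet", "partition": {"a": 2}},
    {"path": "fact/a=3/part-0.parquet", "partition": {"a": 3}},
]

# Dynamic filter a := (1, 3) keeps only the matching partitions.
pruned_on_a = prune_splits(splits, "a", {1, 3})

# A filter on a non-partition column cannot prune any split.
pruned_on_b = prune_splits(splits, "b", {7})
```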

In some cases, we cannot use dynamic filters for split pruning. For example, if our Apache Parquet table is partitioned by `a` and we constructed a dynamic filter on `b`, we cannot prune any split. Still, dynamic filters could help us scan individual splits more efficiently. For example, we can use dynamic filters to prune certain Apache Parquet row groups, comparing row group stats to filter values.

Fig. 6 — Partition and row group pruning in Apache Parquet files
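
Row group pruning relies on the minimum and maximum statistics stored per row group. The following sketch assumes each row group exposes `min` and `max` values for the filtered column; the dictionary layout and the helper `prune_row_groups` are hypothetical and do not use any real Parquet library.

```python
def prune_row_groups(row_groups, filter_values):
    """Keep row groups whose [min, max] range may contain a filter value."""
    return [
        rg for rg in row_groups
        if any(rg["min"] <= v <= rg["max"] for v in filter_values)
    ]


# Statistics for column b in three row groups of a single file.
row_groups = [
    {"id": 0, "min": 1, "max": 10},
    {"id": 1, "min": 11, "max": 20},
    {"id": 2, "min": 21, "max": 30},
]

# Dynamic filter b := (5, 25) rules out the middle row group.
surviving = prune_row_groups(row_groups, {5, 25})
```

A surviving row group is not guaranteed to contain matching rows, since the statistics describe only a range. The check can only prove that a row group has no matches.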

In Trino, you can enable and disable dynamic filtering optimization via the config property `enable-dynamic-filtering` or the session property `enable_dynamic_filtering`. Some connectors provide additional properties to delay split processing until the dynamic filter is ready. In the Hive connector, which we use to query files in data lakes, the respective config and session properties are `hive.dynamic-filtering.wait-timeout` and `<catalog_name>.dynamic_filtering_wait_timeout`.

Example

Consider our original query executed against the TPC-DS schema, scale factor 1000. Our data resides in S3-compatible storage in Apache Parquet format, and the `store_sales` table is partitioned by the `ss_sold_date_sk` attribute.


SELECT d_date, SUM(ss_net_paid)
FROM store_sales JOIN date_dim 
  ON ss_sold_date_sk = d_date_sk
WHERE d_year = 1998 AND d_moy = 6
GROUP BY d_date

Without dynamic filtering, we have to scan the whole fact table containing `2 879 987 999` records (~24 GB of compressed data), which takes 10 seconds on our test cluster.

With dynamic filtering, we create an additional predicate on the `ss_sold_date_sk` attribute, which helps us prune almost all partitions. In this case, we read only `26 090 827` records (~250 MB), which takes only 2.5 seconds. That is, dynamic filtering gave us a 4x improvement in speed and a roughly 100x reduction in the amount of scanned data. Impressive!

Fig. 7 — Performance results with and without dynamic filtering
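
The improvement factors follow directly from the numbers reported above. The short calculation below assumes the data sizes are approximately 24 gigabytes and 250 megabytes in decimal units; the variable names are ours.

```python
records_full = 2_879_987_999      # records scanned without dynamic filtering
records_filtered = 26_090_827     # records scanned with dynamic filtering

seconds_full = 10.0
seconds_filtered = 2.5

megabytes_full = 24_000           # ~24 GB, assuming decimal units
megabytes_filtered = 250

speedup = seconds_full / seconds_filtered            # runtime improvement
record_ratio = records_full / records_filtered       # fewer records read
data_ratio = megabytes_full / megabytes_filtered     # fewer bytes read
```

The runtime ratio is exactly 4, while the record and data ratios come out at roughly 110 and 96 respectively, which the text rounds to 100x.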

Summary

Dynamic filtering is a powerful optimization that improves performance by creating additional runtime filters on one side of the Join based on data from the other. If the engine finds a way to use the filter during the table scan, it reduces the amount of data being scanned and improves query performance. Trino is one of many analytical systems that support dynamic filtering for a broad set of queries. In future posts, we will discuss how join order planning is implemented in Trino. Stay tuned!

We are always ready to help you with query engine design or Trino integration. Just let us know.