Distinct aggregation optimization in Apache Calcite and Trino

Written by

Vladimir Ozerov

February 22, 2023

Distinct aggregation optimization in Apache Calcite and Trino

Abstract

Aggregation is one of the most frequently encountered operations in analytics. In SQL, aggregations are performed using aggregate functions (e.g., `SUM`, `COUNT`) with the optional `GROUP BY` clause. An aggregation function could contain the `DISTINCT` keyword, which might be non-trivial to implement in the query engine. This blog post explains how Apache Calcite and Trino optimizers rewrite distinct aggregates so that the underlying query engine can process them.

Background

Aggregation operation usually consists of an optional partitioning (grouping) step defined by the `GROUP BY` clause followed by a set of aggregation functions applied to individual partitions. The partitioning step is omitted if there is no `GROUP BY` clause, implying that the whole data set belongs to a single partition.

The `DISTINCT` keyword ensures that the aggregate function is applied only to a unique set of attribute values within the given partition. To find the unique attribute values in a partition (i.e., deduplicate), we have to further split the partition into sub-partitions by the distinct attribute.

Consider the following SQL query:


SELECT a, COUNT(DISTINCT b) 
FROM t 
GROUP BY a

To execute this query, we partition the table data by `[a]`. Then for each resulting partition, we further partition it by `[b]`. This way, we get the unique values of `[b]` for each unique value of `[a]`.

The query may contain multiple `DISTINCT` keywords on different attributes, requiring multiple sub-partitioning steps.

Now that we understand the problem let us discuss how distinct aggregations are processed in some state-of-the-art query engines. We will use Apache Calcite and Trino as examples.

Single Attribute

We start with a simple case when the `DISTINCT` keyword is applied to a single attribute. That is, there could be one or more distinct aggregations, but each such aggregation deduplicates the same attribute.

The query below matches this requirement. There are three aggregations that use attributes `[b]` and `[c]`, but only attribute `[b]` is used in distinct aggregations.


SELECT a, COUNT(DISTINCT b) agg1, AVG(DISTINCT b) agg2, AVG(c) agg3
FROM t 
GROUP BY a

To execute the query, we partition data by `[a]` and then by `[b]`. So let's do that and write the following query:


SELECT a, b 
FROM t
GROUP BY a, b

This query contains the grouping operation that finds unique `[a, b]` pairs. The original query requires grouping by `[a]`, so let's apply another `GROUP BY`  statement to get the desired groups:


SELECT a 
FROM (
  SELECT a, b 
  FROM t
  GROUP BY a, b
)
GROUP BY a

Now we have the inner query that produces unique pairs of `[a, b]` as required by the distinct aggregations, and we have the outer query that produces unique values of `[a]` as required by the `GROUP BY` clause.

Now let us add the aggregations. For the original distinct aggregations, we add them to the outer query, removing the `DISTINCT` keyword. This is safe because we already deduplicated `[a, b]` pairs, and hence the given partition `[a]` will never observe the exact value of `[b]` more than once.


SELECT a, COUNT(b) agg1, AVG(b) agg2 
FROM (
  SELECT a, b 
  FROM t
  GROUP BY a, b
)
GROUP BY a

What about the non-distinct aggregation on an otherwise unrelated column `[c]`? Since column `[c]` does participate in grouping in neither inner, nor outer query, we must provide a pair of aggregations that produce the same result as the original one. This transformation is often called aggregate splitting. Every aggregation has its own split strategy. Below are examples of split strategies for several common aggregation functions.


SUM(outer)   => SUM(SUM(inner))
COUNT(outer) => SUM(COUNT(inner))
AVG(outer)   => SUM(SUM(inner)) / SUM(COUNT(inner))
MIN(outer)   => MIN(MIN(inner))
MAX(outer)   => MAX(MAX(inner))

It is worth noting that aggregate splitting is routinely used in distributed and parallel query engines when a dedicated pre-aggregation step is desired to minimize the amount of data transferred between nodes or threads. The same split strategies are applied there.

Let's split our non-distinct aggregation to get the final query:


SELECT a, COUNT(b) agg1, AVG(b) agg2, SUM(tmp1) / SUM(tmp2) agg3
FROM (
  SELECT a, b, SUM(c) tmp1, COUNT(c) tmp2
  FROM t
  GROUP BY a, b
)
GROUP BY a

Adding one more grouping, we replaced distinct aggregation functions with non-distinct alternatives. Such a rewrite allows query engine developers to avoid implementing distinct aggregations at the engine level. This rewrite is also essential for distributed query engines because the two-aggregate strategy appears to be efficient in minimizing the required data transfers between nodes.

Note that the rewrite is applicable even if aggregation's attribute is derived (e.g., a function call) because every derived attribute could be modeled through a derived table:


# Before
SELECT a, COUNT(DISTINCT (b + c))
FROM t
GROUP BY a

# After
SELECT a, COUNT(DISTINCT tmp)
FROM (
  SELECT a, b + c AS tmp
  FROM t
)
GROUP BY a

Oh, wait! What if a query contains distinct aggregations on several attributes? Looks like our rewrite rule doesn't work anymore, so we need something more complicated.

Multiple Attributes

Let us rewrite our query so that now the `DISTINCT` keyword is applied to different attributes. We also replace `AVG` with `COUNT` to simplify the example (as `AVG` requires more complex splitting).


SELECT a, COUNT(DISTINCT b) agg1, COUNT(DISTINCT c) agg2, COUNT(d) agg3
FROM t 
GROUP BY a

The core problem is that now once we partitioned data by `[a]`, we need to partition it by `[b]` and by `[c]` separately. That is, we need `[a]`, `[a, b]`, and `[a, c]` partitioning all as a part of the same operation. Luckily, there are several ways to do this. We now take a look at Apache Calcite and Trino implementations as they use very different strategies.

Apache Calcite

We have two distinct aggregations on two different attributes, but we only know how to rewrite distinct aggregations on a single attribute. So let us cut the target query into two independent queries so that each query contains distinct aggregations on a single attribute, leaving the rest of the query logic unchanged.


# Query 1
SELECT a, COUNT(DISTINCT b) agg1, COUNT(d) agg3 
FROM t 
GROUP BY a

# Query 2
SELECT a, COUNT(DISTINCT c) agg2, COUNT(d) agg3 
FROM t 
GROUP BY a

Let's get rid of the `DISTINCT` keyword in both queries using the rewrite strategy from the previous paragraph:


# Query 1
SELECT a, COUNT(b) agg1, COUNT(tmp) agg3
FROM (
  SELECT a, b, SUM(d) tmp
  FROM t
  GROUP BY a, b
) 
GROUP BY a

# Query 2
SELECT a, COUNT(c) agg2, COUNT(tmp) agg3
FROM (
  SELECT a, c, SUM(d) tmp
  FROM t
  GROUP BY a, c
) 
GROUP BY a

We note that each query will produce the same number of tuples with precisely the same values of `[a]` since we group by `[a]`. Therefore, we can join two queries using `[a]` as the join key to get the final result! Note that we omit the repetitive calculation of `agg3` from the second query since it is unnecessary.


WITH q1 AS (
  SELECT a, COUNT(b) agg1, COUNT(tmp) agg3
  FROM (
    SELECT a, b, SUM(d) tmp
    FROM t
    GROUP BY a, b
  ) 
  GROUP BY a
),
q2 AS (
  SELECT a, COUNT(c) agg2
  FROM (
    SELECT a, c
    FROM t
    GROUP BY a, c
  ) 
  GROUP BY a
)
SELECT q1.a, q1.agg1, q2.agg2, q1.agg3
FROM q1 INNER JOIN q2 ON q1.a = q2.a

Now as we know how to rewrite queries with distinct aggregations on one and two attributes, we can use induction to rewrite queries with more distinct aggregations on more attributes. For example, a query with distinct aggregations on three attributes would require two joins, etc.

This clever transformation allows us not to implement `DISTINCT` handling at the engine level. But is it efficient enough?

The main disadvantage is that we must re-execute the aggregate's input subquery multiple times. In our example, this is just the `Scan` operator. In complex queries (e.g., TPC-DS), theaggregate's input could contain many heavy operators, and re-executing them from scratch might be prohibitively expensive. This problem might be alleviated if the query engine implements the common subplan deduplication feature. In this case, the engine will execute the input subquery once and distribute the result to multiple downstream consumers. However, many query engines (e.g., Trino) do not have this feature.

The good thing about this transformation is that we apply the aggregation to the input data early, which minimizes the amount of data being transferred to the downstream operators. This is especially important in the distributed setting.

In Apache Calcite, the distinct aggregation rewrite is implemented in the AggregateExpandDistinctAggregatesRule rule.

Now let's move on to Trino, which processes distinct aggregations very differently.

Trino

What if you desperately want to support distinct aggregations on several attributes but do not want to re-execute the inner subquery multiple times, as Apache Calcite suggests? Indeed, use window functions!

Recall that in our sample query, we want to find the unique pairs of `[a, b]` and `[a, c]`, and then group them by `[a]`.


SELECT a, COUNT(DISTINCT b) agg1, COUNT(DISTINCT c) agg2, COUNT(d) agg3
FROM t 
GROUP BY a

We can add a marker attribute that will have the value `true` if the given pair `[a, b]` is observed for the first time and `false` otherwise. This can be modeled with the `row_number()` window function partitioned by `[a, b]` as follows:


SELECT 
  a, 
  b,
  row_number() OVER (PARTITION BY a, b) = 1 b_unique
FROM t 

Now that we marked the unique `[a, b]` tuples, we can remove the `DISTINCT` keyword from the aggregation function but add the `FILTER` keyword to filter out the non-unique `[a, b]` pairs when calculating the aggregate.


SELECT 
  a, 
  COUNT(b) FILTER (b_unique)
FROM (  
  SELECT 
    a, 
    b,
    row_number() OVER (PARTITION BY a, b) = 1 b_unique
  FROM t
)
GROUP BY a

Now we add a dedicated marker for each "distinct" attribute, which gives us the final query:


SELECT 
  a, 
  COUNT(b) FILTER (b_unique)
  COUNT(c) FILTER (c_unique)
FROM (  
  SELECT 
    a, 
    b,
    c,
    row_number() OVER (PARTITION BY a, b) = 1 b_unique
    row_number() OVER (PARTITION BY a, c) = 1 c_unique
  FROM t
)
GROUP BY a

This is a pseudo code to demonstrate the idea. In reality, Trino uses a dedicated `MarkDistinct` operator that calculates the uniqueness flags more optimally than a general-purpose `Window` operator with `row_number()` function. You may think of the `MarkDistinct` operator as a highly-specialized version of a `Window` operator.

This rewrite strategy avoids multiple re-executions of the same input, which is good. However, the `MarkDistinct` operator partitions data without doing grouping (similarly to the `Window` operator). This means the `MarkDistinct` operator produces as many rows as it consumes. Since Trino is a distributed engine, `MarkDistinct` also requires a re-shuffle of the input rows according to the partitioning scheme. For `N` distinct attributes, up to `N` sequential re-shuffles of the input data set would be required. To contrast, with the Apache Calcite approach, we scan the table `N` times (provided that we do not have a common subplan deduplication), but the downstream aggregates usually significantly reduce the number of rows being processed. Therefore, both approaches may outperform one another under different circumstances.

In Trino, the distinct aggregation rewrite is implemented in the MultipleDistinctAggregationToMarkDistinct rule.

Recap

Rewrite a query with distinct aggregations on a single attribute (pseudo code):


# Before
SELECT a, COUNT(DISTINCT b) agg1, COUNT(c) agg2
FROM t
GROUP BY a

# After (Apache Calcite, Trino)
SELECT a, COUNT(b) agg1, SUM(tmp) agg2
FROM (
  SELECT a, b, COUNT(c) tmp1
  FROM t
  GROUP BY a, b
)
GROUP BY a

Rewrite a query with distinct aggregations on several attributes (pseudo code):


# Before
SELECT a, COUNT(DISTINCT b) agg1, COUNT(DISTINCT c) agg2, COUNT(d) agg3
FROM t
GROUP BY a

# After (Apache Calcite)
WITH q1 AS (
  SELECT a, COUNT(b) agg1, COUNT(tmp) agg3
  FROM (
    SELECT a, b, SUM(d) tmp
    FROM t
    GROUP BY a, b
  ) 
  GROUP BY a
),
q2 AS (
  SELECT a, COUNT(c) agg2
  FROM (
    SELECT a, c
    FROM t
    GROUP BY a, c
  ) 
  GROUP BY a
)
SELECT q1.a, q1.agg1, q2.agg2, q1.agg3
FROM q1 INNER JOIN q2 ON q1.a = q2.a

# After (Trino)
SELECT 
  a, 
  COUNT(b) FILTER (b_unique) agg1,
  COUNT(c) FILTER (c_unique) agg2,
  COUNT(d) agg3
FROM (  
  SELECT 
    a, 
    b,
    c,
    row_number() OVER (PARTITION BY a, b) = 1 b_unique
    row_number() OVER (PARTITION BY a, c) = 1 c_unique
  FROM t
)
GROUP BY a

Summary

Aggregation is a common operation in analytical workloads. Aggregation functions may have the `DISTINCT` qualifier that further deduplicates input tuples within a group. The implementation of distinct aggregations is not trivial and requires additional re-partitioning of the input data.

When a query contains distinct aggregations on a single input attribute, the `DISTINCT` qualifier could be removed if we add an additional preliminary grouping that eliminates duplicates.

When a query contains distinct aggregations on several input attributes, several execution strategies exist. In Apache Calcite, queries are rewritten to one or more joins that combine the results of individual single-attribute distinct aggregations. In Trino, queries are rewritten to use the `MarkDistinct` window function that detects rows with unique attribute values before doing the aggregation.

The Apache Calcite approach requires multiple executions of the input subquery, but the downstream aggregates usually reduce the result set quickly. The Trino approach does not need to execute the input subquery several times, but the `MarkDistinct` operator requires multiple re-shuffles of the possibly large input result set. Both approaches work well in some cases, and demonstrate bad performance in others.

In future posts, we will discuss how state-of-the-art engines implement common subplan deduplication optimization.

We are always ready to help you with your query engine design. Just let us know.