Custom traits in Apache Calcite

Written by

Vladimir Ozerov

December 29, 2020

Abstract

Physical properties are an essential part of the optimization process, allowing the optimizer to explore more alternative plans.

Apache Calcite comes with convention and collation (sort order) properties. Many query engines require custom properties. For example, distributed and heterogeneous engines that we often see in our daily practice need to carefully plan the movement of data between machines and devices, which requires a custom property to describe data location.

In this blog post, we will explore how to define, register, and enforce a custom property, also known as a trait, with the Apache Calcite cost-based optimizer.

Physical Properties

We start our journey by looking at an example of a common physical property: the sort order.

Query optimizers work with relational operators, such as `Scan`, `Project`, `Filter`, and `Join`. During optimization, an operator may require its input to satisfy a specific condition. To check whether the condition is satisfied, operators may expose physical properties: plain values associated with an operator. Operators may compare the desired and actual properties of their inputs and enforce a desired property by injecting a special enforcer operator on top of the input.

Consider the join operator `t1 JOIN t2 ON t1.a = t2.b`. We could use a merge join if both inputs are sorted on their join attributes, `t1.a` and `t2.b`, respectively. We may define the collation property for every operator, describing the sort order of produced rows:


Join[t1.a=t2.b]
  Input[t1]      [SORTED by a]
  Input[t2]      [NOT SORTED] 

The merge join operator may enforce the sorting on `t1.a` and `t2.b` on its inputs. Since the first input is already sorted on `t1.a`, it remains unchanged. The second input is not sorted, so the enforcer `Sort` operator is injected, making a merge join possible:


MergeJoin[t1.a=t2.b]  
  Input[t1]           [SORTED by t1.a]
  Sort[t2.b]          [SORTED by t2.b]
    Input[t2]         [NOT SORTED] 
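This compare-and-enforce logic can be sketched in a few lines of plain Java. This is a toy model for illustration only; `Node`, `enforceSorted`, and the string-based collation are hypothetical and not part of the Apache Calcite API:

```java
import java.util.Objects;

public final class Enforcement {

    // A minimal relational node: an operator name, its collation
    // property (null means "not sorted"), and an optional input.
    public record Node(String name, String collation, Node input) {
        @Override
        public String toString() {
            return input == null ? name : name + "(" + input + ")";
        }
    }

    // Compare the desired and actual collation; inject a Sort enforcer
    // on top of the input only when they differ.
    public static Node enforceSorted(Node input, String desiredCollation) {
        if (Objects.equals(input.collation(), desiredCollation)) {
            return input; // the property is already satisfied
        }
        return new Node("Sort[" + desiredCollation + "]", desiredCollation, input);
    }

    public static void main(String[] args) {
        Node t1 = new Node("Input[t1]", "t1.a", null); // sorted by t1.a
        Node t2 = new Node("Input[t2]", null, null);   // not sorted

        // A merge join requires both inputs sorted on their join keys.
        System.out.println(enforceSorted(t1, "t1.a")); // unchanged
        System.out.println(enforceSorted(t2, "t2.b")); // Sort injected
    }
}
```

Apache Calcite generalizes this idea: each trait definition knows how to build the enforcer for its trait, as we will see below.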

Apache Calcite API

In Apache Calcite, properties are defined by the `RelTrait` and `RelTraitDef` classes. `RelTrait` is a concrete value of the property. `RelTraitDef` is a property definition, which describes the property name, expected Java class of the property, the default value of the property, and how to enforce the property. Property definitions are registered in the planner via the `RelOptPlanner.addRelTraitDef` method. The planner will ensure that every operator has a specific value for every registered property definition, whether the default or not.

All properties of a node are organized in an immutable data structure `RelTraitSet`. This class has convenient methods to add and update properties with copying semantics. You may access the properties of a concrete operator using the `RelOptNode.getTraitSet` method.
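To illustrate the copying semantics, here is a minimal stand-in written in plain Java; `SimpleTraitSet` is a hypothetical class for illustration, not Calcite's `RelTraitSet`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy illustration of copy-on-write semantics: "plus" never mutates
// the receiver, it returns a new set with the trait added or replaced.
public final class SimpleTraitSet {
    // Maps a trait definition name (e.g. "DISTRIBUTION") to its value.
    private final Map<String, String> traits;

    public SimpleTraitSet(Map<String, String> traits) {
        this.traits = Map.copyOf(traits);
    }

    // Returns a new trait set with the given trait added or replaced;
    // the original set is left untouched.
    public SimpleTraitSet plus(String traitDef, String value) {
        Map<String, String> copy = new LinkedHashMap<>(traits);
        copy.put(traitDef, value);
        return new SimpleTraitSet(copy);
    }

    public String getTrait(String traitDef) {
        return traits.get(traitDef);
    }
}
```

Like this toy class, `RelTraitSet` is immutable, so rules can derive new trait sets without affecting the operators that still reference the old ones.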

To enforce a specific property on the operator during planning, you should do the following from within the rule:

  1. Get the current properties of a node using the `RelOptNode.getTraitSet` method.
  2. Create a new instance of `RelTraitSet` with updated properties.
  3. Enforce the properties by calling the `RelOptRule.convert` method.

Finally, before invoking the planner program, you may define the desired properties of the root operator of the optimized relational tree. After the optimization, the planner will either return the operator that satisfies these properties or throw an exception.

Internally, Apache Calcite enforces properties by adding a special `AbstractConverter` operator with the desired traits on top of the target operator.


AbstractConverter [SORTED by t2.b]
  Input[t2]       [NOT SORTED] 

To transform the `AbstractConverter` into a real enforcer node, such as `Sort`, you should add the built-in `ExpandConversionRule` rule to your optimization program. This rule will attempt to expand the `AbstractConverter` into a sequence of enforcers that satisfy the desired traits, consulting the trait definitions that we already discussed. We have only one unsatisfied property in our example, so the converter expands into a single `Sort` operator.


Sort[t2.b]        [SORTED by t2.b]
  Input[t2]       [NOT SORTED] 

You may use your own custom expansion rule if needed. See the Apache Flink custom rule as an example.

Custom Property

Now that we understand the purpose of properties and the relevant Apache Calcite APIs, let's define, register, and enforce our custom property.

Consider that we have a distributed database, where every relational operator might be distributed between nodes in one of two ways:

  1. `PARTITIONED` - relation is partitioned between nodes. Every tuple (row) resides on one of the nodes. An example is a typical distributed data structure.
  2. `SINGLETON` - relation is located on a single node. An example is a cursor that delivers the final result to the user application.

In our example, we would like to ensure that the top operator always has the `SINGLETON` distribution, simulating the delivery of results to a single node.

Enforcer

First, we define the enforcer operator. To ensure the `SINGLETON` distribution, we need to move data from all nodes to a single node. In distributed databases, data movement operators are often called `Exchange`. The minimal requirement for a custom operator in Apache Calcite is to define a constructor and the `copy` method.


public class ExchangeRel extends SingleRel {
    public ExchangeRel(
        RelOptCluster cluster,
        RelTraitSet traits,
        RelNode input
    ) {
        super(cluster, traits, input);
    }

    @Override
    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
        return new ExchangeRel(getCluster(), traitSet, inputs.get(0));
    }
}

Trait

Next, we define our custom trait and trait definition. Our implementation must adhere to the following rules:

  1. The trait must refer to a common trait definition instance in the method `getTraitDef`.
  2. The trait must override the `satisfies` method to define whether the current trait satisfies the target trait. If not, the enforcer will be used.
  3. The trait definition must declare the expected Java class of the trait in the `getTraitClass` method.
  4. The trait definition must declare the default value of the trait in the `getDefault` method.
  5. The trait definition must implement the method `convert`, which Apache Calcite will invoke to create the enforcer if the current trait doesn't satisfy the desired trait. If there is no valid conversion between traits, `null` should be returned.

Below is the source code of our trait. We define two concrete values, `PARTITIONED` and `SINGLETON`. We also define the special value `ANY`, which we use as the default. We say that both `PARTITIONED` and `SINGLETON` satisfy `ANY` but `PARTITIONED` and `SINGLETON` do not satisfy each other.


public class Distribution implements RelTrait {

    public static final Distribution ANY = new Distribution(Type.ANY);
    public static final Distribution PARTITIONED = new Distribution(Type.PARTITIONED);
    public static final Distribution SINGLETON = new Distribution(Type.SINGLETON);

    private final Type type;

    private Distribution(Type type) {
        this.type = type;
    }

    @Override
    public RelTraitDef getTraitDef() {
        return DistributionTraitDef.INSTANCE;
    }

    @Override
    public boolean satisfies(RelTrait toTrait) {
        Distribution toTrait0 = (Distribution) toTrait;

        if (toTrait0.type == Type.ANY) {
            return true;
        }

        return this.type.equals(toTrait0.type);
    }

    enum Type {
        ANY,
        PARTITIONED,
        SINGLETON
    }
}

Our trait definition implements the `convert` method, which injects the `ExchangeRel` enforcer if the current property doesn't satisfy the target one.


public class DistributionTraitDef extends RelTraitDef<Distribution> {

    public static final DistributionTraitDef INSTANCE = new DistributionTraitDef();

    private DistributionTraitDef() {
        // No-op.
    }

    @Override
    public Class<Distribution> getTraitClass() {
        return Distribution.class;
    }

    @Override
    public String getSimpleName() {
        return "DISTRIBUTION";
    }

    @Override
    public RelNode convert(
        RelOptPlanner planner,
        RelNode rel,
        Distribution toTrait,
        boolean allowInfiniteCostConverters
    ) {
        Distribution fromTrait = rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);

        if (fromTrait.satisfies(toTrait)) {
            return rel;
        }

        return new ExchangeRel(
            rel.getCluster(),
            rel.getTraitSet().plus(toTrait),
            rel
        );
    }

    @Override
    public boolean canConvert(
        RelOptPlanner planner,
        Distribution fromTrait,
        Distribution toTrait
    ) {
        return true;
    }

    @Override
    public Distribution getDefault() {
        return Distribution.ANY;
    }
}

Production implementations would likely have more distribution types, dedicated distribution columns, and different exchange types. You may refer to Apache Flink as an example of a real distribution trait.
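As a flavor of what such a richer trait might look like, here is a plain-Java sketch of a hash distribution that carries its key columns; `HashDistribution` is a hypothetical model for illustration and does not implement Calcite's `RelTrait`:

```java
import java.util.List;

// Sketch of a richer distribution trait: HASH_PARTITIONED carries the
// hash keys, so two partitioned distributions are compatible only if
// they hash by the same columns.
public final class HashDistribution {
    public enum Type { ANY, HASH_PARTITIONED, SINGLETON }

    public final Type type;
    public final List<String> keys; // hash columns; empty unless HASH_PARTITIONED

    public HashDistribution(Type type, List<String> keys) {
        this.type = type;
        this.keys = List.copyOf(keys);
    }

    // Any distribution satisfies ANY; otherwise both the type and the
    // hash keys must match.
    public boolean satisfies(HashDistribution target) {
        if (target.type == Type.ANY) {
            return true;
        }
        return type == target.type && keys.equals(target.keys);
    }
}
```

With such a trait, two inputs hashed by different columns would trigger a re-partitioning `Exchange`, just as `PARTITIONED` vs `SINGLETON` does in our simpler model.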

Putting It All Together

Let's see the new trait in action. The complete source code is available here.

First, we create a schema with two tables: one with the `PARTITIONED` distribution and another with the `SINGLETON` distribution. We use custom table and schema implementations, similar to the ones we used in the previous blog post.


// Table with PARTITIONED distribution.
Table table1 = Table.newBuilder("table1", Distribution.PARTITIONED)
  .addField("field", SqlTypeName.DECIMAL).build();

// Table with SINGLETON distribution.
Table table2 = Table.newBuilder("table2", Distribution.SINGLETON)
  .addField("field", SqlTypeName.DECIMAL).build();

Schema schema = Schema.newBuilder("schema").addTable(table1).addTable(table2).build();

Then we create a planner instance and register our custom trait definition in it.


VolcanoPlanner planner = new VolcanoPlanner();

planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
planner.addRelTraitDef(DistributionTraitDef.INSTANCE);

Finally, we create a table scan operator for each of our tables and enforce the `SINGLETON` distribution. Notice that we use the aforementioned `ExpandConversionRule` in our optimization program. Otherwise, the enforcement will not work.


// Use the built-in rule that will expand abstract converters.
RuleSet rules = RuleSets.ofList(AbstractConverter.ExpandConversionRule.INSTANCE);

// Prepare the desired traits with the SINGLETON distribution.
RelTraitSet desiredTraits = node.getTraitSet().plus(Distribution.SINGLETON);
        
// Use the planner to enforce the desired traits
RelNode optimizedNode = Programs.of(rules).run(
    planner,
    node,
    desiredTraits,
    Collections.emptyList(),
    Collections.emptyList()
);

Now we run the `TraitTest` from the sample project to see this in action. For the `PARTITIONED` table, the planner has added the `ExchangeRel` to enforce the `SINGLETON` distribution.


BEFORE:
2:LogicalTableScan(table=[[schema, partitioned]])

AFTER:
7:ExchangeRel
  2:LogicalTableScan(table=[[schema, partitioned]])

But the table with the `SINGLETON` distribution remains unchanged because it already has the desired distribution.


BEFORE:
0:LogicalTableScan(table=[[schema, singleton]])

AFTER:
0:LogicalTableScan(table=[[schema, singleton]])

Congratulations! Our custom property is ready.

Summary

Physical properties are an important concept in query optimization that allows you to explore more alternative plans.

In this blog post, we demonstrated how to define a custom physical property in Apache Calcite. We created custom `RelTraitDef` and `RelTrait` classes, registered them in the planner, and used a custom operator to enforce the desired value of the property.

However, we omitted one crucial question: how to propagate properties between operators? It turns out that Apache Calcite cannot do this well, and you will have to choose between several non-ideal solutions. We will discuss property propagation in detail in future posts. Stay tuned!

We are always ready to help you with your SQL query optimizer design. Just let us know.