Assembling a query optimizer with Apache Calcite

Written by

Vladimir Ozerov

December 15, 2020

Abstract

Apache Calcite is a dynamic data management framework with a SQL parser, optimizer, executor, and JDBC driver.

Many examples of Apache Calcite usage demonstrate the end-to-end execution of queries using the JDBC driver, some built-in optimization rules, and the `Enumerable` executor. Our customers often have their own execution engines and JDBC drivers. So how do you use Apache Calcite for query optimization only, without its JDBC driver and `Enumerable` executor?

In this tutorial, we create a simple query optimizer using internal Apache Calcite classes.

Schema

First, we need to define the schema. We start with a custom table implementation. To create a table, you should extend Apache Calcite's `AbstractTable`. We pass two pieces of information to our table:

  1. Field names and types that we will use to construct the row type of the table (required for expression type derivation).
  2. An optional `Statistic` object that provides helpful information for the query planner: row count, collations, unique table keys, etc.

Our statistic class exposes only row count information.


public class SimpleTableStatistic implements Statistic {

    private final long rowCount;
    
    public SimpleTableStatistic(long rowCount) {
        this.rowCount = rowCount;
    }
    
    @Override
    public Double getRowCount() {
        return (double) rowCount;
    }
    
    // Other methods no-op
}

We pass column names and types to our table class to construct the row type, which Apache Calcite uses to derive data types of expressions.


public class SimpleTable extends AbstractTable {

    private final String tableName;
    private final List<String> fieldNames;
    private final List<SqlTypeName> fieldTypes;
    private final SimpleTableStatistic statistic;

    private RelDataType rowType;

    private SimpleTable(
        String tableName, 
        List<String> fieldNames, 
        List<SqlTypeName> fieldTypes, 
        SimpleTableStatistic statistic
    ) {
        this.tableName = tableName;
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.statistic = statistic;
    }
    
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (rowType == null) {
            List<RelDataTypeField> fields = new ArrayList<>(fieldNames.size());

            for (int i = 0; i < fieldNames.size(); i++) {
                RelDataType fieldType = typeFactory.createSqlType(fieldTypes.get(i));
                RelDataTypeField field = new RelDataTypeFieldImpl(fieldNames.get(i), i, fieldType);
                fields.add(field);
            }

            rowType = new RelRecordType(StructKind.PEEK_FIELDS, fields, false);
        }

        return rowType;
    }

    @Override
    public Statistic getStatistic() {
        return statistic;
    }
}

Our table also implements Apache Calcite's `ScannableTable` interface. We do this only for demonstration purposes because we will use a certain `Enumerable` optimization rule in our example that will fail without this interface. You do not need to implement this interface if you are not going to use the Apache Calcite `Enumerable` execution backend.


public class SimpleTable extends AbstractTable implements ScannableTable {
    ...
    @Override
    public Enumerable<Object[]> scan(DataContext root) {
        throw new UnsupportedOperationException("Not implemented");
    }
    ...
}

Finally, we extend Apache Calcite's `AbstractSchema` class to define our own schema. We pass a map from a table name to a table. Apache Calcite uses this map to resolve tables during semantic validation.


public class SimpleSchema extends AbstractSchema {

    private final String schemaName;
    private final Map<String, Table> tableMap;

    private SimpleSchema(String schemaName, Map<String, Table> tableMap) {
        this.schemaName = schemaName;
        this.tableMap = tableMap;
    }

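    // Used later to register the schema and to set the default schema path.
    public String getSchemaName() {
        return schemaName;
    }

    @Override
    public Map<String, Table> getTableMap() {
        return tableMap;
    }
}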

We are ready to start the optimization.

Optimizer

The optimization process consists of the following phases:

  1. Syntax analysis that produces an abstract syntax tree (AST) from a query string.
  2. Semantic analysis of an AST.
  3. Conversion of an AST to a relational tree.
  4. Optimization of a relational tree.

Configuration

Many Apache Calcite classes that we use for query optimization require configuration. However, there is no common configuration class in Apache Calcite that could be used by all objects. For this reason, we store the common configuration in a single object and then copy configuration values into other objects when needed.

In this specific example, we instruct Apache Calcite on how to process object identifiers: do not change identifier casing, use case-sensitive name resolution.


Properties configProperties = new Properties();

configProperties.put(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), Boolean.TRUE.toString());
configProperties.put(CalciteConnectionProperty.UNQUOTED_CASING.camelName(), Casing.UNCHANGED.toString());
configProperties.put(CalciteConnectionProperty.QUOTED_CASING.camelName(), Casing.UNCHANGED.toString());

CalciteConnectionConfig config = new CalciteConnectionConfigImpl(configProperties);
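
To make the effect of these settings concrete, here is a toy sketch in plain Java. It does not use any Apache Calcite classes: the `Casing` enum and the `resolve` method are hypothetical stand-ins that only mimic the idea of normalizing an unquoted identifier and then matching it against the catalog. As far as we know, the default lexical policy in Apache Calcite upper-cases unquoted identifiers, which is why a lower-case table name may fail to resolve unless casing is left unchanged.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

final class IdentifierResolutionSketch {
    /** Hypothetical casing policy applied to unquoted identifiers. */
    enum Casing { UNCHANGED, TO_UPPER, TO_LOWER }

    /** Normalizes an identifier the way a parser with the given policy would. */
    static String applyCasing(String identifier, Casing casing) {
        switch (casing) {
            case TO_UPPER:
                return identifier.toUpperCase(Locale.ROOT);
            case TO_LOWER:
                return identifier.toLowerCase(Locale.ROOT);
            default:
                return identifier;
        }
    }

    /** Returns the matching catalog name, or null when nothing matches. */
    static String resolve(List<String> catalog, String identifier, Casing casing, boolean caseSensitive) {
        String normalized = applyCasing(identifier, casing);
        for (String name : catalog) {
            boolean matches = caseSensitive
                ? name.equals(normalized)
                : name.equalsIgnoreCase(normalized);
            if (matches) {
                return name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> catalog = Arrays.asList("lineitem");

        // Casing unchanged, case-sensitive lookup: the name resolves as written.
        System.out.println(resolve(catalog, "lineitem", Casing.UNCHANGED, true));
        // Upper-casing the identifier breaks a case-sensitive lookup.
        System.out.println(resolve(catalog, "lineitem", Casing.TO_UPPER, true));
        // A case-insensitive lookup tolerates any spelling.
        System.out.println(resolve(catalog, "LineItem", Casing.UNCHANGED, false));
    }
}
```

With the configuration used in this post (unchanged casing, case-sensitive resolution), table and column names in the query must match the schema exactly.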

Syntax Analysis

First of all, we should parse the query string. The result of parsing is an abstract syntax tree, with every node being a subclass of `SqlNode`.

We pass a part of our common configuration to the parser configuration, then instantiate `SqlParser`, and finally perform the parsing. If you have a custom SQL syntax, you may pass a custom parser factory class to the configuration.


public SqlNode parse(String sql) throws Exception {
    SqlParser.ConfigBuilder parserConfig = SqlParser.configBuilder();
    parserConfig.setCaseSensitive(config.caseSensitive());
    parserConfig.setUnquotedCasing(config.unquotedCasing());
    parserConfig.setQuotedCasing(config.quotedCasing());
    parserConfig.setConformance(config.conformance());

    SqlParser parser = SqlParser.create(sql, parserConfig.build());

    return parser.parseStmt();
}

Semantic Analysis

The goal of semantic analysis is to ensure that the produced abstract syntax tree is valid. Semantic analysis includes the resolution of object and function identifiers, data type inference, and checks that certain SQL constructs are correct (e.g., a group key in the `GROUP BY` clause).

The validation is performed by the `SqlValidatorImpl` class, one of the most complex classes in Apache Calcite. This class requires several supporting objects. First, we create an instance of `RelDataTypeFactory`, which provides SQL type definitions. We use the built-in type factory, but you may also provide your custom implementation if needed.


RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();

Then, we create a `Prepare.CatalogReader` object that provides access to database objects. This is where our previously defined schema comes into play. The catalog reader consumes our common configuration object so that object name resolution is consistent with the rules we used during parsing.


SimpleSchema schema = ... // Create our custom schema

CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
rootSchema.add(schema.getSchemaName(), schema);

Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
    rootSchema,
    Collections.singletonList(schema.getSchemaName()),
    typeFactory,
    config
);

Then, we define a `SqlOperatorTable`, the library of SQL functions and operators. We use the built-in library. You may also provide your own implementation with custom functions.


SqlOperatorTable operatorTable = ChainedSqlOperatorTable.of(
    SqlStdOperatorTable.instance()
);

We created all the required supporting objects. Now we instantiate the built-in `SqlValidatorImpl`. As usual, you may extend it if you need a custom validation behavior (such as custom error messages).


SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
    .withLenientOperatorLookup(config.lenientOperatorLookup())
    .withSqlConformance(config.conformance())
    .withDefaultNullCollation(config.defaultNullCollation())
    .withIdentifierExpansion(true);

SqlValidator validator = SqlValidatorUtil.newValidator(
    operatorTable, 
    catalogReader, 
    typeFactory,
    validatorConfig
);

Finally, we perform validation. Keep the validator instance because we will need it for AST conversion to a relational tree.


SqlNode sqlNode = parse(sqlString);
SqlNode validatedSqlNode = validator.validate(sqlNode);

Conversion to a Relational Tree

An AST is not convenient for query optimization because the semantics of its nodes are too complicated. It is much more convenient to perform query optimization on a tree of relational operators, defined by the `RelNode` subclasses, such as `TableScan`, `Project`, `Filter`, `Join`, etc. We use `SqlToRelConverter`, another monstrous class of Apache Calcite, to convert the original AST into a relational tree.

Interestingly, to create a converter, we must create an instance of a cost-based planner `VolcanoPlanner` first. This is one of Apache Calcite's abstraction leaks.

To create the `VolcanoPlanner`, we again pass the common configuration and the `RelOptCostFactory` that the planner will use to calculate costs. In a production-grade optimizer, you are likely to define a custom cost factory, because the built-in factories take into account only the cardinality of relations, which is often insufficient for proper cost estimation.

You should also specify which physical operator properties the `VolcanoPlanner` should track. Every property has a descriptor that extends Apache Calcite's `RelTraitDef` class. In our example, we register only the `ConventionTraitDef`, which defines the execution backend for a relational node.


VolcanoPlanner planner = new VolcanoPlanner(
    RelOptCostImpl.FACTORY, 
    Contexts.of(config)
);

planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
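
To illustrate why a cardinality-only cost can be insufficient, here is a small sketch in plain Java. It is a toy model, not Apache Calcite's `RelOptCost` API: the `Cost` class, the two comparison methods, and the CPU weight are all hypothetical. It compares two alternative plans that produce the same number of rows but do very different amounts of CPU work.

```java
final class CostModelSketch {
    /** Hypothetical cost with two components; not Calcite's RelOptCost. */
    static final class Cost {
        final double rows;
        final double cpu;

        Cost(double rows, double cpu) {
            this.rows = rows;
            this.cpu = cpu;
        }
    }

    /** Cardinality-only comparison: CPU work is ignored entirely. */
    static boolean cheaperByRows(Cost a, Cost b) {
        return a.rows < b.rows;
    }

    /** Weighted comparison; the weight is an arbitrary illustrative constant. */
    static boolean cheaperWeighted(Cost a, Cost b, double cpuWeight) {
        return a.rows + cpuWeight * a.cpu < b.rows + cpuWeight * b.cpu;
    }

    public static void main(String[] args) {
        // Two plans with the same output cardinality but different CPU work.
        Cost expensivePlan = new Cost(1_000.0, 60_000.0 * 100.0);
        Cost cheapPlan = new Cost(1_000.0, 60_000.0 + 100.0);

        // Row count alone cannot tell the plans apart.
        System.out.println(cheaperByRows(cheapPlan, expensivePlan));         // false
        System.out.println(cheaperByRows(expensivePlan, cheapPlan));         // false
        // Once CPU is part of the cost, the cheaper plan wins.
        System.out.println(cheaperWeighted(cheapPlan, expensivePlan, 0.01)); // true
    }
}
```

A custom cost factory in a real optimizer would play the role of `cheaperWeighted` here, with components and weights chosen for the target execution engine.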

We then create a `RelOptCluster`, a common context object used during conversion and optimization.


RelOptCluster cluster = RelOptCluster.create(
    planner, 
    new RexBuilder(typeFactory)
);

We can create the converter now. Here we set a couple of configuration properties for subquery unnesting, which is out of this post's scope.


SqlToRelConverter.Config converterConfig = SqlToRelConverter.configBuilder()
    .withTrimUnusedFields(true)
    .withExpand(false) 
    .build();

SqlToRelConverter converter = new SqlToRelConverter(
    null,
    validator,
    catalogReader,
    cluster,
    StandardConvertletTable.INSTANCE,
    converterConfig
);

Once we have the converter, we can create the relational tree.


public RelNode convert(SqlNode validatedSqlNode) {
    RelRoot root = converter.convertQuery(validatedSqlNode, false, true);

    return root.rel;
}

During the conversion, Apache Calcite produces a tree of logical relational operators, which are abstract and do not target any specific execution backend. For this reason, logical operators always have the convention trait set to `Convention.NONE`. It is expected that you will convert them into physical operators during the optimization. Physical operators have a specific convention different from `Convention.NONE`.
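
The relationship between logical and physical nodes can be sketched with a toy model in plain Java. Nothing here is an Apache Calcite class: `Convention`, `Node`, `convert`, and `satisfies` are hypothetical names that only mirror the idea that every node carries a convention and that the optimized tree must carry the required one throughout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ConventionSketch {
    /** Hypothetical conventions: NONE marks a logical node, ENUMERABLE a physical one. */
    enum Convention { NONE, ENUMERABLE }

    /** A toy relational node: an operator name, a convention, and inputs. */
    static final class Node {
        final String name;
        final Convention convention;
        final List<Node> inputs;

        Node(String name, Convention convention, Node... inputs) {
            this.name = name;
            this.convention = convention;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Rebuilds the tree bottom-up with every node in the target convention. */
    static Node convert(Node node, Convention target) {
        List<Node> converted = new ArrayList<>();
        for (Node input : node.inputs) {
            converted.add(convert(input, target));
        }
        return new Node(node.name, target, converted.toArray(new Node[0]));
    }

    /** Returns true when the node and all of its inputs carry the required convention. */
    static boolean satisfies(Node node, Convention required) {
        if (node.convention != required) {
            return false;
        }
        for (Node input : node.inputs) {
            if (!satisfies(input, required)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Node logical = new Node("Filter", Convention.NONE,
            new Node("TableScan", Convention.NONE));
        System.out.println(satisfies(logical, Convention.ENUMERABLE));  // false

        Node physical = convert(logical, Convention.ENUMERABLE);
        System.out.println(satisfies(physical, Convention.ENUMERABLE)); // true
    }
}
```

In the real planner the conversion is not a blanket rewrite like `convert` above. It is driven by converter rules, one per operator type, and the planner picks among the alternatives they produce.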

Optimization

Optimization is the process of converting one relational tree into another. You may do rule-based optimization with heuristic or cost-based planners, `HepPlanner` and `VolcanoPlanner` respectively. You may also rewrite the tree manually, without rules. Apache Calcite comes with several powerful rewriting tools, such as `RelDecorrelator` and `RelFieldTrimmer`.

Typically, to optimize a relational tree, you will perform multiple optimization passes using rule-based optimizers and manual rewrites. Take a look at the default optimization program used by the Apache Calcite JDBC driver or multi-phase query optimization in Apache Flink.

In our example, we will use `VolcanoPlanner` to perform cost-based optimization. We already instantiated the `VolcanoPlanner` before. Our inputs are a relational tree to optimize, a set of optimization rules, and traits that the root node of the optimized tree must satisfy.


public RelNode optimize(
    RelNode node, 
    RelTraitSet requiredTraitSet, 
    RuleSet rules
) {
    // The planner is the VolcanoPlanner instance created earlier and kept in a field.
    Program program = Programs.of(rules);

    return program.run(
        planner,
        node,
        requiredTraitSet,
        Collections.emptyList(),
        Collections.emptyList()
    );
}
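
To show what "a rule converts one relational tree into another" means in the simplest case, here is a toy rewrite loop in plain Java. It does not use Apache Calcite, and all names (`Node`, `mergeFilters`, `optimize`) are hypothetical. The loop applies rules until the tree stops changing, which is closer in spirit to a heuristic planner than to the cost-based `VolcanoPlanner` used in this post: a cost-based planner keeps the alternatives and chooses the cheapest one instead of replacing nodes in place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class RuleRewriteSketch {
    /** Toy operator tree; each node has at most one input to keep the sketch short. */
    static final class Node {
        final String op;      // operator name, e.g. "Scan" or "Filter"
        final String detail;  // table name or predicate text
        final Node input;     // null for leaves

        Node(String op, String detail, Node input) {
            this.op = op;
            this.detail = detail;
            this.input = input;
        }

        @Override
        public String toString() {
            return op + "(" + detail + ")" + (input == null ? "" : " <- " + input);
        }
    }

    /** Rule: Filter(a) over Filter(b) becomes Filter(a AND b); other nodes are returned as is. */
    static Node mergeFilters(Node node) {
        if (node.op.equals("Filter") && node.input != null && node.input.op.equals("Filter")) {
            return new Node("Filter", node.detail + " AND " + node.input.detail, node.input.input);
        }
        return node;
    }

    /** Applies all rules to every node, bottom-up, in a single pass. */
    static Node rewriteOnce(Node node, List<UnaryOperator<Node>> rules) {
        Node input = node.input == null ? null : rewriteOnce(node.input, rules);
        Node result = new Node(node.op, node.detail, input);
        for (UnaryOperator<Node> rule : rules) {
            result = rule.apply(result);
        }
        return result;
    }

    /** Repeats passes until the printed form of the tree no longer changes. */
    static Node optimize(Node root, List<UnaryOperator<Node>> rules) {
        Node current = root;
        while (true) {
            Node next = rewriteOnce(current, rules);
            if (next.toString().equals(current.toString())) {
                return next;
            }
            current = next;
        }
    }

    public static void main(String[] args) {
        List<UnaryOperator<Node>> rules = new ArrayList<>();
        rules.add(RuleRewriteSketch::mergeFilters);

        Node scan = new Node("Scan", "t", null);
        Node tree = new Node("Filter", "a", new Node("Filter", "b", new Node("Filter", "c", scan)));

        System.out.println(optimize(tree, rules)); // Filter(a AND b AND c) <- Scan(t)
    }
}
```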

Example

In this example, we will optimize the TPC-H query №6. The full source code is available here. Run the `OptimizerTest` to see it in action.


SELECT
    SUM(l.l_extendedprice * l.l_discount) AS revenue
FROM
    lineitem l
WHERE
    l.l_shipdate >= ?
    AND l.l_shipdate < ?
    AND l.l_discount BETWEEN (? - 0.01) AND (? + 0.01)
    AND l.l_quantity < ?

We define the `Optimizer` class that encapsulates the created configuration, `SqlValidator`, `SqlToRelConverter` and `VolcanoPlanner`.


public class Optimizer {
    private final CalciteConnectionConfig config;
    private final SqlValidator validator;
    private final SqlToRelConverter converter;
    private final VolcanoPlanner planner;
    
    public Optimizer(SimpleSchema schema) {
        // Create supporting objects as explained above
        ... 
    }
}

Next, we create the schema with the `lineitem` table.


SimpleTable lineitem = SimpleTable.newBuilder("lineitem")
    .addField("l_quantity", SqlTypeName.DECIMAL)
    .addField("l_extendedprice", SqlTypeName.DECIMAL)
    .addField("l_discount", SqlTypeName.DECIMAL)
    .addField("l_shipdate", SqlTypeName.DATE)
    .withRowCount(60_000L)
    .build();

SimpleSchema schema = SimpleSchema.newBuilder("tpch").addTable(lineitem).build();

Optimizer optimizer = Optimizer.create(schema);

Now we use our optimizer to parse, validate, and convert the query.


SqlNode sqlTree = optimizer.parse(sql);
SqlNode validatedSqlTree = optimizer.validate(sqlTree);
RelNode relTree = optimizer.convert(validatedSqlTree);

The produced logical tree looks like this.


LogicalAggregate(group=[{}], revenue=[SUM($0)]): rowcount = 1.0, cumulative cost = 63751.137500047684
  LogicalProject($f0=[*($1, $2)]): rowcount = 1875.0, cumulative cost = 63750.0
    LogicalFilter(condition=[AND(>=($3, ?0), <($3, ?1), >=($2, -(?2, 0.01)), <=($2, +(?3, 0.01)), <($0, ?4))]): rowcount = 1875.0, cumulative cost = 61875.0
      LogicalTableScan(table=[[tpch, lineitem]]): rowcount = 60000.0, cumulative cost = 60000.0
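
The row counts and costs in this plan can be reproduced with simple arithmetic. The sketch below is plain Java and makes two assumptions that the post does not state: each of the five comparison predicates in the filter is estimated with a default selectivity of 0.5, and each operator adds its own row count to the cumulative cost of its input. Both assumptions are consistent with the printed numbers, but treat them as a reading of the output rather than a description of Apache Calcite internals.

```java
final class LogicalPlanEstimateSketch {
    /** Assumed default selectivity of a single comparison predicate. */
    static final double COMPARISON_SELECTIVITY = 0.5;

    /** Estimated rows after a conjunction of independent comparison predicates. */
    static double filterRowCount(double inputRows, int comparisonCount) {
        return inputRows * Math.pow(COMPARISON_SELECTIVITY, comparisonCount);
    }

    public static void main(String[] args) {
        double scanRows = 60_000.0;                      // row count passed to the table statistic
        double filterRows = filterRowCount(scanRows, 5); // five comparisons in the WHERE clause

        double scanCost = scanRows;
        double filterCost = scanCost + filterRows;       // scan + filter
        double projectCost = filterCost + filterRows;    // project keeps the filter's row count

        System.out.println(filterRows);  // 1875.0
        System.out.println(filterCost);  // 61875.0
        System.out.println(projectCost); // 63750.0
    }
}
```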

Finally, we optimize the relational tree and convert it into the `Enumerable` convention. We use logical rules that convert and merge `LogicalProject` and `LogicalFilter` into a compound `LogicalCalc`, and physical rules that convert logical nodes into `Enumerable` nodes.


RuleSet rules = RuleSets.ofList(
    CoreRules.FILTER_TO_CALC,
    CoreRules.PROJECT_TO_CALC,
    CoreRules.FILTER_CALC_MERGE,
    CoreRules.PROJECT_CALC_MERGE,
    EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
    EnumerableRules.ENUMERABLE_PROJECT_RULE,
    EnumerableRules.ENUMERABLE_FILTER_RULE,
    EnumerableRules.ENUMERABLE_CALC_RULE,
    EnumerableRules.ENUMERABLE_AGGREGATE_RULE
);

RelNode optimizerRelTree = optimizer.optimize(
    relTree,
    relTree.getTraitSet().plus(EnumerableConvention.INSTANCE),
    rules
);

The produced physical tree looks like this. Notice that all nodes are `Enumerable`, and that `Project` and `Filter` nodes have been replaced with `Calc`.


EnumerableAggregate(group=[{}], revenue=[SUM($0)]): rowcount = 187.5, cumulative cost = 62088.2812589407
  EnumerableCalc(expr#0..3=[{inputs}], expr#4=[*($t1, $t2)], expr#5=[?0], expr#6=[>=($t3, $t5)], expr#7=[?1], expr#8=[<($t3, $t7)], expr#9=[?2], expr#10=[0.01:DECIMAL(3, 2)], expr#11=[-($t9, $t10)], expr#12=[>=($t2, $t11)], expr#13=[?3], expr#14=[+($t13, $t10)], expr#15=[<=($t2, $t14)], expr#16=[?4], expr#17=[<($t0, $t16)], expr#18=[AND($t6, $t8, $t12, $t15, $t17)], $f0=[$t4], $condition=[$t18]): rowcount = 1875.0, cumulative cost = 61875.0
    EnumerableTableScan(table=[[tpch, lineitem]]): rowcount = 60000.0, cumulative cost = 60000.0
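
The effect of merging `Project` and `Filter` into a single `Calc` is visible in the cumulative costs below the aggregate. The following plain-Java sketch only adds up the per-operator figures taken from the two plan printouts in this post. The helper `cumulativeCost` is a hypothetical name, and the reading that each operator contributes its own row count is an assumption that matches the printed numbers.

```java
final class PlanCostComparisonSketch {
    /** Sums the self costs of the operators in a plan, leaf first. */
    static double cumulativeCost(double... selfCosts) {
        double total = 0.0;
        for (double cost : selfCosts) {
            total += cost;
        }
        return total;
    }

    public static void main(String[] args) {
        // Logical plan below the aggregate: scan, filter, project.
        double logical = cumulativeCost(60_000.0, 1_875.0, 1_875.0);
        // Physical plan below the aggregate: scan and a single calc.
        double physical = cumulativeCost(60_000.0, 1_875.0);

        System.out.println(logical);            // 63750.0
        System.out.println(physical);           // 61875.0
        System.out.println(logical - physical); // 1875.0
    }
}
```

The difference of 1875.0 is the cost of the separate projection step that the compound `Calc` node absorbs.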

Summary

Apache Calcite is a flexible framework for query optimization. In this blog post, we demonstrated how to optimize SQL queries with the Apache Calcite parser, validator, converter, and rule-based optimizer. In future posts, we will dig into individual components of Apache Calcite. Stay tuned!

We are always ready to help you with your SQL query optimizer design. Just let us know.