Introduction to Data Shuffling in Distributed SQL Engines

Written by

Vladimir Ozerov

January 31, 2022


Distributed SQL engines process queries on several nodes. Nodes may need to exchange tuples during query execution to ensure correctness and maintain a high degree of parallelism. This blog post discusses the concept of data shuffling in distributed query engines.


SQL engines convert a query string into a sequence of operators, which we call an execution plan. We assume that operators in a plan are organized in a tree. Every operator consumes data from zero, one, or more child operators and produces an output that a single parent operator consumes. Practical engines may use DAGs, where several parent operators consume the same operator's output, but we ignore such cases for simplicity.
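As a rough illustration of the tree model described above, here is a minimal Python sketch. The `Operator` class and its `explain` method are hypothetical names chosen for this example and do not come from any particular engine.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Operator:
    """A node of an execution plan tree (hypothetical, for illustration only)."""
    name: str
    children: List["Operator"] = field(default_factory=list)

    def explain(self, indent: int = 0) -> str:
        """Render the subtree rooted at this operator, one operator per line."""
        lines = [" " * indent + self.name]
        for child in self.children:
            lines.append(child.explain(indent + 2))
        return "\n".join(lines)


# Project consumes Join, which consumes two scans (operators with zero children).
plan = Operator("Project", [Operator("Join", [Operator("Scan(a)"), Operator("Scan(b)")])])
```

Calling `plan.explain()` renders the parent above its indented children, which is how many engines print their plans.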

In distributed engines, we may want to create several instances of the same plan operator on different nodes. For example, a table might be partitioned into several segments that different workers read in parallel. Likewise, several nodes might execute a heavy `Join` operator concurrently, each instance producing only part of the output. In this case, we say that a single operator produces several physical data streams.

Operator Requirements

Operators must align their physical data streams carefully to ensure the correctness of results. Let us consider the behavior of a distributed `Join` operator.

The `Join` operator evaluates every pair of tuples from the left and right inputs against a join condition. If we create several `Join` instances, we must ensure that matching tuples always arrive at the same instance. How we do this depends on the join type and the join condition. For an equi-join, we may partition the inputs by the join attributes, such that all tuples with the same join key value arrive at the same stream. Hashing is usually used, although any partitioning scheme will work as long as matching tuples are routed to the same stream.
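The partitioned scheme can be sketched in a few lines of Python. This is a single-process simulation under simplifying assumptions: rows are tuples, the "streams" are plain lists, and the function names (`hash_partition`, `local_join`, `partitioned_join`) are invented for this example.

```python
from collections import defaultdict


def hash_partition(rows, key_index, num_streams):
    """Split rows into num_streams disjoint streams by hashing the join key."""
    streams = [[] for _ in range(num_streams)]
    for row in rows:
        streams[hash(row[key_index]) % num_streams].append(row)
    return streams


def local_join(left, right, left_key, right_key):
    """In-memory hash equi-join, as executed by a single Join instance."""
    table = defaultdict(list)
    for l in left:
        table[l[left_key]].append(l)
    return [l + r for r in right for l in table.get(r[right_key], [])]


def partitioned_join(left, right, left_key, right_key, num_streams):
    """Shuffle both inputs with the same hash function, then join stream by stream."""
    left_streams = hash_partition(left, left_key, num_streams)
    right_streams = hash_partition(right, right_key, num_streams)
    result = []
    for ls, rs in zip(left_streams, right_streams):  # one pair per Join instance
        result.extend(local_join(ls, rs, left_key, right_key))
    return result
```

Because both inputs use the same hash function and the same number of streams, tuples with equal keys meet in the same instance, so the union of the local results equals the result of a single-node join.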

Note that there might be multiple viable partitioning schemes. For example, for the join condition `a1=b1 AND a2=b2`, the input might be redistributed by `[a1, a2]`, `[a2, a1]`, `[a1]`, or `[a2]`. This adds considerable complexity to the query planning because different operator combinations might benefit from different partitioning schemes. We will discuss distributed planning in detail in the next blog post.
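To see why all four schemes are viable, the following Python sketch checks that every pair of rows satisfying `a1=b1 AND a2=b2` lands in the same stream under each of them. The sample rows and the helper names (`stream_of`, `colocated`) are made up for this illustration.

```python
def stream_of(row, columns, num_streams):
    """Stream index for a row, hashing only the chosen columns."""
    return hash(tuple(row[c] for c in columns)) % num_streams


left = [{"a1": 1, "a2": 10}, {"a1": 1, "a2": 20}, {"a1": 2, "a2": 10}]
right = [{"b1": 1, "b2": 10}, {"b1": 2, "b2": 10}, {"b1": 2, "b2": 30}]


def colocated(left_cols, right_cols, num_streams=4):
    """True if every pair matching a1=b1 AND a2=b2 is routed to the same stream."""
    return all(
        stream_of(l, left_cols, num_streams) == stream_of(r, right_cols, num_streams)
        for l in left
        for r in right
        if l["a1"] == r["b1"] and l["a2"] == r["b2"]
    )
```

Matching rows agree on both key columns, so they also agree on any subset of them, provided both sides hash the corresponding columns in the same order.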

Alternatively, we may broadcast one of the inputs. If there are `N` instances of the `Join` operator, we create `N` full copies of one of the inputs. This might be beneficial if one of the inputs is much smaller than the other, such that broadcasting of the smaller input is cheaper than re-distribution of both inputs. Also, the broadcast scheme is mandatory for non-equi joins and some outer joins.
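A broadcast join can be simulated in the same spirit. In this sketch, which assumes tuple rows and a hypothetical `broadcast_join` helper, the large input stays partitioned as it is and every `Join` instance receives its own full copy of the small input. The predicate is arbitrary, so the same code covers non-equi conditions.

```python
def broadcast_join(big_streams, small, predicate):
    """Join each stream of the big input with a full copy of the small input."""
    result = []
    for stream in big_streams:       # one iteration per Join instance
        small_copy = list(small)     # every instance gets all tuples of the small input
        result.extend(
            big + s for big in stream for s in small_copy if predicate(big, s)
        )
    return result


# Hypothetical data: orders (id, amount) already split into two streams,
# and a small table of tiers (name, lower bound, upper bound).
order_streams = [[(1, 50)], [(2, 150), (3, 250)]]
tiers = [("low", 0, 100), ("mid", 100, 200), ("high", 200, 1000)]

# Non-equi join condition: the amount falls into the tier's range.
in_tier = lambda order, tier: tier[1] <= order[1] < tier[2]
```

Here `broadcast_join(order_streams, tiers, in_tier)` pairs every order with its tier without moving any order between streams.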

Academia has proposed more distribution strategies. For example, Track Join tries to minimize network traffic by creating an individual transfer schedule for each tuple. However, partitioned and broadcast shuffles are the most commonly used strategies in practical systems.

Similar to the distributed `Join`, the distributed `Aggregate` must ensure that all input tuples with the same aggregate key are routed to the same stream. The distributed `Union` operator must route equal tuples from all inputs to the same stream for proper deduplication. In contrast, pipelined operators, such as `Project` and `Filter`, can be safely placed on top of any physical stream.
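The `Aggregate` requirement can be sketched as follows, using a simple `COUNT(*) ... GROUP BY` as the example. The function name `distributed_count` is hypothetical, and the whole computation runs in one process for the sake of illustration.

```python
from collections import Counter


def distributed_count(rows, key_index, num_streams):
    """COUNT(*) GROUP BY key, computed independently on each hash-partitioned stream."""
    # Shuffle: route every tuple to a stream chosen by its aggregate key.
    streams = [[] for _ in range(num_streams)]
    for row in rows:
        streams[hash(row[key_index]) % num_streams].append(row)

    # Each Aggregate instance sees all tuples for its keys, so its counts are final.
    partials = [Counter(row[key_index] for row in stream) for stream in streams]

    # Keys are disjoint across streams, so gathering the results is a plain union.
    result = {}
    for partial in partials:
        assert result.keys().isdisjoint(partial.keys())
        result.update(partial)
    return result
```

If tuples with the same key were spread over several streams instead, each instance would produce only a partial count, and an extra merge step would be needed.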


During query planning, optimizers usually maintain distribution metadata, such as distribution type, distribution function, and the number of shards. The common distribution types are:

  • `PARTITIONED` (or `SHARDED`) - operator's output is split into several disjoint streams. This is a common distribution type for intermediate operators.
  • `REPLICATED` - operator produces several data streams, all with the same complete set of tuples. This distribution often appears after the broadcast shuffle. It is also common for small dimension tables that are copied across all execution nodes.

The distribution function and the number of shards make sense only for the `PARTITIONED` output. They describe how data is split between physical streams and how many such streams there are. Common examples of distribution functions are hash, range, and random distribution.
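One possible way to model this metadata is shown below. The `Distribution` class, its field names, and the three shard functions are assumptions made for this sketch. Real optimizers use their own, usually richer, representations.

```python
import bisect
import random
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple


class DistributionType(Enum):
    PARTITIONED = "PARTITIONED"
    REPLICATED = "REPLICATED"


@dataclass(frozen=True)
class Distribution:
    """Distribution metadata attached to an operator's output (hypothetical model)."""
    kind: DistributionType
    function: Optional[str] = None   # "hash", "range" or "random"; PARTITIONED only
    keys: Tuple[str, ...] = ()       # columns the function is applied to
    shards: Optional[int] = None     # number of physical streams; PARTITIONED only


def hash_shard(value, shards):
    """Hash distribution: equal values always map to the same shard."""
    return hash(value) % shards


def range_shard(value, boundaries):
    """Range distribution: sorted boundaries split the key space into len(boundaries) + 1 shards."""
    return bisect.bisect_right(boundaries, value)


def random_shard(shards, rng):
    """Random distribution: balances load but gives no co-location guarantee."""
    return rng.randrange(shards)
```

Hash and range distributions are deterministic in the key, which is what lets an optimizer reason about co-location. Random distribution only balances the load.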

The convenient way to express the data shuffling in the optimizer is to use a dedicated plan operator, usually called `Exchange` or `Shuffle`. The optimizer's goal is to find the optimal placement of `Exchange` operators in the query plan. A variety of algorithms might be used for this, from simple heuristic rewrites to fully-fledged cost-based optimization with the Cascades algorithm. We will discuss shuffle planning in detail in the next blog post.
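The simplest of these approaches, a heuristic rewrite, might look like the sketch below: walk the plan bottom-up and add an `Exchange` wherever a child's partitioning does not match what its parent requires. The `Node` class and the `enforce` function are hypothetical. This is not the Cascades algorithm, and it makes no attempt to pick the cheapest placement.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Node:
    """Plan node annotated with partitioning metadata (hypothetical model)."""
    name: str
    children: List["Node"] = field(default_factory=list)
    # Keys the node's output is partitioned by; None means unknown.
    partitioned_by: Optional[Tuple[str, ...]] = None
    # Partitioning keys this node requires from each child, in child order.
    required: List[Tuple[str, ...]] = field(default_factory=list)


def enforce(node: Node) -> Node:
    """Bottom-up rewrite: add an Exchange where a child's partitioning does not match."""
    node.children = [enforce(child) for child in node.children]
    for i, keys in enumerate(node.required):
        child = node.children[i]
        if child.partitioned_by != keys:
            node.children[i] = Node(f"Exchange{list(keys)}", [child], partitioned_by=keys)
    return node
```

For a join on `a1=b1` where table `a` is already partitioned by `a1` and table `b` by `b2`, this rewrite leaves the left input alone and adds an `Exchange` on `b1` above the right scan.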


The engine needs to figure out which nodes should execute which operations. Usually, engines cut the plan into parts called fragments that could be executed independently. The scheduler then assigns fragment instances to execution units based on resource utilization, data locality, and other factors.
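A common way to form fragments is to cut the plan at `Exchange` operators. The sketch below assumes a plan encoded as nested tuples of the form `(name, child, ...)`. The `Sender` and `Receiver` names are placeholders for whatever the engine uses at fragment boundaries.

```python
def split_into_fragments(plan):
    """Cut a plan at every Exchange and return the fragments as lists of operator names.

    The first fragment is the root one. Each Exchange is replaced by a Receiver
    in the consuming fragment and a Sender at the top of the producing fragment.
    """
    fragments = []

    def visit(node, current):
        name, *children = node
        if name == "Exchange":
            current.append("Receiver")
            producer = ["Sender"]
            fragments.append(producer)
            for child in children:
                visit(child, producer)
        else:
            current.append(name)
            for child in children:
                visit(child, current)

    root = []
    fragments.append(root)
    visit(plan, root)
    return fragments


plan = ("Join", ("Exchange", ("Scan(a)",)), ("Exchange", ("Scan(b)",)))
```

For this plan, the function yields three fragments: the join with two receivers, and one scan fragment per input. A scheduler could then run several instances of each fragment on different nodes.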

`Exchange` operators are replaced with specialized implementations that transmit data between participants. OLTP engines may prefer to transfer data through network sockets to minimize latency. Big data engines may decide to exchange data through a persistent medium, such as a distributed file system, to avoid losing intermediate results if a participant crashes.

Executors do not always strictly follow the original plan. Optimizers may produce suboptimal plans due to imprecise statistics, and the system may be reconfigured while a query is running. Advanced executors may do runtime re-optimizations, overriding some planner decisions. For example, the executor may prefer one shuffle type over the other in the face of data skew or incorrect cardinality estimates, or change the number of shuffle partitions at runtime. Please refer to the query robustness survey by Yin et al. for more ideas on possible runtime re-optimization strategies.
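As a toy example of such a runtime decision, the sketch below picks the shuffle type from the observed input cardinalities rather than the estimated ones. The cost model is a deliberate simplification assumed for this example: a partitioned shuffle may move every tuple of both inputs once, while a broadcast shuffle sends the smaller input to every `Join` instance.

```python
def choose_shuffle(left_rows, right_rows, num_instances):
    """Pick the shuffle type that moves fewer tuples under a simplistic cost model."""
    partitioned_cost = left_rows + right_rows                    # each tuple is sent once
    broadcast_cost = min(left_rows, right_rows) * num_instances  # N copies of the small side
    return "broadcast" if broadcast_cost < partitioned_cost else "partitioned"
```

With one million rows on one side, a hundred on the other, and eight instances, the function returns `"broadcast"`. With two inputs of similar size, it returns `"partitioned"`. A real executor would also account for tuple width, skew, and memory limits.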


Distributed SQL engines execute queries on several nodes. To ensure the correctness of results, engines reshuffle operator outputs to meet the requirements of parent operators. Two common shuffling strategies are partitioned and broadcast shuffles.

Both the query planner and the executor work with shuffles. The planner uses distribution metadata to find the optimal placement of shuffle operators. The executor tracks the state of data streams, routes tuples to the proper physical nodes, and may also override planner decisions in the case of data skew.

In future blog posts, we will discuss how query planners decide on the optimal placement of shuffle operators.

We are always ready to help you with your query engine design. Just let us know.