Lifetime of a Query in Drill Alpha Release

Ellen invited me to give a talk at the Bay Area Apache Drill group. After working on the Limit operator end to end, I had the idea to illustrate how Drill handles a query and what happens at each stage, from the time it receives the query to the point when the result set is returned to the user.


In this post I’ll walk through the steps in my slides and explain how Drill processes a SQL query from end to end.

The example in the slides has a three-DrillBit setup with Zookeeper and Hazelcast configured, and runs a SQL query through the sqlline command line. A DrillBit is simply a Drill process running on a node in the cluster.


The SELECT statement queries a data file in HDFS, with a WHERE clause that filters records based on an expression.

The client submits this statement to Sqlline, a simple Java-based console that can talk to a JDBC driver, which passes the SELECT statement into Optiq.

Optiq is a library that Drill uses for query parsing and planning. It provides pluggable transformation rules that can map a SQL token into any specific object you want. Optiq also embeds a cost-based query optimizer, which we use to choose the best order of SQL operators in the statement without any other statistics. We implemented custom Optiq rules that map specific SQL operators (WHERE, LIMIT, etc.), and each rule converts its operator into the specific logical operator syntax that Drill understands.
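To make the idea of a rule per SQL operator more concrete, here is a minimal sketch in Python. It is only an analogy: Optiq is a Java library and this is not its API. The `LogicalOperator` class, the `RULES` table, the node dictionaries and the sample file path are all hypothetical names I made up for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class LogicalOperator:
    """Hypothetical stand-in for one operator in a logical plan."""
    op: str
    config: dict = field(default_factory=dict)


# One rule per SQL construct; each rule turns a parsed SQL node
# (modelled here as a plain dict) into a logical operator.
RULES = {
    "SCAN": lambda node: LogicalOperator("scan", {"source": node["table"]}),
    "WHERE": lambda node: LogicalOperator("filter", {"expr": node["condition"]}),
    "LIMIT": lambda node: LogicalOperator("limit", {"count": node["count"]}),
}


def to_logical(sql_nodes):
    """Apply the matching rule to every parsed SQL node, in order."""
    return [RULES[node["kind"]](node) for node in sql_nodes]


plan = to_logical([
    {"kind": "SCAN", "table": "hdfs:/data/sample.parquet"},
    {"kind": "WHERE", "condition": "x > 3"},
    {"kind": "LIMIT", "count": 10},
])
```

Each rule only knows about its own operator, which is what keeps the set of rules pluggable: supporting a new SQL construct means adding one more entry.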

This collection of logical operators, each with its own configuration, forms a Drill logical plan. The logical plan’s sole purpose is to describe logically what work Drill needs to perform to produce the query results, without any optimizations or distribution.

Yash has a great post covering what a logical plan in Drill looks like.

Once the client generates this logical plan, it looks up the host/port information of one of the DrillBits and passes the logical plan to that DrillBit.

Running Logical Plan

Any DrillBit in the cluster can initiate a query, and that DrillBit becomes the Drill process responsible for sending the results back to the client.

Once the UserServer, which listens for user submissions, gets the logical plan, it passes it to the Foreman, which is responsible for turning this plan into an actual execution plan and submitting the plan information for execution.

Inside the Foreman, the first operation is to transform the logical plan into a physical plan via Drill’s Optimizer. The current version of the Optimizer is very basic: it simply transforms each logical operator directly into one or more physical operators, without many optimization rules that look at how operators relate to each other.

The physical plan is simply a DAG (directed acyclic graph) of physical operators, and each child/parent relationship implies how data flows through the graph. We were inspired by Google’s Dremel paper, and the takeaway we implemented is an MPP-style multi-level execution tree, in which each node represents a different DrillBit process and the nodes depend on each other’s results for computation. There is an excellent Quora discussion about this.

Because we want to break this physical plan into a multi-level execution tree that involves multiple DrillBits, we first need to collect information about each physical operator. Given its configuration, each physical operator can return estimated stats about network, CPU, memory and disk usage, as well as row size and count. It can also return a list of preferred DrillBits that it wants to be executed on, which we call endpoint affinities.

One example of endpoint affinity is a Parquet Scan operator, which wants to run as close as possible to where the Parquet file is stored. It can look up the Parquet file’s metadata, which stores the HDFS data node host, and return that host as a preferred endpoint if we have a DrillBit running there.
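As a rough illustration of endpoint affinity, the sketch below scores each host by the fraction of the file's bytes it stores, and keeps only hosts that also run a DrillBit. The function name, the input shapes and the byte-fraction scoring are my own assumptions for this example, not how Drill necessarily computes affinity.

```python
def endpoint_affinities(block_hosts, drillbit_hosts):
    """Return {host: affinity} for hosts that hold data AND run a DrillBit.

    block_hosts    -- list of (host, num_bytes) pairs, e.g. from file metadata
    drillbit_hosts -- set of hosts that currently run a DrillBit
    Affinity here is simply the fraction of the file's bytes on that host
    (an assumed scoring scheme, chosen only for illustration).
    """
    total = sum(num_bytes for _, num_bytes in block_hosts)
    affinities = {}
    if total == 0:
        return affinities
    for host, num_bytes in block_hosts:
        if host in drillbit_hosts:
            affinities[host] = affinities.get(host, 0.0) + num_bytes / total
    return affinities


affinity = endpoint_affinities(
    [("node1", 300), ("node2", 100), ("node4", 100)],
    {"node1", "node2", "node3"},
)
```

In this example `node4` stores part of the file but runs no DrillBit, so it gets no affinity, while `node3` runs a DrillBit but holds no data and is not preferred either.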

With the physical plan, all the stats and the endpoint affinities, the Parallelizer in the Foreman transforms this plan into a number of fragments. Each fragment is simply a self-contained physical plan that is designed to run on a DrillBit node. In any given physical plan there is exactly one Root fragment, which runs in the initiating DrillBit, plus possibly one or more Leaf fragments and possibly one or more Intermediate fragments.
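A simplified way to picture fragmentation is to cut the plan wherever data has to cross the wire. The sketch below assumes a linear plan listed from the Screen down to the Scan, and assumes fragments are cut at exchange operators, with a receiver on one side and a sender on the other. Real plans are DAGs and the Parallelizer also weighs stats and affinities, so treat the function and its string labels as hypothetical.

```python
def split_into_fragments(operators):
    """Cut a linear plan (root first) into fragments at each exchange.

    Returns a list of (kind, operator_list) tuples, where kind is
    "root", "intermediate" or "leaf".
    """
    fragments = [[]]
    for op in operators:
        if op == "exchange":
            fragments[-1].append("receiver")  # upper side waits for data
            fragments.append(["sender"])      # lower side ships data up
        else:
            fragments[-1].append(op)

    labelled = []
    for index, ops in enumerate(fragments):
        if index == 0:
            kind = "root"
        elif index == len(fragments) - 1:
            kind = "leaf"
        else:
            kind = "intermediate"
        labelled.append((kind, ops))
    return labelled


fragments = split_into_fragments(
    ["screen", "exchange", "filter", "exchange", "scan"]
)
```

For the five-operator plan above this yields a root fragment holding the screen, an intermediate fragment holding the filter, and a leaf fragment holding the scan.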

Running Fragments

The Root fragment is submitted to the Worker Manager in the current DrillBit, the Intermediate fragments are stored in the Hazelcast distributed cache, and all the Leaf fragments are sent directly to their assigned DrillBits via our BitCom, through our RPC layer with Protobuf messages.
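The three destinations can be sketched as a small dispatch function. Here a list stands in for the local Worker Manager queue, a dict stands in for the distributed cache, and a callback stands in for the RPC layer. All of these names and the fragment dictionaries are hypothetical.

```python
def dispatch(fragments, local_queue, cache, rpc_send):
    """Route each fragment to where it should start its life."""
    for fragment in fragments:
        if fragment["kind"] == "root":
            local_queue.append(fragment)               # run on this DrillBit
        elif fragment["kind"] == "intermediate":
            cache[fragment["id"]] = fragment           # park until first batch arrives
        else:
            rpc_send(fragment["endpoint"], fragment)   # leaf: ship immediately


local_queue, cache, sent = [], {}, []
dispatch(
    [
        {"id": 0, "kind": "root"},
        {"id": 1, "kind": "intermediate"},
        {"id": 2, "kind": "leaf", "endpoint": "node2"},
        {"id": 3, "kind": "leaf", "endpoint": "node3"},
    ],
    local_queue,
    cache,
    lambda endpoint, fragment: sent.append((endpoint, fragment["id"])),
)
```

Only the leaf fragments are pushed eagerly, because they are the ones that produce data without waiting on anyone else.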

Once the Worker Manager receives the Root fragment, it starts running this plan, which always contains a Screen operator that blocks and waits for records to be returned. If the plan involves multiple DrillBits, it also contains an exchange operator that sets up a Receiver, which waits for incoming records from the wire.

The Leaf fragments that are sent over the wire are executed as soon as they arrive. Each fragment is parsed into a DAG of physical operators, which sets up the execution and data flow. The physical operators use pull-style messaging: starting from the bottom of the tree, each operator pulls its parent for records, and the parent returns an outcome status. The operators are designed to handle each possible outcome status, which can be STOP, OK, OK_WITH_NEW_SCHEMA or NONE. Since Drill supports flexible schema, in other words it can tolerate schema changes within the same dataset, the operators need to handle what happens when there is a new schema for a set of records.

Seeing the benefits of columnar storage, Drill also implemented its own in-memory columnar format, which we call ValueVectors. A ValueVector is simply a collection of bytes that represents a run of values within the same column. Each pull in each physical operator returns a RecordBatch, which can contain one or more ValueVectors per column, along with the schema.
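The pull loop and its outcome statuses can be sketched in a few lines of Python. This is an illustration, not Drill's Java code: the `Scan` class and `drain` function are hypothetical, a batch is modelled as a dict of column name to list (a crude stand-in for ValueVectors), and a schema change is detected simply by comparing column names.

```python
from enum import Enum


class Outcome(Enum):
    STOP = 1
    OK = 2
    OK_WITH_NEW_SCHEMA = 3
    NONE = 4


class Scan:
    """Hypothetical data source: hands out one batch per next() call."""

    def __init__(self, batches):
        self._batches = iter(batches)
        self._schema = None
        self.batch = None

    def next(self):
        self.batch = next(self._batches, None)
        if self.batch is None:
            return Outcome.NONE              # no more data
        schema = tuple(self.batch)           # column names of this batch
        if schema != self._schema:
            self._schema = schema            # first batch, or schema changed
            return Outcome.OK_WITH_NEW_SCHEMA
        return Outcome.OK


def drain(source):
    """Keep pulling until the source is exhausted or stopped."""
    seen = []
    while True:
        outcome = source.next()
        seen.append(outcome.name)
        if outcome in (Outcome.NONE, Outcome.STOP):
            return seen
        # On OK_WITH_NEW_SCHEMA a real operator would rebuild any
        # schema-dependent state before touching source.batch.


outcomes = drain(Scan([
    {"a": [1, 2]},
    {"a": [3]},
    {"a": [4], "b": ["x"]},   # a new column appears mid-stream
]))
```

The third batch introduces column `b`, so the consumer is told about the new schema before it processes that batch, which is the hook that lets operators tolerate schema changes within one dataset.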

At the very tip of the Leaf fragment in my slideshow example is the Scan operator, which is configured to look up a Parquet file and run it through the Parquet Storage Engine. The Storage Engine is simply responsible for pulling in data from the data source, converting it into ValueVectors and passing that back to its child as a RecordBatch.

Eventually the Leaf fragment takes this batch and, through the Sender operator, sends it to the Intermediate DrillBit.

When the Intermediate DrillBit receives a RecordBatch for the first time, it looks up the fragment in Hazelcast by the fragment id carried in the RecordBatch, and sets up the Receiver and the physical operators necessary for processing in this DrillBit.
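This lazy setup can be sketched as follows, with a plain dict standing in for the distributed cache. The `IntermediateBit` class and its fields are hypothetical names used only to show that the cache lookup happens once per fragment, on the first batch.

```python
class IntermediateBit:
    """Hypothetical DrillBit that sets up a fragment on its first batch."""

    def __init__(self, cache):
        self.cache = cache      # stand-in for the distributed cache
        self.running = {}       # fragment id -> state of the set-up fragment
        self.lookups = 0        # how many times we had to hit the cache

    def on_batch(self, fragment_id, batch):
        if fragment_id not in self.running:
            # First batch for this fragment: fetch its plan and set it up.
            self.lookups += 1
            plan = self.cache[fragment_id]
            self.running[fragment_id] = {"plan": plan, "received": []}
        self.running[fragment_id]["received"].append(batch)


bit = IntermediateBit({7: ["receiver", "filter", "sender"]})
bit.on_batch(7, {"a": [1, 2]})
bit.on_batch(7, {"a": [3]})
```

Both batches end up in the same running fragment, and the cache is consulted only for the first one.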

The Intermediate fragment contains a Filter operator. Once this operator receives a RecordBatch, it looks up the new schema and passes it to our code generation, along with the specified filter expression and type information, to generate specific code that does the filtering. This is designed to avoid casting, run tight loops and leverage prefetching, as we can eliminate function calls. It is a very common approach, also seen in Hive’s new vectorized query engine from the Stinger initiative and in Impala.
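To give a feel for what generating filter code means, here is a small Python analogy that builds the source of a specialized function for one column, one comparison and one literal, then compiles it. Drill generates code for the JVM, so this shows only the shape of the idea. The `generate_filter` helper and the dict-of-lists batch layout are hypothetical.

```python
ALLOWED_OPS = {"<", "<=", ">", ">=", "==", "!="}


def generate_filter(column, op, literal):
    """Generate and compile a filter specialized for one expression."""
    if op not in ALLOWED_OPS:
        raise ValueError(f"unsupported operator: {op}")
    # The column name, operator and literal are baked into the source,
    # so the generated loop does no per-row expression interpretation.
    source = (
        "def filter_batch(batch):\n"
        f"    values = batch[{column!r}]\n"
        f"    keep = [i for i, v in enumerate(values) if v {op} {literal!r}]\n"
        "    return {name: [vec[i] for i in keep]"
        " for name, vec in batch.items()}\n"
    )
    namespace = {}
    exec(compile(source, "<generated-filter>", "exec"), namespace)
    return namespace["filter_batch"]


older_than_30 = generate_filter("age", ">", 30)
result = older_than_30({"age": [25, 41, 33], "name": ["a", "b", "c"]})
```

The generated function is created once per schema and expression and then reused for every batch with that schema, which is why a schema change is the point at which code has to be generated again.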

The Intermediate fragment streams one batch at a time, as soon as each is available, to the Root DrillBit, where the Screen operator receives it and returns it to the client.

The DrillClient that receives the RecordBatch simply transforms our columnar ValueVectors into rows and shows the result to the user.
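Turning a columnar batch back into rows is essentially a transpose. A minimal sketch, again modelling a batch as a dict of column name to list of values (my simplification, not the real ValueVector layout):

```python
def batch_to_rows(batch):
    """Transpose a columnar batch into a list of row dicts."""
    names = list(batch)
    # zip(*columns) walks all columns in lockstep, one row at a time.
    return [dict(zip(names, values)) for values in zip(*batch.values())]


rows = batch_to_rows({"id": [1, 2], "name": ["a", "b"]})
```

Keeping the data columnar until this very last step is what lets the operators upstream work on whole runs of values at a time.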

This is the overall flow in our Drill alpha release. We will continue to validate this architecture through diagnostics and benchmarking, and will also provide more operators and storage engines so we can do much more interesting data analysis.
