Restudy SchemaRDD in SparkSQL

Author: Hao Cheng, Jie Huang

Abstract

At the very beginning, SchemaRDD was designed simply to make life easier for developers in their daily routines of code debugging and unit testing on the SparkSQL core module. The idea boils down to describing the data structures inside an RDD with a formal description similar to a relational database schema. On top of all the basic functions provided by the common RDD APIs, SchemaRDD also provides straightforward relational query interfaces that are realized through SparkSQL. After several releases and updates, SchemaRDD successfully drew attention among developers in the Spark community, and it has now officially been renamed the “DataFrame” API on Spark’s latest trunk. This article starts with the background of SchemaRDD, then analyzes its design principles and application characteristics. Finally, it gives a brief review of SchemaRDD’s history and discusses its prospects in Spark’s future development.

Background Knowledge

Before we study SchemaRDD further, let us review what a relational database schema is and how Spark handles SQL queries.

  • Relational Database Schema

A schema is a formal description that a relational database can understand; it describes how structured data is logically organized.

For example, data table A contains two fields: the first field, named “key”, is of integer type; the second field, named “value”, is of string type. We can also define constraints on these fields, such as requiring that field “key” cannot be null. In this case, table A can be described by the following schema:

 A (key: Int (not null), value: String)

With the schema of table A, we are able to write SQL queries and build structured query semantics. As the example shows, the schema can be regarded as the foundation of structured query, analysis and optimization.
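
In Spark SQL, for instance, the same structure can be described with a Scala case class, from which the schema is inferred by reflection. This is only a minimal sketch; the class name mirrors the example above, and a Scala Int field is non-nullable, which matches the “not null” constraint on “key”:

// A Scala case class describing the schema A (key: Int (not null), value: String);
// Spark SQL infers the schema from it by reflection.
case class A(key: Int, value: String)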

  • SQL Execution Steps

From the time SQL text enters SparkSQL to the time query results are returned, it normally goes through the four steps shown in the following figure (the steps to the right of the red dotted line are handled in the SparkSQL core module and are transparent to users):

The component responsible for the lexical and syntactic parsing of SQL text is commonly called the SQL parser. The parser transforms SQL text into an Unresolved Logical Plan. Spark uses a "tree" structure to represent its logical plan, and each node of the tree represents a particular relational operation.

After receiving the Unresolved Logical Plan tree, the Semantic Analyzer carries out a series of resolution steps on it according to metadata obtained from the tables in the database. The Optimizer then optimizes the logical plan and its expression computation based on relational algebra and Boolean logic. Finally, the Strategy Planner transforms the Optimized Logical Plan tree into a Physical Plan tree that fits the characteristics of the Spark engine. The Physical Plan generates an RDD object instance that can be submitted to the Spark cluster for execution. Take the following SQL query as an example:

SELECT a.value, b.value FROM t1 a INNER JOIN t2 b on a.key = b.key AND a.key<10;

The following figure shows how the internal objects change as this SQL text is analyzed and transformed by SparkSQL:

[Figure 2]

The above example briefly describes the process of generating an RDD from SQL text and submitting it to a Spark cluster for execution. A SQL query is essentially a transformation of internal objects, produced collaboratively by a number of independent functional modules at different stages. The Spark SQL engine is a very open framework in which each module is completely decoupled from the others: modules collaborate solely by passing the logical plan tree object between them. At the same time, each module is highly customizable; the only requirement is to keep the operations and modifications on the logical plan tree complete and consistent.
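
For readers who want to see these intermediate objects themselves, below is a small sketch for the spark-shell. It assumes a HiveContext instance hc and that the tables t1 and t2 already exist; queryExecution is a developer-facing field of SchemaRDD in Spark 1.x.

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)  // "sc" is an existing SparkContext
val srdd = hc.sql("SELECT a.value, b.value FROM t1 a INNER JOIN t2 b ON a.key = b.key AND a.key < 10")

// Each field corresponds to one stage of the pipeline described above
println(srdd.queryExecution.logical)        // Unresolved Logical Plan (output of the SQL parser)
println(srdd.queryExecution.analyzed)       // Resolved Logical Plan (output of the Semantic Analyzer)
println(srdd.queryExecution.optimizedPlan)  // Optimized Logical Plan (output of the Optimizer)
println(srdd.queryExecution.executedPlan)   // Physical Plan (output of the Strategy Planner)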

Design Principle

Literally, SchemaRDD is just a combination of an RDD and schema information. In practice, it also offers a rich set of easy-to-use APIs (i.e. the DataSet API). As mentioned in the “Background Knowledge” section, a schema describes how structured data is logically organized. Once the schema information is obtained, the SQL engine is able to provide structured query capability for the corresponding data. The DataSet API is, to some extent, a replacement for the SQL parser: it uses API calls to generate the initial logical plan tree directly, while all subsequent processing steps reuse Spark SQL’s core logic. Therefore, we can consider the DataSet API’s processing capability completely equivalent to that of SQL queries.

SchemaRDD is essentially an RDD subclass; the difference is that it encapsulates a logical plan tree as one of its attributes. Whenever a program calls a DataSet API, a new SchemaRDD object is created, and the logical plan attribute of the new object is built by overlaying a new logical operation node on the original logical plan tree. As with RDD, the operations of the DataSet API are divided into two types, Transformations and Actions. All APIs related to relational operations belong to the Transformation type, while all operations associated with data output belong to the Action type. Just as with RDD, only when an Action is called will a Spark job be submitted for cluster execution. The figure below illustrates this transformation process:

[Figure 3]

A SchemaRDD performs a transformation (a Transformation API call) simply by overlaying the corresponding logical plan node and creating a new SchemaRDD instance. Only when an Action API is called are Spark SQL’s logical plan analysis, optimization and physical plan generation triggered to produce the RDD object instance, and all of these operations are completely transparent to the API user.
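
Since the figure cannot be reproduced here, the operation chain it depicts can be sketched roughly as follows. This is only a sketch for the spark-shell: it assumes a HiveContext instance hc, a registered table “a” with columns “key” and “value”, and a hypothetical output path; the method names follow the API list later in this article and may differ slightly across Spark 1.x versions.

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)              // "sc" is an existing SparkContext
import hc._                               // brings the implicit Symbol-to-Expression conversions into scope

val step1 = table("a")                    // Transformation: the logical plan is just a scan of table "a"
val step2 = step1.where('key < 3)         // Transformation: wraps the plan in a Filter node
val step3 = step2.orderBy('value.asc)     // Transformation: wraps the plan in a Sort node
step3.saveAsParquetFile("/tmp/a_sorted")  // Action: analysis, optimization and execution are triggered here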

In other words, the above SchemaRDD operations are almost equivalent to the following SQL query statement:

SELECT * FROM a WHERE key < 3 SORT BY value;

Even better, when using the DataSet API you can write the final query results directly to a specified external location (as the saveAsParquetFile call above does), which is more flexible than a plain SQL query.

Application Characteristics

  • Easy-to-Use APIs

Through RDD, Spark provides a large number of primitives that resemble logical operations. Compared with Hadoop’s Map and Reduce primitives, the rich RDD API greatly reduces the difficulty of writing Spark applications. However, although RDD can handle structured data, the optimization and execution facilities Spark provides for it are limited, so many obvious logical optimizations still have to be implemented by application developers themselves. Moreover, because RDD does not provide direct APIs for some of the more complex relational processing logic, application developers have to build that logic on their own.

By offering the more flexible and easy-to-use DataSet API, SchemaRDD provides a good solution to the above limitations of the RDD API. The advantages of the DataSet API are listed below:

  • It provides more APIs that cover the common functions needed for data analysis, and each API’s name indicates its function. For example, the APIs for creating a SchemaRDD (via HiveContext) include:

    def sql(sqlText: String) : SchemaRDD
    def table(tableName: String) : SchemaRDD
    def parquetFile(path: String) : SchemaRDD
    …
    
  • For the relational query APIs, the naming follows a one-to-one correspondence between an API and the keyword of its equivalent SQL statement, helping veteran SQL programmers get started quickly (see the short sketch after this list). Common APIs include:

    def select(exprs: Expression*): SchemaRDD
    def where(condition: Expression): SchemaRDD
    def orderBy(sortExprs: SortOrder*): SchemaRDD
    def limit(limitNum: Int): SchemaRDD
    def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*): SchemaRDD
    def where(dynamicUdf: (DynamicRow) => Boolean)
    …
    
  • Data output for query results is more flexible. The relevant APIs include:

    def saveAsParquetFile(path: String)
    def saveAsTable(tableName: String)
    def registerTempTable(tableName: String)
    def cache(): SchemaRDD
    def persist(newLevel: StorageLevel): SchemaRDD
    …
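
The following short sketch combines the three API groups above; the Parquet path, column names and table name are hypothetical, and the HiveContext setup mirrors the complete example later in this article:

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)                                // "sc" is an existing SparkContext
import hc._

val people = parquetFile("/data/people.parquet")            // create a SchemaRDD from a Parquet file
val adults = people.where('age >= 18).select('name, 'age)   // relational APIs mirror SQL keywords
adults.registerTempTable("adults")                          // make the result visible to SQL queries
adults.cache()                                              // keep it in memory for reuse
sql("SELECT name FROM adults LIMIT 10").collect().foreach(println)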
    
  • Compatible with Hive Data Source and Hive Syntax

In traditional ETL applications based on Hadoop MapReduce, Hive exhibits a very strong ability to execute relational queries. However, it only accepts queries submitted as SQL text, while in many cases data must first be preprocessed by shell scripts, MapReduce programs, Pig scripts and so on before it is loaded into the Hive database; only then can Hive SQL be used to perform the more complex relational queries. As a result, cross-language and cross-process variable passing and debugging have become a nightmare for MapReduce-based ETL applications.

SparkSQL is almost completely compatible with Hive in terms of SQL syntax, database views and the underlying persistent file formats. It not only supports creating a SchemaRDD directly from a HiveQL query, but also supports registering a SchemaRDD as a data table, which gives HiveQL queries easy access to SchemaRDD results. In addition, with the flexibility of the DataSet API, Spark SQL offers a simpler and more intuitive solution for traditional Hadoop ETL applications.

The following code demonstrates how to build a SchemaRDD from RDD data, register it as a temporary table, "INNER JOIN" it with the Hive table "hivetable", and finally print the results to the console.

import org.apache.spark.sql.hive.HiveContext

case class KV(key: Int, value: String)

// Create a common RDD; "sc" is an object instance of SparkContext
val kvRdd = sc.parallelize((1 to 100).map(i => KV(i, s"val_$i")))

// Create HiveContext
val hc = new HiveContext(sc)

// Import methods of HiveContext
import hc._

// Create SchemaRDD from RDD and register it as a temporary table “kv_rdd”
kvRdd.where('key >= 1).where('key <= 5).registerTempTable("kv_rdd")

// Create a SchemaRDD using HiveQL. The SchemaRDD temporary table “kv_rdd” can be directly referenced in HiveQL.
val result = sql("SELECT a.key, b.value FROM kv_rdd a INNER JOIN hivetable b ON a.key=b.key AND a.key<10 LIMIT 20")

// Call SchemaRDD’s collect function (an Action) to get the query result and print it to the console.
result.collect().foreach(row => {
  val f0 = if(row.isNullAt(0)) "null" else row.getInt(0)
  val f1 = if(row.isNullAt(1)) "null" else row.getString(1)
  println(s"result:$f0, $f1")
})

  • Simpler User-Defined Extension

In addition to basic SQL operations, real business applications often need user-defined functions (UDFs). Although Spark SQL integrates well with Hive, whose UDF support is quite mature, writing a UDF through the Hive interfaces is still too complicated for most application developers.

Spark SQL provides a better user-defined function abstraction, so developers with a basic understanding of Scala or Java can easily write a UDF, for example:

val makeConcat = (a: Int, b: String) => b + ":" + a  // Define the function
import org.apache.spark.sql.catalyst.dsl._  // Import the implicit conversion functions
srdd.select(makeConcat.call('key, 'value))  // Use the UDF in the DataSet API ("srdd" is an existing SchemaRDD with “key” and “value” columns)

or

val makeConcat = (a: Int, b: String) => b + ":" + a
registerFunction("MyConcat", makeConcat) // Register UDF
sql("SELECT key, value, MyConcat(key, value) FROM src LIMIT 5") // Use UDF in SQL
  • Automatic Performance Optimization

The DataSet API is used to describe the logic of the business requirements; before that logic is finally submitted to the cluster for execution, Spark SQL performs an overall analysis and optimization on it. Looking back at the example in the Design Principle section, the order in which the APIs are called is as follows:

table("a").where('key<3).sort('value).saveAsParquet(..)

Now imagine that the programmer accidentally wrote it as:

table("a").sort('value).where('key<3).saveAsParquet(..)

Without optimization, this would lead to a sharp decline in execution performance, because filtering out unneeded data before sorting is a basic rule for achieving good performance. The RDD API cannot help you with this, but if you use the DataSet API, SparkSQL’s optimization module completes this kind of logical optimization for you automatically.
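
One way to check this yourself is to compare the optimized plans of the two call orders. The sketch below is for the spark-shell and reuses the hypothetical HiveContext hc, its imported implicits and the table “a” from the earlier sketch; queryExecution exposes Spark SQL's intermediate plans as a developer API in Spark 1.x.

val good = table("a").where('key < 3).orderBy('value.asc)
val bad  = table("a").orderBy('value.asc).where('key < 3)

// Print the optimized logical plans; comparing them shows how (and whether)
// the optimizer reorders the Filter and Sort nodes in your Spark version.
println(good.queryExecution.optimizedPlan)
println(bad.queryExecution.optimizedPlan)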

Of course, you may argue that such obvious optimizations are easy for application developers to spot, and that in practice the real challenges come from more complex business scenarios or from logic generated across module boundaries, which are difficult even for manual optimization. Nevertheless, we believe that SparkSQL’s automatic optimization will keep improving and help us more as the Spark ecosystem evolves. We also believe that in the future, with the DataSet API, no logical optimization will need to be done manually, and we will be able to rely on SparkSQL to automatically adjust the execution and achieve the best performance.

  • Built-in Support for Parquet File

Apache Parquet is a columnar data storage format with built-in compression and support for predicate filtering, which can greatly improve data processing performance. Spark 1.2 supports the Parquet file format out of the box. Of course, taking full advantage of the Parquet format would normally require fairly complex coding against the Parquet API; SparkSQL does this work for you by fully optimizing the way the Parquet API is called. The optimization is done inside SparkSQL, and only the simple parquetFile interface is exposed to users. Below is its signature:

def parquetFile(path: String): SchemaRDD
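
As a short illustration (the paths and the column name are hypothetical, and hc is the HiveContext from the earlier sketches), a filtered round trip through Parquet looks like this:

import hc._                                                 // implicit Symbol-to-Expression conversions

val events = parquetFile("/data/events.parquet")            // the schema is read from the Parquet file itself
events.where('key < 10).saveAsParquetFile("/data/small")    // filter and write the result back out as Parquet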

History of SchemaRDD

  • Early Stage

At the early stage of Spark SQL development, the SQL parser was not yet mature, while the underlying logical plan analysis, optimization and physical plan generation were completely decoupled from the parser itself. So, to facilitate demonstrations and improve development efficiency, SchemaRDD was designed, and to simplify unit test code, some commonly used functions were added to it. Even now, a lot of unit test cases in the Spark source code are written against SchemaRDD.

  • Development Stage

As Spark SQL attracted more and more developers, the ease of use of the DataSet API gradually gained widespread attention, and many developers contributed to complementing and extending it. At one point there were many discussions about which APIs were the most appropriate for SchemaRDD.

From Spark 1.2, MLlib started to build its data processing pipeline on top of SchemaRDD, which brought even more attention to it. At the same time, problems caused by SchemaRDD’s insufficiently considered early design were gradually exposed, such as limited support for the Java language, performance issues, and APIs with duplicated functionality.

  • Latest Status

After Spark 1.2, the disadvantages of SchemaRDD were discussed in the Apache community. To solve these historical problems, a comprehensive reconstruction of SchemaRDD has been carried out on Spark’s code trunk, and SchemaRDD has been renamed "DataFrame". The name is borrowed from the Python Pandas API and is intended to emphasize its relational nature. Further changes cover performance optimization of cross-language invocation, usability improvements, removal of redundant APIs, and so on. The "DataFrame" API will be one of the most important changes in Spark 1.3.

  • Prospects

After the official release of the DataFrame API, the habit of programming directly against RDD will gradually fade in Spark applications. The DataFrame API will provide more complete functionality and at the same time support Java, Scala, Python and other programming languages, so that developers are truly free to choose their preferred language for Spark application development.

At the January Bay Area Meetup, Databricks announced that the DataFrame API will become the fundamental API of the Spark ecosystem: MLlib, GraphX, Spark SQL and even Streaming will build their operation pipelines on top of it. The DataFrame API will, in a true sense, enable data exchange between the modules of the Spark ecosystem, avoiding duplicated coding work and improving the execution performance of Spark applications.

Summary

Although SchemaRDD has been renamed DataFrame on Spark’s latest trunk, the basic principles of its internal implementation have not changed substantially, and it continues to provide a more flexible and easy-to-use API for Spark application developers.

While the RDD API is an abstraction of a distributed data processing system, the DataSet/DataFrame API is an abstraction built entirely from the developer’s perspective. I believe that in the near future more and more Spark applications will be implemented entirely on top of the DataFrame API, and Spark application development will be much easier than ever.

About the author:

  • Hao Cheng, Software Engineer at SSG STO Big Data Technology, Intel Asia-Pacific Research & Development Ltd., and an active Spark SQL contributor in the Apache community.
  • Jie Huang, Engineering Manager of SSG STO Big Data Technology, Intel Asia-Pacific Research & Development Ltd.
  • Thanks to Ashley Xu for translating the blog from Chinese to English.