Speed Up Apache Spark* Analytics with Intel® Data Analytics Acceleration Library

By Gennady Fedorov, Published: 07/14/2017, Last Updated: 09/12/2017

    Intel® DAAL is a performance library that provides algorithms focused on all stages of data analytics including pre-processing, transformation, analysis, modeling, validation, and decision making.

    In the era when even smallest datasets are calculated in gigabytes, datasets of terabyte size are met in common practice, big companies work on datasets of petabyte size and overall humanity is finding its path to Exabyte, one can’t rely on the assumption that one cpu will be enough. The whole set of data may not fit in memory of one node or the compute power of one cpu may not be sufficient to process all available data in reasonable time. Distributed computation come to an aid for solving such tasks.

    There are several frameworks for distributed computation with focus on data analytics but one, which is most widely used for this purpose, is Apache Spark. Apache Spark is shipped with MLlib, library of data processing and machine learning algorithms. MLlib has simple yet powerful interfaces for processing large sets of data.

    Here is an example of using MLlib for Principal component analysis (PCA):

import org.apache.spark.mllib.feature.{PCA, PCAModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data    = sc.textFile("/Spark/PCA/data/PCA.txt")
val dataRDD = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
val model = new PCA(10).fit(dataRDD)
println("Principal components:" + model.pc.toString())

    This example initiates several steps for distributed calculation:

  • textFile() will read text from file into Resilient Distributed Dataset (RDD) of Strings, representing each line in file. RDD is a distributed collection of elements, key Spark abstraction for data distributed across nodes. All further operations with RDD will be performed in distributed manner.
  • Higher level map() will transform each String into Vector of values in Strings format. Nested map() will convert each String value into double precision floating point value.
  • fit() will initiate PCA algorithm over RDD of Vectors of double precision values.

textFile() and map() have linear execution complexity of O(n*p), where n is the number of rows and p is the number of values in each row. Both operations are not required to have data communication between nodes. At the same time fit() function has nonlinear complexity for both execution and data transfer parts of the operation, both depend on implementation. fit() is the most performance critical operation in this example.

If you already use MLlib library and have heavy PCA calculation, Intel DAAL can help you get your results faster. Starting from Intel DAAL v.2018 you can use Intel DAAL within your MLlib enabled application for critical performance paths.

    This is your new code, which is using Intel DAAL

// import org.apache.spark.mllib.feature.{PCA, PCAModel}
import daal_for_mllib.{PCA, PCAModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data    = sc.textFile("/Spark/PCA/data/PCA.txt")
val dataRDD = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
val model = new PCA(10).fit(dataRDD)
println("Principal components:" + model.pc.toString())

You can notice that only one import line is changed, while the rest code is the same with all the performance benefits of highly optimized performance library.

These are PCA performance measurements for several datasets:

Measured on 7-nodes cluster with Intel(R) Xeon(R) CPU E5-2680 v2. Apache Spark 2.0, Scala 2.11 

    Intel DAAL 2018 contains MLlib compatible interfaces for PCA and K-means, more to come in further releases.

Several additional aspects should be taken into account, when targeting to show maximum performance. There are two levels of parallelization which dealt with Apache Spark*:

  • Multimode work distribution, across several cluster servers
  • Internode work parallelization, across logical processors

Main MLlib strategy is to address both levels with the similar computational blocks. Intel DAAL is capable of fine grain resource utilization for internode parallelization, doing specific optimizations for using shared memory and shared CPU caches.

  • Main strategy while using MLlib is to set --num-executors and --executor-cores in such a way, that there is around 1 executor-core per each logical processor in the cluster. The scheme will work with Intel DAAL as well.
  • You can try set those variable to have 1-3 executor–cores per each cluster node, to look for even better performance from Intel DAAL, to get even better performance.

Optimal values for these configuration options highly depend on the whole application and vary from case to case.

Summary.

Intel DAAL can be used to optimize your Apache Spark application. With minor changes in your code, MLlib algorithm can optimized with Intel DAAL. Fine-tuning may require changes of the executor parameters, but even with default parameters you will see performance boost. First algorithms are part of Intel DAAL v.2018, more algorithms to follow.

Intel DAAL samples available under this link https://software.intel.com/en-us/product-code-samples.

Product and Performance Information

1

Intel's compilers may or may not optimize to the same degree for non-Intel microprocessors for optimizations that are not unique to Intel microprocessors. These optimizations include SSE2, SSE3, and SSSE3 instruction sets and other optimizations. Intel does not guarantee the availability, functionality, or effectiveness of any optimization on microprocessors not manufactured by Intel. Microprocessor-dependent optimizations in this product are intended for use with Intel microprocessors. Certain optimizations not specific to Intel microarchitecture are reserved for Intel microprocessors. Please refer to the applicable product User and Reference Guides for more information regarding the specific instruction sets covered by this notice.

Notice revision #20110804