A Walk-Through of Distributed Processing Using Intel® DAAL

By Ying Hu, Published: 07/23/2015, Last Updated: 07/23/2015

Introduction

Intel® Data Analytics Acceleration Library (Intel® DAAL) is a new, highly optimized library targeting data mining, statistical analysis, and machine learning applications. It provides advanced building blocks that support all data analysis stages (preprocessing, transformation, analysis, modeling, decision making) for offline, streaming, and distributed analytics usages. Intel DAAL supports distributed data analytics on a variety of cluster platforms, including MPI*-based cluster environments, Hadoop*/Spark*-based cluster environments, low-level data exchange protocols, etc.

Apache Spark* is a fast and popular big data platform for large-scale data processing. By allowing user programs to load data into a cluster's distributed memory and query it repeatedly, Spark is well suited to machine learning algorithms. This article walks through the Principal Component Analysis (PCA) sample in the samples/spark folder of the Intel DAAL package to demonstrate how distributed data analytics proceeds on a Spark cluster using Intel DAAL.

Software: Intel® Data Analytics Acceleration Library 2016, Hadoop* v2.1.0, Apache Spark* v1.2.0

Hardware: 1 driver node + 4 worker nodes

Driver node: Intel(R) Xeon(R) CPU E5-2680 0 @ 2.70GHz, 2 sockets x 8 cores x 2 threads (Hyper-Threading enabled), 32 logical processors in total, 128GB of RAM

Worker nodes: Intel(R) Xeon(R) CPU E5-2699 v3 @ 2.30GHz, 45MB LLC, 2 sockets x 18 cores x 2 threads (Hyper-Threading enabled), 72 logical processors in total, 128GB of RAM per node

Operating System: Ubuntu 12.04.2 LTS x86_64

Principal Component Analysis (PCA) Sample

Intel DAAL solves data analysis problems by providing pre-built data analytics algorithms that are highly optimized for a wide spectrum of Intel architecture (IA)-based systems. In addition to the traditional batch processing mode, Intel DAAL focuses on supporting streaming and distributed data processing. The algorithms that support streaming and distributed processing in the current release include statistical moments, variance-covariance matrix, singular value decomposition (SVD), QR decomposition, Principal Component Analysis (PCA), K-Means, linear regression, Naïve Bayes, etc. Here we take PCA as the sample.

Principal Component Analysis is a method for exploratory data analysis. PCA transforms a set of observations of possibly correlated variables into a new set of uncorrelated variables, called principal components. Principal components are the directions of largest variance. PCA is one of the most powerful techniques for dimension reduction.

Given n feature vectors X_1 = (x_11, …, x_1p), ..., X_n = (x_n1, …, x_np) of dimension p, or a p x p correlation matrix Cor, the library computes the principal components of the data set and returns the transformation matrix T, which contains the eigenvectors in row-major order, together with a vector of the corresponding eigenvalues. You can use these results to choose a new dimension d < p and apply the transformation T_d: X_i -> Y_i to the original data set according to the rule Y_i = T_d * X_i^T, where T_d is the submatrix of T that contains the d eigenvectors corresponding to the d largest eigenvalues.
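To make this rule concrete, here is a small, hypothetical Java sketch (not part of the DAAL sample; the class name, method name, and array layout are illustration only) that applies Y_i = T_d * X_i^T with plain arrays:

/* Hypothetical illustration (not DAAL code): project an n x p data matrix X onto
   the first d principal components, given T_d as a d x p matrix whose rows are
   the eigenvectors for the d largest eigenvalues. The result Y is n x d. */
public class ProjectionSketch {
    static double[][] project(double[][] x, double[][] td) {
        int n = x.length, p = x[0].length, d = td.length;
        double[][] y = new double[n][d];
        for (int i = 0; i < n; i++) {
            for (int k = 0; k < d; k++) {
                double sum = 0.0;
                for (int j = 0; j < p; j++) {
                    sum += td[k][j] * x[i][j];   /* y[i][k] = sum_j T_d[k][j] * x[i][j] */
                }
                y[i][k] = sum;
            }
        }
        return y;
    }
}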

Intel DAAL supports two kinds of PCA algorithms: the correlation method (Cor) and the SVD method. In practice, the correlation method is used more often because it is faster.
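As a minimal sketch of how the method is chosen (assuming the batch-mode pca.Batch class and the Method enum of the DAAL Java API; the distributed classes used later in this sample make the same choice in their constructors):

/* A minimal sketch: the PCA method is selected when the algorithm object is constructed.
   Assumes the DAAL Java batch-mode PCA API; the distributed sample below uses
   DistributedStep1Local/DistributedStep2Master in the same way. */
import com.intel.daal.algorithms.pca.Batch;
import com.intel.daal.algorithms.pca.Method;
import com.intel.daal.services.DaalContext;

public class PcaMethodChoice {
    public static void main(String[] args) {
        DaalContext context = new DaalContext();
        Batch pcaCor = new Batch(context, Double.class, Method.correlationDense); /* correlation method */
        Batch pcaSvd = new Batch(context, Double.class, Method.svdDense);         /* SVD method */
        context.dispose();
    }
}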

The sample can be found in your DAAL installation directory:

~/intel/daal/samples/java/spark/sources/

SamplePcaCor.java

SparkPcaCor.java

DistributedHDFSDataSet.java

Distributed Processing - Input Data

Effective data management is one of the key constituents of the performance of a data analytics application. For Intel® Data Analytics Acceleration Library (Intel® DAAL), data management requires performing the following operations effectively (a minimal batch-mode sketch follows the list):

  1. Raw data acquisition, filtering, and normalization with data source interfaces.
  2. Conversion of the data to a numeric representation for numeric tables.
  3. Data streaming from a numeric table to an algorithm.
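The sketch below walks through these three operations in batch mode, assuming a local CSV file named data.csv and the FileDataSource class of the DAAL Java API (the Spark sample later replaces this with an HDFS/RDD data source):

/* A minimal batch-mode sketch of the three data-management operations.
   The file name "data.csv" is illustrative only. */
import com.intel.daal.data_management.data.NumericTable;
import com.intel.daal.data_management.data_source.DataSource;
import com.intel.daal.data_management.data_source.FileDataSource;
import com.intel.daal.services.DaalContext;

public class DataFlowSketch {
    public static void main(String[] args) {
        DaalContext context = new DaalContext();

        /* 1. Acquire the raw data through a data source interface */
        FileDataSource dataSource = new FileDataSource(context, "data.csv",
                DataSource.DictionaryCreationFlag.DoDictionaryFromContext,
                DataSource.NumericTableAllocationFlag.DoAllocateNumericTable);

        /* 2. Convert the raw data to an in-memory numeric table */
        dataSource.loadDataBlock();
        NumericTable table = dataSource.getNumericTable();

        /* 3. Stream the numeric table into an algorithm, e.g. algorithm.input.set(InputId.data, table) */

        context.dispose();
    }
}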

Figure 1 below shows the typical data flow using Intel DAAL.

Figure 1. Typical data flow within Intel DAAL

The original raw input data can be web data, graphs, streaming data, text data, or image/video/voice data stored in memory, in files, or in a database. Intel DAAL supports local and distributed data sources as input, including in-file and in-memory CSV, MySQL, HDFS, and Resilient Distributed Dataset (RDD) objects for Apache Spark*, etc.

Intel DAAL contains several classes that implement the data source component responsible for representation of the data in a raw format, for example:

DistributedDataSet - An abstract class that defines the interface for the data management component responsible for representation of the data in a distributed raw format.

StringDataSource - Specifies the methods for accessing data stored as text in java.io.String format.

Users can define their own data source components based on these classes. The data source classes are responsible for converting the raw format into in-memory data in numeric tables.

We assume that the PCA input data set is stored in the Hadoop Distributed File System (HDFS). In the sample, it is stored in CSV files:

~/intel/daal/samples/java/spark/data

PcaCor_Normalized_1.csv

PcaCor_Normalized_2.csv

PcaCor_Normalized_3.csv

PcaCor_Normalized_4.csv

# Put the input data on HDFS

>hadoop fs -put data/${sample}*.csv /Spark/${sample}/data/

The sample files DistributedHDFSDataSet.java and SamplePcaCor.java come with pre-built connectors and data parsers, such as the StringDataSource class, the DistributedHDFSDataSet class, and the JavaPairRDD template. Through them, and lines 13-14 of SamplePcaCor.java, we read the input data into the numeric class HomogenNumericTable, which Intel DAAL uses for highly efficient computation. The member function getAsPairRDD of the class DistributedHDFSDataSet is implemented in DistributedHDFSDataSet.java. Thus all input data are taken care of by the RDD and the Spark workers.

The data set is split into several blocks across the computation nodes. Users do not need to consider which data reside on which node or how to communicate between them.
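For reference, here is a simplified sketch of what getAsPairRDD could look like. This is the author's approximation, not necessarily the shipped DistributedHDFSDataSet.java; the field name _path and the use of wholeTextFiles are assumptions. Each input file is parsed into one HomogenNumericTable with a StringDataSource and packed for Spark serialization:

/* Simplified sketch of getAsPairRDD (an approximation, not the shipped sample code).
   Assumes imports from org.apache.spark.api.java.*, org.apache.spark.api.java.function.PairFunction,
   scala.Tuple2, and the DAAL classes already used in this article. */
public JavaPairRDD<Integer, HomogenNumericTable> getAsPairRDD(JavaSparkContext sc) {
    /* One (path, fileContents) pair per input CSV file on HDFS */
    JavaPairRDD<String, String> files = sc.wholeTextFiles(_path);  /* _path: hypothetical field holding the HDFS directory */

    return files.mapToPair(
        new PairFunction<Tuple2<String, String>, Integer, HomogenNumericTable>() {
            public Tuple2<Integer, HomogenNumericTable> call(Tuple2<String, String> file) {
                DaalContext context = new DaalContext();

                /* Convert the file's CSV text into a DAAL numeric table */
                StringDataSource source = new StringDataSource(context, "");
                source.setData(file._2);                /* raw CSV rows */
                source.createDictionaryFromContext();   /* infer the feature dictionary */
                source.allocateNumericTable();
                source.loadDataBlock();

                HomogenNumericTable table = (HomogenNumericTable) source.getNumericTable();
                table.pack();                           /* serialize DAAL state so Spark can ship the table */
                context.dispose();

                /* Key each block by a simple id derived from the file path */
                return new Tuple2<Integer, HomogenNumericTable>(file._1.hashCode(), table);
            }
        });
}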

/* file: SamplePcaCor.java */

// Principal Component Analysis computation example program
public class SamplePcaCor {
    public static void main(String[] args) {
        DaalContext context = new DaalContext();

        /* Create a JavaSparkContext that loads defaults from system properties and the classpath and sets the name */
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark PCA(COR)"));

        /* Read from distributed HDFS data set from path */
        StringDataSource templateDataSource = new StringDataSource( context, "" );
        DistributedHDFSDataSet dd = new DistributedHDFSDataSet( "/Spark/PcaCor/data/", templateDataSource );
        JavaPairRDD<Integer, HomogenNumericTable> dataRDD = dd.getAsPairRDD(sc);

        /* Compute PCA for dataRDD using Correlation method */
        SparkPcaCor.PCAResult result = SparkPcaCor.runPCA(context, dataRDD);

        /* Print results */
        HomogenNumericTable EigenValues  = result.eigenValues;
        HomogenNumericTable EigenVectors = result.eigenVectors;
        printNumericTable("Eigen Values:",  EigenValues );
        printNumericTable("Eigen Vectors:", EigenVectors);
        context.dispose();
    }
}
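printNumericTable is a small helper defined elsewhere in the sample sources. A minimal sketch of such a helper (an assumption, not the shipped code) could read the dense data back with HomogenNumericTable.getDoubleArray() and print it row by row:

/* A minimal sketch of a printNumericTable helper (assumed, not the shipped code). */
private static void printNumericTable(String header, HomogenNumericTable table) {
    long rows = table.getNumberOfRows();
    long cols = table.getNumberOfColumns();
    double[] data = table.getDoubleArray();   /* dense data in row-major order */

    System.out.println(header);
    for (long i = 0; i < rows; i++) {
        StringBuilder line = new StringBuilder();
        for (long j = 0; j < cols; j++) {
            line.append(String.format("%-12.6f", data[(int) (i * cols + j)]));
        }
        System.out.println(line.toString());
    }
}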

Distributed Processing - Algorithms

In the distributed processing mode, an Intel DAAL algorithm operates on a data set distributed across several devices (compute nodes). It produces partial results on each node, which are finally merged into the final result on the master node. The PCA correlation method in the distributed processing mode follows the general schema described in the Algorithms section of the Intel DAAL User and Reference Guides.

In SamplePcaCor.java, the computation is done by the function runPCA:

/* Compute PCA for dataRDD using Correlation method */
SparkPcaCor.PCAResult result = SparkPcaCor.runPCA(context, dataRDD);

The function is defined in SparkPcaCor.java:

public static PCAResult runPCA(DaalContext context, JavaPairRDD<Integer, HomogenNumericTable> dataRDD) {
        computestep1Local(dataRDD);
        finalizeMergeOnMasterNode(context);
        return result;
}

It includes two steps:

Step 1 - on Local Nodes: computestep1Local(dataRDD)

In this step, the PCA algorithm runs on the local nodes. It accepts the local input data block and calculates a local partial result on each node, then passes that result, keyed by its ID, to the next step. The code is in SparkPcaCor.java. For more details, see Figure 2 and the Algorithms section.

private static void computestep1Local(JavaPairRDD<Integer, HomogenNumericTable> dataRDD) {
        partsRDD = dataRDD.mapToPair(
            new PairFunction<Tuple2<Integer, HomogenNumericTable>, Integer, PartialResult>() {
                public Tuple2<Integer, PartialResult> call(Tuple2<Integer, HomogenNumericTable> tup) {
                    DaalContext context = new DaalContext();

                    /* Create an algorithm to calculate the PCA decomposition using the correlation method on local nodes */
                    DistributedStep1Local pcaLocal = new DistributedStep1Local(context, Double.class, Method.correlationDense);

                    /* Set the input data on the local node */
                    tup._2.unpack(context);
                    pcaLocal.input.set( InputId.data, tup._2 );

                    /* Compute PCA on the local node */
                    PartialResult pres = pcaLocal.compute();
                    pres.pack();
                    context.dispose();

                    return new Tuple2<Integer, PartialResult>(tup._1, pres);
                }
            });
}

Figure 2. PCA algorithm performing on local nodes

Step 2 - on Master Node: finalizeMergeOnMasterNode

In this step, the PCA algorithm runs on the master node. It first collects the partial results from the local nodes and adds them to the algorithm input under MasterInputId.partialResults. pcaMaster.compute() executes the computation on the partial results; finalizeCompute() then finalizes the computation and retrieves the PCA results: the numeric table ResultId.eigenValues, which contains the eigenvalues, and the numeric table ResultId.eigenVectors, which contains the eigenvectors. All communication between the local nodes and the master node is taken care of by Hadoop or Spark. The algorithm diagram (Figure 3) and the code (SparkPcaCor.java) are shown below. For more details, see the Algorithms section.

Figure 3. PCA algorithm performing on master node

private static void finalizeMergeOnMasterNode(DaalContext context) {

        /* Create algorithm to calculate PCA decomposition using Correlation method on master node */
        DistributedStep2Master pcaMaster = new DistributedStep2Master(context, Double.class, Method.correlationDense);
        List<Tuple2<Integer, PartialResult>> parts_List = partsRDD.collect();

        /* Add partial results computed on local nodes to the algorithm on master node */
        for (Tuple2<Integer, PartialResult> value : parts_List) {
            value._2.unpack(context);
            pcaMaster.input.add( MasterInputId.partialResults, value._2 );
        }

        /* Compute PCA on master node */
        pcaMaster.compute();

        /* Finalize the computations and retrieve PCA results */
        Result res = pcaMaster.finalizeCompute();
        result.eigenVectors = (HomogenNumericTable)res.get(ResultId.eigenVectors);
        result.eigenValues  = (HomogenNumericTable)res.get(ResultId.eigenValues);
    }
}

Building and Running the Sample

Intel DAAL distributed algorithms are abstracted from the underlying cross-device communication technology, which allows the library to be used in a variety of multi-device computing and data transfer scenarios, including but not limited to MPI*-based cluster environments, Hadoop*/Spark*-based cluster environments, low-level data exchange protocols, etc. In this sample, we rely on the Spark cluster to manage the nodes, the data block assignment, and the communication.

The script file launcher.sh helps to:

  1. Set the environment variables (SPARK_HOME, HADOOP_CONF_DIR, DAALROOT, SHAREDLIBS, etc.). For example,
export JAVA_HOME=/usr/spark/pkgs/java/1.7.0.45-64/jre/
export HADOOP_CONF_DIR=/home/spark/hadoop-2.2.0/etc/hadoop/
export SPARK_HOME=/home/spark/spark-1.2-yarn
export DAALROOT=/home/spark/intel/compilers_and_libraries_2016.0.xx/linux/daal

source ${DAALROOT}/bin/daalvars.sh intel64

export CLASSPATH=/home/spark/hadoop-2.2.0/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:/home/spark/hadoop-2.2.0/share/hadoop/common/hadoop-common-2.2.0.jar:/home/spark/hadoop-2.2.0/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:/home/spark/spark-1.2-yarn/core/target/spark-core_2.10-1.2.1-SNAPSHOT.jar:/home/spark/spark-1.2-yarn/mllib/target/spark-mllib_2.10-1.2.1-SNAPSHOT.jar:${DAALROOT}/lib/daal.jar:${CLASSPATH}

export SCALA_JARS=/home/spark/scala-2.10.4/lib/scala-library.jar
export SHAREDLIBS=${DAALROOT}/lib/${daal_ia}_lin/libJavaAPI.so,${DAALROOT}/../tbb/lib/${daal_ia}_lin/gcc4.4/libtbb.so.2,${DAALROOT}/../compiler/lib/${daal_ia}_lin/libiomp5.so
  2. Build the sample
javac -d ./_results/${sample} -sourcepath ./ sources/*${sample}.java sources/DistributedHDFSDataSet.java

cd _results/${sample}

# Creating jar
jar -cvfe spark${sample}.jar DAAL.Sample${sample} ./* >> ${sample}.log
  3. Run the sample
/home/spark/spark-1.2-yarn/bin/spark-submit --driver-class-path "${DAALROOT}/lib/daal.jar:${SCALA_JARS}" --jars ${DAALROOT}/lib/daal.jar --files ${SHAREDLIBS},${DAALROOT}/lib/daal.jar -v --master yarn-cluster --deploy-mode cluster --class DAAL.Sample${sample} spark${sample}.jar

The result is written to the master node's stdout, as shown in Figure 4.

 

Figure 4. Result of distributed processing of the PCA algorithm

For a performance benchmark comparing Intel DAAL PCA Cor with Spark MLlib, please see Link.

Troubleshooting

The program should run smoothly if the environment is ready, but you may run into some problems, mainly related to environment settings.

  1. Wrong FS: hdfs://xxx/libiomp5.so, expected: viewfs://cmt/

Solution: manually copy all the dependency libraries defined in SHAREDLIBS to the work folder as required.

Uploading resource file:/home/spark/intel/compilers_and_libraries_2016.0.107/linux/daal/lib/intel64_lin/libJavaAPI.so -> hdfs://dl-s2:9000/user/spark/.sparkStaging/application_1440055872445_0024/libJavaAPI.so

  2. Exception in thread "main" …. libJavaAPI.so: ELF file OS ABI invalid
  3. Run fails with an out-of-memory error such as "Total size of serialized results of 16 tasks (2.GB) is bigger than spark.driver.maxResultSize (2.0 GB)"
    Solution: increase spark.driver.maxResultSize (default 1g):
spark-submit -v --master yarn-cluster --deploy-mode cluster --class DAAL.SamplePcaCor --conf "spark.driver.maxResultSize=20g" --num-executors 10 --jars ${DAALROOT}/lib/daal.jar,${SCALA_JARS},${SHAREDLIBS} sparkpcacor.jar

 

Product and Performance Information

Intel's compilers may or may not optimize to the same degree for non-Intel microprocessors for optimizations that are not unique to Intel microprocessors. These optimizations include SSE2, SSE3, and SSSE3 instruction sets and other optimizations. Intel does not guarantee the availability, functionality, or effectiveness of any optimization on microprocessors not manufactured by Intel. Microprocessor-dependent optimizations in this product are intended for use with Intel microprocessors. Certain optimizations not specific to Intel microarchitecture are reserved for Intel microprocessors. Please refer to the applicable product User and Reference Guides for more information regarding the specific instruction sets covered by this notice.

Notice revision #20110804