By PREETHI VENKATESH, Nathan G Greeneltch

Published:03/21/2018 Last Updated:03/21/2018

Volume 3 introduced various stages of the predictive model fitting and deployment process in Intel® Distribution for Python's (IDP) Intel® Data Analytics Acceleration Library (Intel® DAAL) used in a batch processing environment. In this volume, we will go deeper into other processing modes that Intel DAAL has in store; mainly focusing on accelerating the training stage. This article will illustrate two major computation modes which distinguish Intel® DAAL from other popular data analytics libraries available in the market: Distributed Processing and Online Processing modes.

To accelerate the model training process, Intel DAAL supports **distributed processing mode** for large datasets including a programming model that makes it easy for users to implement a Master-Slave approach. Mpi4py can be easily interfaced with PyDAAL (Intel DAAL's Python API), as Intel DAAL's serialization and deserialization classes enable data exchange between nodes during parallel computation.

To accelerate the model re-training process and to overcome challenges associated with limited compute resources, Intel DAAL includes **online processing mode**.

**Vol 1: Data Structures**- Introduces Data Management component of Intel DAAL and available custom Data Structures(Numeric Table and Data Dictionary) with code examples.**Vol 2: Basic Operations on Numeric Tables**- Introduces possible operations performed on Intel DAAL's custom Data Structure (Numeric Table and Data Dictionary) with code examples**Vol 3: Analytics Model Building and Deployment****–**Introduces analytics modeling and evaluation process in Intel DAAL with serialized deployment in batch processing.**Vol4: Distributed and Online Processing**– Introduces Intel DAAL's advanced processing modes (distributed and online) that support data analysis and model fitting on large and streaming data.

The demonstrations in this article require IDP, Intel DAAL and mpi4py installation which are available for free on Anaconda cloud.

- Install IDP full environment to install all the required packages
`conda create -n IDP –c intel intelpython3_full python=3.6`

- Activate IDP environment
`source activate IDP`

(or)

`activate IDP`

Refer to installation options and a complete list of Intel packages for more information

In recent years, popular machine learning algorithms have been packaged into simple toolkits facilitating the work of Machine learning practitioners. Most of the libraries in these toolkits perform sequential algorithm computations, also known as batch processing. This type of processing becomes problematic when dealing with Big Data. If computation in batch processing mode is time-consuming to generate a single model result on Big Data, parameter tuning can become near-to-impossible. To address this limitation, Intel DAAL provides "distributed processing" alternatives to accommodate standard practices in the data science community.

For predictive analytics, PyDAAL and mpi4py can be used to quickly distribute model training for many of DAAL's algorithm implementations using the Single Program Multiple Data (SPMD) technique. Other Python* machine learning libraries allow for the easy application of a batch parameter-tuning grid search, mainly because it is an embarrassingly parallel process. What sets Intel DAAL apart is the included IA-optimized distributed versions of many of its model training classes that delivers fast and scalable training results, leading to faster parameter-tuning on large dataset. Additionally, acceleration of a single model training is enabled with similar syntaxes to batch learning. For these implementations, the DAAL engineering team has provided a slave method to compute partial training results on row-grouped chunks of data, and then a master method for reduction of the partial results into a final model result.

Messages passed with MPI4Py are passed as serialized objects. MPI4Py uses the popular Python object serialization library Pickle under the hood during this process. PyDAAL uses SWIG (Simplified Wrapper and Interface Generator) as its wrapper interface. Unfortunately, SWIG is not compatible with Pickle. Fortunately, DAAL has built-in serialized and deserialization functionality. See Trained Model Portability section from Volume 3 for details. The table below demonstrates the master and slave methods for the distributed version of PyDAAL's covariance model method.

- Linear and Ridge Regression
- Naïve Bayes Classifier
- Recommender systems
- Neural Networks

- K-Means Clustering
- Principle component analysis

- Moments of Low order
- Covariance matrix
- Singular Value Decomposition
- QR decomposition

Subsequent releases will have more distributed processing supported algorithms

Note: The `serialize`

and `deserialize`

helper functions are provided in the Trained Model Portability from Volume 3. (or) import from customUtils available on daaltces GitHub page

This demo is designed to work on a Linux* OS.

Note: The upcoming helper function requires "customUtils" module to be imported from daaltces GitHub Repository.

"customUtils" is available on daaltces GitHub page

The next section can be copied and pasted into a user's script or adapted to a specific use case. The helper function block provided below can be used carry out the distributed computation of the covariance matrix, but can be adapted for fitting other types of models. See Computation Modes section in developer's docs for more details on distributed model fitting. A full usage code example follows the helper function.

Helper function starts here:

```
'''
---------------------------------------------------------------------------------
*************************HELPER FUCNTION STARTS HERE*****************************
---------------------------------------------------------------------------------
'''
# Define slave compute routine
'''
Defined Slave and Master Routines as Python Functions
Returns serialized partial model result. Input is serialized partial numeric table
'''
from customUtils import getBlockOfNumericTable, serialize, deserialize # customUtils is available on daaltces GitHub page https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils.
from daal.data_management import HomogenNumericTable
from daal.algorithms.covariance import (
Distributed_Step1LocalFloat64DefaultDense, data, partialResults,
Distributed_Step2MasterFloat64DefaultDense
)
def computestep1Local(serialnT):
# Deseralize using Helper Function
partialnT = deserialize(serialnT)
# Create partial model object
model = Distributed_Step1LocalFloat64DefaultDense()
# Set input data for the model
model.input.set(data, partialnT)
# Get the computed partial estimate result
partialResult = model.compute()
# Seralize using Helper Function
serialpartialResult = serialize(partialResult)
return serialpartialResult
# Define master compute routine
'''
Imports global variable finalResult. Computes master version of the model and sets full model result into finalResult. Inputs are array of serialized partial results and MPI world size
'''
def computeOnMasterNode(serialPartialResult, size):
global finalResult
# Create master model object
model = Distributed_Step2MasterFloat64DefaultDense()
# Add partial results to the distributed master model
for i in range(size):
# Deseralize using Helper Function
partialResult = deserialize(serialPartialResult[i])
# Set input objects for the model
model.input.add(partialResults, partialResult)
# Recompute a partial estimate after combining partial results
model.compute()
# Finalize the result in the distributed processing mode
finalResult = model.finalizeCompute()
'''
---------------------------------------------------------------------------------
*************************HELPER FUCNTION ENDS HERE*****************************
---------------------------------------------------------------------------------
'''
```

The below example uses the complete block of helper functions given above and implements `computestep1Local()`

, `computeOnMasterNode()`

functions with `mpi4py`

to construct a Covariance Matrix.

```
from mpi4py import MPI
from customUtils import getBlockOfNumericTable, serialize, deserialize
# customUtils is available on daaltces GitHub page https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils.
from daal.data_management import HomogenNumericTable
'''
Begin MPI Initialization and Run Options
'''
# Get MPI vars
size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()
# Initialize result vars to fill
serialPartialResults = [0] * size
finalResult = None
'''
Begin Data Set Creation
The below example variable values can be used:
numRows, numCols = 1000, 100
'''
# Create random array for demonstration
# numRows, numCols defined by user
seeded = np.random.RandomState(42)
fullArray = seeded.rand(numRows, numCols)
# Build seeded random data matrix, and slice into chunks
# rowStart and rowEnd determined by size of the chunks
sizeOfChunk = int(numRows/size)
rowStart = rank*sizeOfChunk
rowEnd = ((rank+1)*sizeOfChunk)-1
array = fullArray[rowStart:rowEnd, :]
partialnT = HomogenNumericTable(array)
serialnT = serialize(partialnT)
'''
Begin Distributed Execution
'''
if rank == 0:
serialPartialResults[rank] = computestep1Local(serialnT)
if size > 1:
# Begin to receive slave partial results on the master
for i in range(1, size):
rank, size, name, serialPartialResults[rank] =
MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=1)
computeOnMasterNode(serialPartialResults,size)
else:
serialPartialResult = computestep1Local(serialnT)
MPI.COMM_WORLD.send((rank, size, name, serialPartialResult), dest=0, tag=1)
'''
---------------------------------------------------------------
LINUX shell commands to run the covariance matrix usage example
---------------------------------------------------------------
'''
# Source and activate Intel Distribution of Python (IDP) env
source ../anaconda3/bin/activate
source activate idp
# optionally set mpi environmental variable to shared memory mode
export I_MPI_SHM_LMT=shm
# Cd to script directory, and call Python interpreter inside mpirun command
cd ../script_directory
mpirun –n # python script.py
```

What happens when training in batch mode becomes infeasible due to continuous dataset updates or limited resources?

Incremental learning(online processing in Intel DAAL) is a process of enhancing the existing trained model with new data instances; broadly used in scenarios where:

- Limited in-memory resources preclude large dataset load. In such cases, datasets are partitioned into blocks to load block partitions and train the model incrementally.
- New data streams in periodically and a previously trained model requires update (provided existing data instances continue to stay relevant). To overcome the painful process of re-training the whole model every time new data instances are loaded, Incremental learning algorithm preserves the existing trained model details and updates the model only with new data occurrences.

Industries like robotics, autonomous driving, and stock trading heavily depend on predictive analytics, demanding model updates with new learning experiences. In such situations batch processing can no longer remain a viable solution. Furthermore, data analytics applications that involve direct customer interaction (e.g., social media, e-commerce purchases) demand up-to-date trained models based on customer experiences. Incremental learning aims to deliver faster solutions by eliminating the time and effort to re-train a model every time new data arrives. Also, Incremental learning makes training models possible on Big Data even with resource scarcity.

- Linear Regression
- Ridge Regression
- Naïve Bayes

Principle component analysis

- Singular value decomposition
- Moments of low order
- Correlation and covariance

Subsequent releases will have more online processing supported algorithms.

Note: The upcoming demonstration requires "customUtils" module to be imported from daaltces GitHub Repository

As a preliminary step, create three data partitions and save them to disk. We will use these data partitions to illustrate both the scenarios mentioned above.

```
#Create 4 random data partitions
import numpy as np
all_data = ['data-block-1.csv', 'data-block-2.csv', 'data-block-3.csv','data-block-new.csv']
for f in all_data:
data = np.random.rand (1000, 11)
np.savetxt(f,data,delimiter=",")
```

Incrementally train linear regression model on two data partitions

```
import sys, os
sys.path.append(os.path.join(os.path.dirname(sys.executable),'share','pydaal_examples','examples','python','source'))
from daal.algorithms.linear_regression import training
from daal.algorithms.linear_regression.training import data, dependentVariables
from customUtils import getNumericTableFromCSV, \
getBlockOfNumericTable, serialize, deserialize# customUtils is available on daaltces GitHub page https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils.
from utils import printNumericTable
# Create an online algorithm object
algorithm = training.Online ()
# Create list of data blocks
all_data = ['data-block-1.csv', 'data-block-2.csv','data-block-3.csv']
# Iterate through all data blocks and train/update results
for block in all_data:
nT = getNumericTableFromCSV (block)
# Split nT into predictors and labels
inpdata = getBlockOfNumericTable (nT, Columns=10)
labels = getBlockOfNumericTable (nT, Columns=[10, ])
# Set the algorithm input parameters
algorithm.input.set (data, inpdata)
algorithm.input.set (dependentVariables, labels)
# compute partial model results
algorithm.compute ()
# Serialize and save the partial results to disk.
# This partial result will be later used in the next usage example,
# to re-train on new instances
par_trainingResult = algorithm.getPartialResult ()
serialize (par_trainingResult, fileName="par_trainingResult.npy")
# Compute final results
trainingResult = algorithm.finalizeCompute ()
printNumericTable (trainingResult.get (training.model).getBeta (), "Linear Regression coefficients:")
```

Re-train the serialized "parTrainingResult" obtained in Scenario 1 with a new data partition.

```
import sys, os
sys.path.append(os.path.join(os.path.dirname(sys.executable),'share','pydaal_examples','examples','python','source'))
from daal.algorithms.linear_regression import training
from daal.algorithms.linear_regression.training import data, dependentVariables
from customUtils import getNumericTableFromCSV, \
getBlockOfNumericTable, serialize, deserialize
from utils import printNumericTable
# customUtils is available on daaltces GitHub page https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils.
algorithm_new = training.Online ()
#Deserialize and set the partial training results
par_trainingResult = deserialize(fileName="par_trainingResult.npy")
par_trainingResult.setInitFlag(True)
algorithm_new.setPartialResult (par_trainingResult)
#Create a numeric table of new data data instances
new_nT = getNumericTableFromCSV ('data-block-new.csv')
#Split new_nT into predictors and labels
new_inpdata = getBlockOfNumericTable (new_nT, Columns=10)
new_labels = getBlockOfNumericTable (new_nT, Columns=[10, ])
# Set the algorithm_new input parameters
algorithm_new.input.set (training.data, new_inpdata)
algorithm_new.input.set (training.dependentVariables, new_labels)
#Compute partial model results
algorithm_new.compute ()
#Compute final results
trainingResult_new = algorithm_new.finalizeCompute ()
printNumericTable (trainingResult_new.get (training.model).getBeta (), "Linear Regression coefficients:")
```

Note that `-par_trainingResult.setInitFlag(True)`

is required to explicitly set the training result flag to include previously trained model results.

Partial results cannot be used to perform predictions. Final results must be computed to apply the algorithm on predictions / evaluation. Prediction and evaluation processes are explained in Volume 3 of this series.

Intel DAAL's distributed and online processing modes address various challenges imposed by Big Data and Streaming Data. Intel DAAL provides a flexible implementation based on the processing needs. In today's world, with fast-flowing and voluminous data, Intel DAAL can deliver better and faster solutions.

To summarize what we discussed in this volume; we overviewed Intel DAAL's various processing modes, their importance in predictive analytics domain, and implementations to train models. Additionally, we demonstrated through usage scenarios that these computation modes in Intel DAAL are trivial extensions to batch processing.

Intel's compilers may or may not optimize to the same degree for non-Intel microprocessors for optimizations that are not unique to Intel microprocessors. These optimizations include SSE2, SSE3, and SSSE3 instruction sets and other optimizations. Intel does not guarantee the availability, functionality, or effectiveness of any optimization on microprocessors not manufactured by Intel. Microprocessor-dependent optimizations in this product are intended for use with Intel microprocessors. Certain optimizations not specific to Intel microarchitecture are reserved for Intel microprocessors. Please refer to the applicable product User and Reference Guides for more information regarding the specific instruction sets covered by this notice.

Notice revision #20110804