Gentle Introduction to PyDAAL: Vol 4 Distributed and Online Processing

By Preethi Venkatesh and Nathan G. Greeneltch. Published: 03/21/2018, Last Updated: 03/21/2018

Introduction

Volume 3 introduced the stages of predictive model fitting and deployment with Intel® Data Analytics Acceleration Library (Intel® DAAL), shipped with the Intel® Distribution for Python (IDP), in a batch processing environment. In this volume, we go deeper into the other processing modes that Intel DAAL has in store, focusing mainly on accelerating the training stage. This article illustrates the two computation modes that distinguish Intel DAAL from other popular data analytics libraries on the market: distributed processing and online processing.

To accelerate model training on large datasets, Intel DAAL supports a distributed processing mode, including a programming model that makes it easy to implement a master-slave approach. mpi4py interfaces easily with PyDAAL (Intel DAAL's Python API), because Intel DAAL's serialization and deserialization classes enable data exchange between nodes during parallel computation.

To accelerate model re-training and to overcome the challenges of limited compute resources, Intel DAAL includes an online processing mode.

Volumes in Gentle Introduction Series

  • Vol 1: Data Structures – Introduces the Data Management component of Intel DAAL and its custom data structures (Numeric Table and Data Dictionary) with code examples.
  • Vol 2: Basic Operations on Numeric Tables – Introduces operations that can be performed on Intel DAAL's custom data structures (Numeric Table and Data Dictionary) with code examples.
  • Vol 3: Analytics Model Building and Deployment – Introduces the analytics modeling and evaluation process in Intel DAAL with serialized deployment in batch processing.
  • Vol 4: Distributed and Online Processing – Introduces Intel DAAL's advanced processing modes (distributed and online) that support data analysis and model fitting on large and streaming data.

IDP and Intel® Data Analytics Acceleration Library (Intel® DAAL) installation

The demonstrations in this article require IDP, Intel DAAL, and mpi4py, all available for free on Anaconda Cloud.

  1. Install the full IDP environment to get all the required packages
    conda create -n IDP -c intel intelpython3_full python=3.6

  2. Activate the IDP environment

    source activate IDP  (Linux*/macOS*)

    (or)

    activate IDP  (Windows*)

Refer to the installation options and the complete list of Intel packages for more information.

1. Distributed Learning with PyDAAL and MPI

1.1 Background

In recent years, popular machine learning algorithms have been packaged into simple toolkits that facilitate the work of machine learning practitioners. Most of the libraries in these toolkits perform sequential algorithm computations, also known as batch processing. This type of processing becomes problematic when dealing with Big Data: if producing a single model result in batch mode is already time-consuming, parameter tuning becomes nearly impossible. To address this limitation, Intel DAAL provides a "distributed processing" alternative that accommodates standard practices in the data science community.

For predictive analytics, PyDAAL and mpi4py can be used to quickly distribute model training for many of Intel DAAL's algorithm implementations using the Single Program Multiple Data (SPMD) technique. Other Python* machine learning libraries make it easy to parallelize a batch parameter-tuning grid search, mainly because it is an embarrassingly parallel process. What sets Intel DAAL apart are the included IA-optimized distributed versions of many of its model training classes, which deliver fast and scalable training results and hence faster parameter tuning on large datasets. Additionally, a single model training run can be accelerated with syntax similar to batch learning. For these implementations, the Intel DAAL engineering team provides a slave method to compute partial training results on row-grouped chunks of data, and a master method to reduce the partial results into a final model result.

1.1.1 Serialization and message passing

Messages passed with mpi4py are sent as serialized objects; mpi4py uses the popular Python object serialization library Pickle under the hood. PyDAAL uses SWIG (Simplified Wrapper and Interface Generator) as its wrapper interface, and unfortunately SWIG-wrapped objects are not compatible with Pickle. Fortunately, Intel DAAL has built-in serialization and deserialization functionality; see the Trained Model Portability section of Volume 3 for details. A sketch of such helpers is shown below; the table in section 1.2 then demonstrates the master and slave methods for the distributed version of PyDAAL's covariance algorithm.
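The following sketch mirrors the serialize and deserialize helpers from Volume 3 and the customUtils module; the bodies here are illustrative, not the exact library code (for simplicity, this deserialize assumes the object being rebuilt is a HomogenNumericTable, while the customUtils versions are more general):

import numpy as np
from daal.data_management import InputDataArchive, OutputDataArchive, HomogenNumericTable

def serialize(daalObject):
    # Write the DAAL object into an input data archive
    dataArch = InputDataArchive()
    daalObject.serialize(dataArch)
    # Copy the archive into a numpy byte buffer, which Pickle (and hence mpi4py) can handle
    buffer = np.zeros(dataArch.getSizeOfArchive(), dtype=np.ubyte)
    dataArch.copyArchiveToArray(buffer)
    return buffer

def deserialize(buffer):
    # Rebuild the DAAL object from the byte buffer
    dataArch = OutputDataArchive(buffer)
    nT = HomogenNumericTable()
    nT.deserialize(dataArch)
    return nT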

1.2 Batch vs distributed computation overview

[Table: batch vs. distributed covariance computation code]
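The original side-by-side code table is not reproduced here. As a point of reference, a minimal batch-mode covariance computation looks roughly like the following sketch (assuming nT is a HomogenNumericTable holding the full dataset); the distributed master/slave counterpart is built out in the helper functions of section 1.5:

from daal.algorithms import covariance

# Create a batch covariance algorithm object
algorithm = covariance.Batch()
# Point the algorithm at the full in-memory dataset
algorithm.input.set(covariance.data, nT)
# Compute and extract the covariance matrix as a numeric table
result = algorithm.compute()
covarianceNT = result.get(covariance.covariance)

In batch mode the entire dataset must fit in memory on one node; the distributed mode removes that constraint by reducing partial results computed on row-wise chunks.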

1.3 Available Intel DAAL distributed processing algorithms

Supervised algorithms:

  1. Linear and Ridge Regression
  2. Naïve Bayes Classifier
  3. Recommender systems
  4. Neural Networks

Unsupervised algorithms:

  1. K-Means Clustering
  2. Principal Component Analysis

Other analysis:

  1. Moments of low order
  2. Covariance matrix
  3. Singular Value Decomposition
  4. QR decomposition

Subsequent releases will support more algorithms in distributed processing mode.

1.4 Distributed processing detailed workflow in Intel DAAL

[Table: distributed processing workflow code]

Note: The serialize and deserialize helper functions are provided in the Trained Model Portability section of Volume 3, or can be imported from the customUtils module available on the daaltces GitHub page.

1.5 Covariance matrix distributed processing demonstration

This demo is designed to work on a Linux* OS.

Helper functions: Covariance matrix

Note: The helper functions below require the "customUtils" module, available on the daaltces GitHub page.

The next section can be copied and pasted into a user's script or adapted to a specific use case. The helper function block below carries out the distributed computation of a covariance matrix, but it can be adapted to fit other types of models. See the Computation Modes section in the developer documentation for more details on distributed model fitting. A full usage example follows the helper functions.

Helper function starts here:

'''
---------------------------------------------------------------------------------
*************************HELPER FUNCTION STARTS HERE*****************************
---------------------------------------------------------------------------------
'''
# Define slave compute routine

''' 
Slave and master routines defined as Python functions.
The slave returns a serialized partial model result; its input is a serialized partial numeric table.
'''
from customUtils import getBlockOfNumericTable, serialize, deserialize
# customUtils is available on the daaltces GitHub page: https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils
from daal.data_management import HomogenNumericTable
from daal.algorithms.covariance import (
    Distributed_Step1LocalFloat64DefaultDense, data, partialResults,
    Distributed_Step2MasterFloat64DefaultDense
)

   
def computestep1Local(serialnT):
    # Deserialize using the helper function
    partialnT = deserialize(serialnT)
    # Create the local (step 1) partial model object
    model = Distributed_Step1LocalFloat64DefaultDense()
    # Set the input data for the model
    model.input.set(data, partialnT)
    # Compute the partial estimate
    partialResult = model.compute()
    # Serialize the partial result using the helper function
    serialpartialResult = serialize(partialResult)

    return serialpartialResult

# Define master compute routine
''' 
Updates the global variable finalResult. Computes the master version of the model and stores the full model result in finalResult. Inputs are an array of serialized partial results and the MPI world size.
'''
def computeOnMasterNode(serialPartialResult, size):
    global finalResult
    # Create the master model object
    model = Distributed_Step2MasterFloat64DefaultDense()
    # Add partial results to the distributed master model
    for i in range(size):
        # Deserialize using the helper function
        partialResult = deserialize(serialPartialResult[i])
        # Set input objects for the model
        model.input.add(partialResults, partialResult)
    # Combine the partial estimates on the master
    model.compute()
    # Finalize the result in distributed processing mode
    finalResult = model.finalizeCompute()
'''
---------------------------------------------------------------------------------
*************************HELPER FUNCTION ENDS HERE*****************************
---------------------------------------------------------------------------------
'''

Usage example: Covariance matrix

The example below uses the complete block of helper functions given above, calling computestep1Local() and computeOnMasterNode() with mpi4py to construct a covariance matrix.

from mpi4py import MPI
import numpy as np
from customUtils import getBlockOfNumericTable, serialize, deserialize
# customUtils is available on the daaltces GitHub page: https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils
from daal.data_management import HomogenNumericTable

''' 
Begin MPI Initialization and Run Options 
'''
# Get MPI vars
size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()    
   
# Initialize result vars to fill
serialPartialResults = [0] * size
finalResult = None

''' 
Begin Data Set Creation 
'''
# Create a seeded random array for demonstration
# numRows and numCols can be changed to suit the use case
numRows, numCols = 1000, 100
seeded = np.random.RandomState(42)
fullArray = seeded.rand(numRows, numCols)

# Slice the seeded random data matrix into row-wise chunks, one per MPI rank
# Each rank works on rows [rowStart, rowEnd)
sizeOfChunk = int(numRows/size)
rowStart = rank*sizeOfChunk
rowEnd = (rank+1)*sizeOfChunk
array = fullArray[rowStart:rowEnd, :]
partialnT = HomogenNumericTable(array)
serialnT = serialize(partialnT)


'''
Begin Distributed Execution 
'''

if rank == 0:

    serialPartialResults[rank] = computestep1Local(serialnT)

    if size > 1:
        # Receive the slave partial results on the master
        for i in range(1, size):
            slaveRank, slaveSize, slaveName, partial = \
                MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=1)
            serialPartialResults[slaveRank] = partial

    computeOnMasterNode(serialPartialResults, size)

else:
    serialPartialResult = computestep1Local(serialnT)
    MPI.COMM_WORLD.send((rank, size, name, serialPartialResult), dest=0, tag=1)


'''
---------------------------------------------------------------
LINUX shell commands to run the covariance matrix usage example 
---------------------------------------------------------------
'''

# Source and activate the Intel Distribution for Python (IDP) env
source ../anaconda3/bin/activate
source activate IDP
# Optionally set the MPI environment variable to shared memory mode
export I_MPI_SHM_LMT=shm

# cd to the script directory, then call the Python interpreter inside the mpirun command,
# replacing <N> with the desired number of MPI processes
cd ../script_directory
mpirun -n <N> python script.py

2. Incremental Learning with PyDAAL

2.1 Background

What happens when training in batch mode becomes infeasible due to continuous dataset updates or limited resources?

Incremental learning (online processing in Intel DAAL) is the process of enhancing an existing trained model with new data instances; it is broadly used in scenarios where:

  1. Limited in-memory resources preclude loading a large dataset at once. In such cases, the dataset is partitioned into blocks that are loaded one at a time to train the model incrementally.
  2. New data streams in periodically and a previously trained model requires updates (provided the existing data instances remain relevant). To avoid the painful process of re-training the whole model every time new data arrives, incremental learning preserves the existing trained model details and updates the model only with the new data instances.

Industries like robotics, autonomous driving, and stock trading depend heavily on predictive analytics and demand model updates with each new learning experience. In such situations batch processing is no longer a viable solution. Furthermore, data analytics applications that involve direct customer interaction (e.g., social media, e-commerce purchases) demand trained models that stay up to date with customer experience. Incremental learning delivers faster solutions by eliminating the time and effort needed to re-train a model every time new data arrives. It also makes training models on Big Data possible even with scarce resources.

2.2 Batch vs Online computation overview

[Table: batch vs. online computation code]
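The original side-by-side code table is not reproduced here. For reference, a minimal batch-mode linear regression training run is sketched below (assuming inpdata and labels are numeric tables of predictors and responses); the online counterpart is demonstrated in section 2.5:

from daal.algorithms.linear_regression import training

# Create a batch training algorithm object
algorithm = training.Batch()
# Set the predictors and the dependent variables
algorithm.input.set(training.data, inpdata)
algorithm.input.set(training.dependentVariables, labels)
# Train the model in a single pass over the full dataset
trainingResult = algorithm.compute()
beta = trainingResult.get(training.model).getBeta()

The online version replaces the single compute() call with one compute() per data block, followed by finalizeCompute().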

2.3 Available Intel DAAL incremental learning algorithms

Supervised learning algorithms

  1. Linear Regression
  2. Ridge Regression
  3. Naïve Bayes

Unsupervised learning algorithms

Principal component analysis

Other analysis

  1. Singular value decomposition
  2. Moments of low order
  3. Correlation and covariance

Subsequent releases will support more algorithms in online processing mode.

2.4 Incremental learning detailed workflow in Intel DAAL

[Table: incremental (online) learning workflow code]

2.5 Linear regression online processing demonstration

Note: The following demonstration requires the "customUtils" module from the daaltces GitHub repository.

As a preliminary step, create four data partitions and save them to disk: three initial blocks and one block of new data. We will use these partitions to illustrate both of the scenarios mentioned above.

# Create 4 random data partitions
import numpy as np

all_data = ['data-block-1.csv', 'data-block-2.csv', 'data-block-3.csv', 'data-block-new.csv']
for f in all_data:
    data = np.random.rand(1000, 11)
    np.savetxt(f, data, delimiter=",")

2.5.1 Scenario 1: Train on limited in-memory space

Incrementally train a linear regression model on three data partitions.

import sys, os
sys.path.append(os.path.join(os.path.dirname(sys.executable), 'share', 'pydaal_examples', 'examples', 'python', 'source'))
from daal.algorithms.linear_regression import training
from daal.algorithms.linear_regression.training import data, dependentVariables
from customUtils import getNumericTableFromCSV, \
                        getBlockOfNumericTable, serialize, deserialize
# customUtils is available on the daaltces GitHub page: https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils
from utils import printNumericTable

# Create an online algorithm object
algorithm = training.Online()

# Create a list of data blocks
all_data = ['data-block-1.csv', 'data-block-2.csv', 'data-block-3.csv']

# Iterate through all data blocks and train/update the partial results
for block in all_data:
    nT = getNumericTableFromCSV(block)
    # Split nT into predictors and labels
    inpdata = getBlockOfNumericTable(nT, Columns=10)
    labels = getBlockOfNumericTable(nT, Columns=[10, ])
    # Set the algorithm input parameters
    algorithm.input.set(data, inpdata)
    algorithm.input.set(dependentVariables, labels)
    # Compute partial model results
    algorithm.compute()

# Serialize and save the partial results to disk.
# This partial result will be used in the next usage example
# to re-train on new instances
par_trainingResult = algorithm.getPartialResult()
serialize(par_trainingResult, fileName="par_trainingResult.npy")
# Compute final results
trainingResult = algorithm.finalizeCompute()
printNumericTable(trainingResult.get(training.model).getBeta(), "Linear Regression coefficients:")

2.5.2 Scenario 2: Train on new data instances

Re-train with the serialized "par_trainingResult" obtained in Scenario 1 on a new data partition.

import sys, os
sys.path.append(os.path.join(os.path.dirname(sys.executable), 'share', 'pydaal_examples', 'examples', 'python', 'source'))
from daal.algorithms.linear_regression import training
from daal.algorithms.linear_regression.training import data, dependentVariables
from customUtils import getNumericTableFromCSV, \
    getBlockOfNumericTable, serialize, deserialize
from utils import printNumericTable
# customUtils is available on the daaltces GitHub page: https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils

algorithm_new = training.Online()
# Deserialize and set the partial training results
par_trainingResult = deserialize(fileName="par_trainingResult.npy")
par_trainingResult.setInitFlag(True)
algorithm_new.setPartialResult(par_trainingResult)
# Create a numeric table of the new data instances
new_nT = getNumericTableFromCSV('data-block-new.csv')
# Split new_nT into predictors and labels
new_inpdata = getBlockOfNumericTable(new_nT, Columns=10)
new_labels = getBlockOfNumericTable(new_nT, Columns=[10, ])
# Set the algorithm_new input parameters
algorithm_new.input.set(training.data, new_inpdata)
algorithm_new.input.set(training.dependentVariables, new_labels)
# Compute partial model results
algorithm_new.compute()
# Compute final results
trainingResult_new = algorithm_new.finalizeCompute()
printNumericTable(trainingResult_new.get(training.model).getBeta(), "Linear Regression coefficients:")

Note that par_trainingResult.setInitFlag(True) is required to explicitly set the initialization flag on the partial result so that the previously trained model results are included.

Partial results cannot be used to perform predictions. Final results must be computed before the model can be applied for prediction or evaluation. The prediction and evaluation processes are explained in Volume 3 of this series.
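For completeness, a minimal prediction sketch on the finalized model follows (it reuses trainingResult_new and new_inpdata from Scenario 2 purely for illustration; see Volume 3 for the full prediction and evaluation workflow):

from daal.algorithms.linear_regression import prediction

# Create a batch prediction algorithm object
predAlgorithm = prediction.Batch()
# Pass in the finalized trained model and the predictor data
predAlgorithm.input.setModel(prediction.model, trainingResult_new.get(training.model))
predAlgorithm.input.setTable(prediction.data, new_inpdata)
# Compute and extract the predicted responses as a numeric table
predictionResult = predAlgorithm.compute()
predictedResponses = predictionResult.get(prediction.prediction)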

3. Conclusion

Intel DAAL's distributed and online processing modes address various challenges imposed by Big Data and streaming data, and they let users pick a flexible implementation based on their processing needs. In today's world of fast-flowing, voluminous data, Intel DAAL can deliver better and faster solutions.

To summarize this volume: we reviewed Intel DAAL's processing modes, their importance in the predictive analytics domain, and implementations for training models. We also demonstrated through usage scenarios that these computation modes are straightforward extensions of batch processing.
