Volume 3 introduced the stages of the predictive model fitting and deployment process in the Intel® Data Analytics Acceleration Library (Intel® DAAL), part of the Intel® Distribution for Python (IDP), in a batch processing environment. In this volume, we go deeper into the other processing modes that Intel DAAL offers, focusing mainly on accelerating the training stage. This article illustrates two major computation modes that distinguish Intel® DAAL from other popular data analytics libraries: distributed processing and online processing.
To accelerate the model training process, Intel DAAL supports distributed processing mode for large datasets including a programming model that makes it easy for users to implement a Master-Slave approach. Mpi4py can be easily interfaced with PyDAAL (Intel DAAL's Python API), as Intel DAAL's serialization and deserialization classes enable data exchange between nodes during parallel computation.
To accelerate the model re-training process and to overcome challenges associated with limited compute resources, Intel DAAL includes online processing mode.
The demonstrations in this article require installations of IDP, Intel DAAL, and mpi4py, all of which are available for free on Anaconda cloud.
conda create -n IDP -c intel intelpython3_full python=3.6
source activate IDP
In recent years, popular machine learning algorithms have been packaged into simple toolkits that facilitate the work of machine learning practitioners. Most of the libraries in these toolkits perform sequential algorithm computations, also known as batch processing. This type of processing becomes problematic when dealing with Big Data. If generating a single model result on Big Data in batch processing mode is already time-consuming, parameter tuning can become nearly impossible. To address this limitation, Intel DAAL provides "distributed processing" alternatives that accommodate standard practices in the data science community.
For predictive analytics, PyDAAL and mpi4py can be used to quickly distribute model training for many of DAAL's algorithm implementations using the Single Program Multiple Data (SPMD) technique. Other Python* machine learning libraries allow for the easy application of a batch parameter-tuning grid search, mainly because it is an embarrassingly parallel process. What sets Intel DAAL apart is the included IA-optimized distributed versions of many of its model training classes, which deliver fast and scalable training results and lead to faster parameter tuning on large datasets. Additionally, the training of a single model can be accelerated with syntax similar to batch learning. For these implementations, the DAAL engineering team has provided a slave method to compute partial training results on row-grouped chunks of data, and a master method to reduce the partial results into a final model result.
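To make the slave and master roles concrete, the following is a minimal NumPy sketch of the same pattern for a covariance matrix. It is not Intel DAAL's implementation: the function names local_step and master_step are hypothetical, and the choice of partial result (row count, column sums, cross-products) is an assumption made for illustration.

```python
import numpy as np

def local_step(block):
    """Partial result for one row block: row count, column sums, cross-products."""
    block = np.asarray(block, dtype=np.float64)
    return block.shape[0], block.sum(axis=0), block.T @ block

def master_step(partials):
    """Reduce all partial results into the sample covariance matrix."""
    n = sum(p[0] for p in partials)
    col_sum = sum(p[1] for p in partials)
    cross = sum(p[2] for p in partials)
    mean = col_sum / n
    # sum((x - mean)(x - mean)^T) equals sum(x x^T) - n * mean mean^T
    return (cross - n * np.outer(mean, mean)) / (n - 1)

rng = np.random.RandomState(42)
full = rng.rand(1000, 5)
blocks = np.array_split(full, 4)   # row-grouped chunks, one per worker
cov = master_step([local_step(b) for b in blocks])
```

Because each partial result depends only on its own block, the local steps can run on separate nodes, and only the small partial results need to travel to the master.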
Messages passed with mpi4py are sent as serialized objects. mpi4py uses the popular Python object serialization library pickle under the hood during this process. PyDAAL uses SWIG (Simplified Wrapper and Interface Generator) as its wrapper interface. Unfortunately, SWIG-wrapped objects are not compatible with pickle. Fortunately, DAAL has built-in serialization and deserialization functionality. See the Trained Model Portability section of Volume 3 for details. The table below demonstrates the master and slave methods for the distributed version of PyDAAL's covariance model method.
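The reason for serializing before sending can be sketched with the standard library alone. In this illustration a threading lock stands in for an object that wraps a native resource and cannot be pickled; it is an analogy, not a PyDAAL object, and the payload bytes and node name are made-up placeholders.

```python
import pickle
import threading

# Stand-in for an object that wraps a native resource and cannot be pickled.
# (Assumption for illustration: this is a lock, not a PyDAAL object.)
native_like = threading.Lock()

try:
    pickle.dumps(native_like)
    picklable = True
except TypeError:
    picklable = False

# A payload that has already been converted to raw bytes pickles without
# trouble, so it can travel inside an ordinary Python tuple.
payload = bytes(range(8))             # hypothetical serialized partial result
message = (1, 4, "node-a", payload)   # (rank, size, name, payload)
wire = pickle.dumps(message)
restored = pickle.loads(wire)
```

This is why the helper functions in this article exchange serialized partial results rather than the result objects themselves.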
Subsequent releases will support more algorithms in distributed processing mode.
The serialize and deserialize helper functions are provided in the Trained Model Portability section of Volume 3, or can be imported from customUtils, available on the daaltces GitHub page.
This demo is designed to work on a Linux* OS.
Note: The upcoming helper function requires the "customUtils" module, which is available on the daaltces GitHub page.
The next section can be copied and pasted into a user's script or adapted to a specific use case. The helper function block provided below carries out the distributed computation of the covariance matrix, and can be adapted for fitting other types of models. See the Computation Modes section in the developer's docs for more details on distributed model fitting. A full usage code example follows the helper function.
Helper function starts here:
'''
---------------------------------------------------------------------------------
*************************HELPER FUNCTION STARTS HERE*****************************
---------------------------------------------------------------------------------
'''
# Define slave compute routine
'''
Defined Slave and Master Routines as Python Functions
Returns serialized partial model result. Input is serialized partial numeric table
'''
from customUtils import getBlockOfNumericTable, serialize, deserialize
# customUtils is available on daaltces GitHub page https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils.
from daal.data_management import HomogenNumericTable
from daal.algorithms.covariance import (
    Distributed_Step1LocalFloat64DefaultDense, data, partialResults,
    Distributed_Step2MasterFloat64DefaultDense
)

def computestep1Local(serialnT):
    # Deserialize using Helper Function
    partialnT = deserialize(serialnT)
    # Create partial model object
    model = Distributed_Step1LocalFloat64DefaultDense()
    # Set input data for the model
    model.input.set(data, partialnT)
    # Get the computed partial estimate result
    partialResult = model.compute()
    # Serialize using Helper Function
    serialpartialResult = serialize(partialResult)
    return serialpartialResult

# Define master compute routine
'''
Imports global variable finalResult.
Computes master version of the model and sets full model result into finalResult.
Inputs are array of serialized partial results and MPI world size
'''
def computeOnMasterNode(serialPartialResult, size):
    global finalResult
    # Create master model object
    model = Distributed_Step2MasterFloat64DefaultDense()
    # Add partial results to the distributed master model
    for i in range(size):
        # Deserialize using Helper Function
        partialResult = deserialize(serialPartialResult[i])
        # Set input objects for the model
        model.input.add(partialResults, partialResult)
    # Recompute a partial estimate after combining partial results
    model.compute()
    # Finalize the result in the distributed processing mode
    finalResult = model.finalizeCompute()
'''
---------------------------------------------------------------------------------
*************************HELPER FUNCTION ENDS HERE*****************************
---------------------------------------------------------------------------------
'''
The example below uses the complete block of helper functions given above and combines the computestep1Local() and computeOnMasterNode() functions with mpi4py to construct a covariance matrix.
import numpy as np
from mpi4py import MPI
from customUtils import getBlockOfNumericTable, serialize, deserialize
# customUtils is available on daaltces GitHub page https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils.
from daal.data_management import HomogenNumericTable
# computestep1Local and computeOnMasterNode come from the helper function block above

'''
Begin MPI Initialization and Run Options
'''
# Get MPI vars
size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()

# Initialize result vars to fill
serialPartialResults = [None] * size
finalResult = None

'''
Begin Data Set Creation
The below example variable values can be used:
numRows, numCols = 1000, 100
'''
# numRows, numCols defined by user
numRows, numCols = 1000, 100
# Create seeded random array for demonstration
seeded = np.random.RandomState(42)
fullArray = seeded.rand(numRows, numCols)

# Slice the data matrix into row chunks, one per rank
# rowStart and rowEnd determined by size of the chunks
sizeOfChunk = int(numRows/size)
rowStart = rank*sizeOfChunk
# rowEnd is exclusive, as in Python slicing
rowEnd = (rank+1)*sizeOfChunk
array = fullArray[rowStart:rowEnd, :]
partialnT = HomogenNumericTable(array)
serialnT = serialize(partialnT)

'''
Begin Distributed Execution
'''
if rank == 0:
    serialPartialResults[rank] = computestep1Local(serialnT)
    if size > 1:
        # Begin to receive slave partial results on the master
        for i in range(1, size):
            # Use separate names so the master's own rank, size and name are not overwritten
            srcRank, srcSize, srcName, srcResult = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=1)
            serialPartialResults[srcRank] = srcResult
    computeOnMasterNode(serialPartialResults, size)
else:
    serialPartialResult = computestep1Local(serialnT)
    MPI.COMM_WORLD.send((rank, size, name, serialPartialResult), dest=0, tag=1)

# ---------------------------------------------------------------
# LINUX shell commands to run the covariance matrix usage example
# ---------------------------------------------------------------
# Source and activate Intel Distribution of Python (IDP) env
source ../anaconda3/bin/activate
source activate IDP
# Optionally set MPI environment variable to shared memory mode
export I_MPI_SHM_LMT=shm
# Cd to script directory, and call Python interpreter inside mpirun command
cd ../script_directory
# Replace <num_processes> with the number of MPI processes to launch
mpirun -n <num_processes> python script.py
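The usage example sizes each chunk with an integer division of numRows by the number of ranks, so any remainder rows are left out when numRows is not a multiple of the world size. One possible way to assign every row is sketched below; chunk_bounds is a hypothetical helper, not part of PyDAAL or mpi4py.

```python
def chunk_bounds(num_rows, size, rank):
    """Return the half-open row range [start, end) owned by `rank`."""
    base, extra = divmod(num_rows, size)
    # The first `extra` ranks take one additional row each.
    start = rank * base + min(rank, extra)
    end = start + base + (1 if rank < extra else 0)
    return start, end

# Row ranges for 1000 rows split across 3 ranks
bounds = [chunk_bounds(1000, 3, r) for r in range(3)]
```

Each rank could then slice its block as fullArray[start:end, :], and the ranges join up with no gaps or overlaps.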
What happens when training in batch mode becomes infeasible due to continuous dataset updates or limited resources?
Incremental learning (online processing in Intel DAAL) is the process of enhancing an existing trained model with new data instances. It is broadly used in scenarios where:
Industries like robotics, autonomous driving, and stock trading depend heavily on predictive analytics and demand model updates as new learning experiences arrive. In such situations batch processing is no longer a viable solution. Furthermore, data analytics applications that involve direct customer interaction (e.g., social media, e-commerce purchases) demand up-to-date trained models based on customer experiences. Incremental learning aims to deliver faster solutions by eliminating the time and effort needed to re-train a model every time new data arrives. Incremental learning also makes it possible to train models on Big Data when resources are scarce.
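The idea of updating a model without revisiting old data can be sketched for linear regression with plain NumPy. This is an illustration under stated assumptions, not Intel DAAL's code: the class OnlineLeastSquares and its methods are hypothetical, and it accumulates normal-equation terms block by block as one common way to make least squares incremental.

```python
import numpy as np

class OnlineLeastSquares:
    """Hypothetical helper: accumulates normal-equation terms block by block."""

    def __init__(self, n_features):
        d = n_features + 1                  # +1 for the intercept column
        self.xtx = np.zeros((d, d))
        self.xty = np.zeros(d)

    def partial_fit(self, X, y):
        A = np.column_stack([np.ones(len(X)), X])
        self.xtx += A.T @ A                 # only small summaries are kept,
        self.xty += A.T @ y                 # the block itself can be discarded
        return self

    def finalize(self):
        # Returns [intercept, coefficient_1, ..., coefficient_d]
        return np.linalg.solve(self.xtx, self.xty)

rng = np.random.RandomState(0)
X = rng.rand(300, 3)
y = X @ np.array([2.0, -1.0, 0.5]) + 4.0 + 0.01 * rng.randn(300)

model = OnlineLeastSquares(n_features=3)
for X_blk, y_blk in zip(np.array_split(X, 3), np.array_split(y, 3)):
    model.partial_fit(X_blk, y_blk)
beta_online = model.finalize()

# Batch fit on the full data, for comparison
A_full = np.column_stack([np.ones(len(X)), X])
beta_batch = np.linalg.lstsq(A_full, y, rcond=None)[0]
```

In this sketch the block-wise and batch coefficients agree up to floating-point error, while the online version never holds more than one block in memory.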
Principal component analysis
Subsequent releases will have more online processing supported algorithms.
Note: The upcoming demonstration requires the "customUtils" module to be imported from the daaltces GitHub repository.
As a preliminary step, create four data partitions and save them to disk. We will use these data partitions to illustrate both of the scenarios mentioned above.
# Create 4 random data partitions
import numpy as np

all_data = ['data-block-1.csv', 'data-block-2.csv', 'data-block-3.csv', 'data-block-new.csv']
for f in all_data:
    data = np.random.rand(1000, 11)
    np.savetxt(f, data, delimiter=",")
Incrementally train a linear regression model on three data partitions.
import sys, os
sys.path.append(os.path.join(os.path.dirname(sys.executable), 'share', 'pydaal_examples', 'examples', 'python', 'source'))
from daal.algorithms.linear_regression import training
from daal.algorithms.linear_regression.training import data, dependentVariables
from customUtils import getNumericTableFromCSV, \
    getBlockOfNumericTable, serialize, deserialize
# customUtils is available on daaltces GitHub page https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils.
from utils import printNumericTable

# Create an online algorithm object
algorithm = training.Online()

# Create list of data blocks
all_data = ['data-block-1.csv', 'data-block-2.csv', 'data-block-3.csv']

# Iterate through all data blocks and train/update results
for block in all_data:
    nT = getNumericTableFromCSV(block)
    # Split nT into predictors and labels
    inpdata = getBlockOfNumericTable(nT, Columns=10)
    labels = getBlockOfNumericTable(nT, Columns=[10, ])
    # Set the algorithm input parameters
    algorithm.input.set(data, inpdata)
    algorithm.input.set(dependentVariables, labels)
    # Compute partial model results
    algorithm.compute()

# Serialize and save the partial results to disk.
# This partial result will be later used in the next usage example,
# to re-train on new instances
par_trainingResult = algorithm.getPartialResult()
serialize(par_trainingResult, fileName="par_trainingResult.npy")

# Compute final results
trainingResult = algorithm.finalizeCompute()
printNumericTable(trainingResult.get(training.model).getBeta(), "Linear Regression coefficients:")
Re-train the serialized "par_trainingResult" obtained in Scenario 1 with a new data partition.
import sys, os
sys.path.append(os.path.join(os.path.dirname(sys.executable), 'share', 'pydaal_examples', 'examples', 'python', 'source'))
from daal.algorithms.linear_regression import training
from daal.algorithms.linear_regression.training import data, dependentVariables
from customUtils import getNumericTableFromCSV, \
    getBlockOfNumericTable, serialize, deserialize
# customUtils is available on daaltces GitHub page https://github.com/daaltces/pydaal-getting-started/tree/master/3-custom-modules/customUtils.
from utils import printNumericTable

algorithm_new = training.Online()

# Deserialize and set the partial training results
par_trainingResult = deserialize(fileName="par_trainingResult.npy")
par_trainingResult.setInitFlag(True)
algorithm_new.setPartialResult(par_trainingResult)

# Create a numeric table of new data instances
new_nT = getNumericTableFromCSV('data-block-new.csv')

# Split new_nT into predictors and labels
new_inpdata = getBlockOfNumericTable(new_nT, Columns=10)
new_labels = getBlockOfNumericTable(new_nT, Columns=[10, ])

# Set the algorithm_new input parameters
algorithm_new.input.set(training.data, new_inpdata)
algorithm_new.input.set(training.dependentVariables, new_labels)

# Compute partial model results
algorithm_new.compute()

# Compute final results
trainingResult_new = algorithm_new.finalizeCompute()
printNumericTable(trainingResult_new.get(training.model).getBeta(), "Linear Regression coefficients:")
Note: par_trainingResult.setInitFlag(True) is required to explicitly set the training result flag so that the previously trained model results are included.
Partial results cannot be used to perform predictions. Final results must be computed before the model can be used for prediction or evaluation. The prediction and evaluation processes are explained in Volume 3 of this series.
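The save, resume, and finalize pattern used across the two scenarios can be sketched with the standard library. This is an analogy only: a running mean stands in for the regression model, a plain dict stands in for the partial result, and the update and finalize functions and the file name are hypothetical.

```python
import os
import pickle
import tempfile

def update(state, values):
    """Fold a new block of values into the partial state."""
    return {"count": state["count"] + len(values),
            "total": state["total"] + sum(values)}

def finalize(state):
    """Turn the partial state into the final result (here, the mean)."""
    return state["total"] / state["count"]

# Session 1: train on the first blocks, then persist the partial state.
state = {"count": 0, "total": 0.0}
state = update(state, [1.0, 2.0, 3.0])
state = update(state, [4.0, 5.0])
path = os.path.join(tempfile.mkdtemp(), "partial_state.pkl")
with open(path, "wb") as fh:
    pickle.dump(state, fh)

# Session 2: reload the partial state and continue with the new block only.
with open(path, "rb") as fh:
    resumed = pickle.load(fh)
resumed = update(resumed, [6.0])
final_mean = finalize(resumed)
```

The partial state by itself is not the answer; only the finalize step yields a usable result, which mirrors the distinction between partial and final results described above.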
Intel DAAL's distributed and online processing modes address various challenges imposed by Big Data and Streaming Data. Intel DAAL provides a flexible implementation based on the processing needs. In today's world, with fast-flowing and voluminous data, Intel DAAL can deliver better and faster solutions.
To summarize, this volume gave an overview of Intel DAAL's processing modes, their importance in the predictive analytics domain, and their implementations for training models. Additionally, the usage scenarios demonstrated that these computation modes in Intel DAAL are simple extensions of batch processing.