Thread Safety on Intel DAAL functions

Hello,

I would like to use Intel DAAL in a shared-memory application. I don't want to use Intel DAAL's internal parallelism; instead, I want to run the Intel DAAL algorithms in parallel in my own pthreads. For example, I want to compute a Cholesky kernel. To do so, I would "manually" create n threads, all of which share the same FileDataSource object, from which each one obtains a different block of data from the same file. Each thread would then partially compute Cholesky on its block of data (algorithm.compute()), and when all threads are done, the main thread would finish with algorithm.finalizeCompute().

I have not been able to find any specific notes on thread safety for Intel DAAL functions. Is this possible? Could you please point me to some best practices?

Thank you.

Hello,

The current version of the Intel DAAL documentation does not provide many details on the thread safety of the library. We are considering adding them in a future release; thank you.

Answering your question:

- The library provides distributed versions of algorithms such as PCA, SVD, k-means, Linear Regression, … that can be used on a shared-memory computer as you describe.

- You can use the File Data Source to access different blocks of data in parallel and feed them into the distributed algorithms.

Here is a code sample, based on PCA and a CSV data source, that demonstrates these ideas:

 

// Preparation for master thread
{
    pca::Distributed<step1Local, algorithmFPType, pca::svdDense> localAlgorithms[nThread];
    pca::PartialResultPtr partialResults[nThread];
}

// Local code for each thread
{
    // Reading the local data
    FileDataSource<CSVFeatureManager> dataSource(datasetName);
    dataSource.loadDataBlock(nRowsInBlock, iThread*nRowsInBlock, nRowsInBlock);
    localAlgorithms[iThread].input.set(pca::data, dataSource.getNumericTable());

    // Compute PCA decomposition
    localAlgorithms[iThread].compute();
    partialResults[iThread] = localAlgorithms[iThread].getPartialResult();
}

// Master thread code
{
    // Set local partial results as input for the master-node algorithm from each thread
    pca::Distributed<step2Master, algorithmFPType, pca::svdDense> masterAlgorithm;
    for(size_t i=0; i<nThread; ++i)
        masterAlgorithm.input.add(pca::partialResults, partialResults[i]);

    // Merge and finalize PCA decomposition on the master thread
    masterAlgorithm.compute();
    masterAlgorithm.finalizeCompute();
}
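
For readers who want to try the threading structure of the sample above without Intel DAAL installed, here is a minimal sketch of the same "local compute per thread, then merge on the master" pattern. It is only an illustration under stated assumptions: the PartialSum struct and the computeLocal and parallelMean functions are hypothetical stand-ins (a plain mean instead of PCA), and std::thread is used in place of raw pthreads. No DAAL API is involved.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for a DAAL partial result: a row count and a sum.
struct PartialSum {
    std::size_t count = 0;
    double sum = 0.0;
};

// Worker body: each thread reads only its own block and writes only its own slot,
// so no locking is needed during the local step.
void computeLocal(const std::vector<double>& data, std::size_t begin,
                  std::size_t end, PartialSum& out) {
    for (std::size_t i = begin; i < end; ++i) {
        out.sum += data[i];
        ++out.count;
    }
}

// Master: start one thread per block, wait for all of them, then merge.
double parallelMean(const std::vector<double>& data, std::size_t nThread) {
    assert(nThread > 0 && !data.empty());
    std::vector<PartialSum> partials(nThread);
    std::vector<std::thread> workers;
    const std::size_t nRowsInBlock = (data.size() + nThread - 1) / nThread;
    for (std::size_t t = 0; t < nThread; ++t) {
        const std::size_t begin = std::min(t * nRowsInBlock, data.size());
        const std::size_t end = std::min(begin + nRowsInBlock, data.size());
        workers.emplace_back(computeLocal, std::cref(data), begin, end,
                             std::ref(partials[t]));
    }
    for (auto& w : workers) w.join();

    // Merge step: runs on the master thread only, after every worker has finished.
    PartialSum total;
    for (const auto& p : partials) {
        total.sum += p.sum;
        total.count += p.count;
    }
    return total.sum / static_cast<double>(total.count);
}
```

The point of the design is that the only shared state is the array of per-thread slots, and the master touches it only after joining all workers.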

 

You can find additional examples at https://github.com/intel/daal/tree/daal_2019_beta/examples/cpp/source

Also, the current version of the Intel DAAL Cholesky algorithm does not support distributed computations yet. Please clarify whether distributed Cholesky is important for your applications.

Let us know if this answers your question or if you need additional details on the library and its components, and we will gladly help.

Hello Egor,

Thank you very much for your example. No, Cholesky is not particularly relevant for me; PCA is perfectly fine.

I have successfully built and run a PCA application using your guidelines. However, I have encountered the following issues with your code snippet:

  1. "pca::PartialResultPtr" does not exist; I have changed it to "services::SharedPtr<pca::PartialResult<pca::svdDense> > partialResults[nThread];"
  2. "dataSource.loadDataBlock(nRowsInBlock, iThread*nRowsInBlock, nRowsInBlock);" complains for iThread > 0; I changed it to "dataSource.loadDataBlock(nRowsInBlock, iThread*nRowsInBlock, nThread*nRowsInBlock);"

Are the proposed changes correct?

Thank you!

Hello,

I'm glad to hear it!

 

Yes, your proposed changes are correct. However, you can save more memory if you change

"dataSource.loadDataBlock (nRowsInBlock, iThread * nRowsInBlock, nThread * nRowsInBlock);"

to "dataSource.loadDataBlock (nRowsInBlock, iThread * nRowsInBlock, (iThread+1) * nRowsInBlock);"

Great, thank you! Could you please elaborate on why? I don't really understand what this third parameter does, and the API explanation is not clear enough for me:

[in] fullRows Maximum number of rows to allocate in the Numeric Table

In my case, each thread will process a single data block of nRowsInBlock rows, hence I would expect the fullRows parameter to be exactly nRowsInBlock (because each local numeric table will contain at most nRowsInBlock rows). Is thread number 3 really allocating nRowsInBlock*3 rows although it only uses nRowsInBlock?

Thank you.

Hi,

DataSource provides an API for repeatedly loading data from a file in blocks of rows:

size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows) - uses the internally allocated NumericTable, which is available through the getNumericTable() method

or

size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows, NumericTable *nt) - writes the data to the user-provided NumericTable

This function reads only maxRows rows from the DataSource at offset rowOffset (in rows) and writes them to the resulting NumericTable at the same offset, rowOffset.

fullRows - the number of rows that DataSource allocates for the resulting NumericTable (this means that the output NumericTable always has fullRows rows). So, fullRows must be greater than or equal to (maxRows + rowOffset).

This API addresses the case where the data is read in blocks from one instance of DataSource and these blocks are placed in a single NumericTable at different offsets.

In your case, (fullRows - maxRows) rows are not used (they contain uninitialized memory), because DataSource allocates fullRows rows for the NumericTable but only maxRows rows are filled.
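
To make the memory cost concrete, here is a small arithmetic sketch. It does not call DAAL; it only applies the rule stated above (a table of fullRows rows is allocated, maxRows of them are filled, and fullRows >= maxRows + rowOffset). The function names unusedRows, totalRowsTight, and totalRowsWide are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <cstddef>

// Rows allocated but never filled by one call that reads maxRows rows
// into a table sized for fullRows rows.
std::size_t unusedRows(std::size_t maxRows, std::size_t rowOffset,
                       std::size_t fullRows) {
    assert(fullRows >= maxRows + rowOffset);  // constraint quoted from the post
    return fullRows - maxRows;
}

// Total rows allocated over all threads when thread i passes
// fullRows = (i + 1) * nRowsInBlock.
std::size_t totalRowsTight(std::size_t nThread, std::size_t nRowsInBlock) {
    std::size_t total = 0;
    for (std::size_t i = 0; i < nThread; ++i) total += (i + 1) * nRowsInBlock;
    return total;
}

// Total rows allocated over all threads when every thread passes
// fullRows = nThread * nRowsInBlock.
std::size_t totalRowsWide(std::size_t nThread, std::size_t nRowsInBlock) {
    return nThread * nThread * nRowsInBlock;
}
```

For example, with 4 threads and 1000 rows per block, the data itself occupies 4000 rows, the (iThread+1)*nRowsInBlock variant allocates 10000 rows in total, and the nThread*nRowsInBlock variant allocates 16000.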

Hi Egor,

Thank you for your detailed answer, I understand now. Unfortunately, this approach's huge memory waste makes it unusable for HPC applications, which are my use case. I would expect each thread to allocate memory proportionally to its share of the input set instead of allocating the whole range.

Thank you for your time!

Hi Aleix,

Let me provide additional clarifications on the discussion in this thread and offer more options for your analysis.

The code

FileDataSource<CSVFeatureManager> dataSource(datasetName);
dataSource.loadDataBlock(nRowsInBlock, iThread*nRowsInBlock, (iThread+1)*nRowsInBlock);

always reads the first nRowsInBlock (maxRows parameter) rows of the file and saves them to the table starting at row index iThread*nRowsInBlock (rowOffset parameter).

All threads get the same block of data, which is not what you want.

Below is another option, which is expected to address both issues: the memory consumption and the same data ending up in every thread:

// create FileDataSource once outside of the parallel region
FileDataSource<CSVFeatureManager> dataSource(datasetName);

// The code executed by each thread
{
    // Reading the local data
    NumericTablePtr localData = HomogenNumericTable<>::create();
    // You can create empty table here. loadDataBlock will automatically resize table
    // according to size of the loaded block and allocate required amount of memory

    {
        mutex_lock();
        // read the block of nRowsInBlock rows and save them into the local table
        dataSource.loadDataBlock(nRowsInBlock, localData);
        mutex_unlock();
    }
    localAlgorithms[iThread].input.set(pca::data, localData);

    // Compute PCA decomposition
    localAlgorithms[iThread].compute();
    partialResults[iThread] = localAlgorithms[iThread].getPartialResult();
}

…
// compute the final result by merging partialResults
…

Please let us know if this approach addresses your needs in terms of memory consumption and performance.
Other, more advanced options are also available, but let us discuss the simple option above first.

Hi Vladislav!

Thank you for your proposal. It is a requirement for me that multiple threads can read the input data in parallel. Therefore, protecting the dataSource object with a mutex is not an option for me (that's why I intended to have one dataSource object per thread). Could you please elaborate on which other options could address this issue?

Thank you very much!

Hi Aleix,
Here are other options for parallel data source access followed by the computations, along with their specifics.

Let's consider the following simple case:
The dataset consists of 3 blocks of data, A, B, and C, which are to be read and processed by 3 threads started by the master. Each thread associates a local DataSource with the same file and reads it independently.

Thread 1 will have to read and process block A
Thread 2 will have to read blocks A, B and process B
Thread 3 will have to read blocks A, B, C and process C
Eventually the master thread will combine the partial results computed by the 3 threads into a final result.

We can't just skip rows of the file, so each thread needs to read and parse all rows of the file until it gets to the right block of data.
This means that thread 3 is forced to read the whole file to get the last block of data before it can start any computation.
The remaining threads have to wait for this slowest thread to finish its work. Also, simultaneous reading/parsing in multiple threads can be slower because of limited I/O bandwidth. In this case there might be no benefit from the parallel reading scheme: to some extent it would behave the same as reading the whole data file first and then processing the data blocks in parallel and merging the results.
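
The cost described above can be seen in a small sketch that is independent of DAAL. It assumes a text file with one row per line and rows of varying length, so the position of a row is unknown until all earlier rows have been scanned. The function readBlock and its linesTouched output parameter are hypothetical names used only for illustration, and the sketch merely discards earlier rows rather than parsing them.

```cpp
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Returns the rows of block number blockIndex (0-based), each block holding
// nRowsInBlock rows. Because rows have variable length, every earlier row
// must be read and discarded first. linesTouched reports how many rows
// were scanned in total to reach and read the block.
std::vector<std::string> readBlock(std::istream& in, std::size_t blockIndex,
                                   std::size_t nRowsInBlock,
                                   std::size_t& linesTouched) {
    std::vector<std::string> block;
    std::string line;
    linesTouched = 0;
    const std::size_t first = blockIndex * nRowsInBlock;
    while (block.size() < nRowsInBlock && std::getline(in, line)) {
        if (linesTouched >= first) block.push_back(line);  // inside the block
        ++linesTouched;                                     // counted either way
    }
    return block;
}
```

With 3 blocks, the thread that owns the last block scans all 3 blocks' worth of rows, and the three threads together scan 1 + 2 + 3 = 6 blocks' worth for 3 blocks of useful data.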

In Intel DAAL we support another computational mode (at least for some of the algorithms): online mode. In this mode we can perform computations on the next block of data, taking the previously obtained results into account. In this case it can look as follows:
The dataset consists of 3 blocks of data: A, B, C
Thread 1 reads block A and processes A in online mode
Thread 2 reads blocks A, B, processes B, and combines it with the partial result from Thread 1
Thread 3 reads blocks A, B, C, processes C in online mode, combines it with the partial result from Thread 2, and produces the final result

In this scenario the computations run in parallel with the data reading, so eventually the last thread processes its block and just merges it with the results aggregated for all other blocks. However, you would need to implement some kind of thread synchronization, which may not work for you.
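
The idea of combining a new block with previously obtained results can be sketched without DAAL. The example below is only an analogy, not DAAL's online-mode API: it uses a running mean and sum of squared deviations as the "partial result", and the names Moments, fromBlock, and mergeMoments are hypothetical. The merge uses the standard pairwise update for mean and variance.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical partial result: row count, mean, and sum of squared deviations.
struct Moments {
    std::size_t n = 0;
    double mean = 0.0;
    double m2 = 0.0;
};

// Local step: summarize one block on its own (Welford's update).
Moments fromBlock(const std::vector<double>& block) {
    Moments m;
    for (double x : block) {
        ++m.n;
        const double d = x - m.mean;
        m.mean += d / static_cast<double>(m.n);
        m.m2 += d * (x - m.mean);
    }
    return m;
}

// Chain step: combine the result aggregated so far with the next block's result.
Moments mergeMoments(const Moments& a, const Moments& b) {
    if (a.n == 0) return b;
    if (b.n == 0) return a;
    Moments r;
    r.n = a.n + b.n;
    const double na = static_cast<double>(a.n);
    const double nb = static_cast<double>(b.n);
    const double n = static_cast<double>(r.n);
    const double delta = b.mean - a.mean;
    r.mean = a.mean + delta * nb / n;
    r.m2 = a.m2 + b.m2 + delta * delta * na * nb / n;
    return r;
}
```

In the three-thread scenario, each thread would call fromBlock on its own block, and the merge calls would run in order A, then B, then C, which is exactly where the thread synchronization mentioned above comes in.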

One more option is simply to split the original file into separate files before running the application.
In that case, each thread can create a DataSource for its own file and just read and use everything from that file.
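
A preprocessing step of this kind could look like the sketch below. It is a plain C++ illustration with no DAAL calls, and it rests on assumptions: the input has one row per line and no header row, and the output naming scheme (prefix, block index, ".csv") and the function name splitIntoBlocks are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <fstream>
#include <istream>
#include <sstream>
#include <string>

// Copies the rows of `in` into files named <prefix><k>.csv, k = 0, 1, ...,
// each holding at most nRowsInBlock rows. Returns the number of files written.
std::size_t splitIntoBlocks(std::istream& in, const std::string& prefix,
                            std::size_t nRowsInBlock) {
    assert(nRowsInBlock > 0);
    std::ofstream out;
    std::string line;
    std::size_t nFiles = 0;
    std::size_t rowsInCurrent = 0;
    while (std::getline(in, line)) {
        // Start a new output file for the first row and whenever a block is full.
        if (nFiles == 0 || rowsInCurrent == nRowsInBlock) {
            if (out.is_open()) out.close();
            out.open(prefix + std::to_string(nFiles) + ".csv");
            ++nFiles;
            rowsInCurrent = 0;
        }
        out << line << '\n';
        ++rowsInCurrent;
    }
    return nFiles;
}
```

Thread number iThread would then open the file whose index matches its own and load it in full, at the cost of one extra sequential pass over the data beforehand.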

What is your perspective on these options, their specifics, and their applicability to your use case?

Hi Vladislav,

Thank you for your proposals. However, none of the options work for me. The first two options not only have the same memory waste problem as before, but they also perform unnecessary I/O or memory copies (in case the data is in the page cache). The third option would work, but the preprocessing step to adapt the data (one file per block) just adds unnecessary overhead to the overall computation. This problem looks like a consequence of the DataSource class's inability to work with offsets (skip rows). If other users are also interested in this feature, it could be worth adding in a future release.

Can you think of any other option to try?

Thank you!

 
