DAAL Spark distributed examples

When going through your examples for DAAL with Spark, I noticed that you only provide samples for local RDD structures, namely:

  • reading from a text file stored locally rather than in HDFS or a similar distributed file system,
  • processing the lines into Vectors.dense, which is again a local structure,
  • reading in a full (dense) matrix, as opposed to a sparse matrix representation such as COO format.

Especially the last part is giving me a headache currently: when I provide a file stored in the format (row_index, column_index, value), I can still convert it (via CoordinateMatrix) to a RowMatrix, although it is stored differently internally.
Where your examples have a dense vector per row, the conversion via CoordinateMatrix keeps the sparse representation for each row (i.e. two lists of the same length containing the indices and the values, respectively).
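To make the difference between the two row formats concrete, here is a minimal sketch using Spark MLlib's vector types. The object name RowFormats, the helper describe and the sample values are made up for illustration; only the MLlib classes themselves come from Spark.

```scala
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}

object RowFormats {
  // Describe how a single matrix row is stored internally.
  def describe(v: Vector): String = v match {
    case s: SparseVector =>
      // Sparse: total length plus two equally long lists (indices, values).
      s"sparse: size=${s.size}, indices=${s.indices.mkString(",")}, values=${s.values.mkString(",")}"
    case d: DenseVector =>
      // Dense: one value per column, zeros included.
      s"dense: values=${d.values.mkString(",")}"
  }

  // Illustrative row of length 5 with non-zeros in columns 1 and 3.
  val sparseRow: Vector = Vectors.sparse(5, Array(1, 3), Array(2.0, 4.0))

  // The same row with every column materialized.
  val denseRow: Vector = Vectors.dense(sparseRow.toArray)
}
```

Both vectors describe the same row; only the storage differs, which is presumably what matters once the rows are handed over to DAAL.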

Feeding such sparse rows into the SampleSVD example results in the following error message:

WARN  TaskSetManager:66 - Lost task 0.0 in stage 4.0 (TID 8,, executor 2): java.lang.Exception: Number of rows in numeric table is incorrect
Argument name: data

	at com.intel.daal.algorithms.AnalysisOnline.cCompute(Native Method)
	at com.intel.daal.algorithms.AnalysisOnline.compute(Unknown Source)
	at com.intel.daal.algorithms.svd.Online.compute(Unknown Source)
	at com.intel.daal.algorithms.svd.DistributedStep1Local.compute(Unknown Source)
	at SVD$$anonfun$3.apply(SVD.scala:126)
	at SVD$$anonfun$3.apply(SVD.scala:111)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Hi Dennis,

DAAL's implementation of SVD does not support input matrices in sparse format at the moment.

To use SVD, you need to implement a proper conversion from the sparse format (CoordinateMatrix) to a dense one, because a simple Spark cast from CoordinateMatrix to RowMatrix just turns your data into a long table of size (number of entries) x 3, while you need a table of size (number of rows) x (number of columns).
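One way such a conversion could look is sketched below. This is not the code from the DAAL samples. It assumes a comma-separated file with one "row_index,column_index,value" entry per line, 0-based indices, and a column count that is known up front; the names CooToDense, denseRow, parseEntry and load are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

object CooToDense {
  // Build one dense row of length nCols from the (column, value) cells of that row.
  // Columns without an entry stay at 0.0.
  def denseRow(nCols: Int, cells: Iterable[(Int, Double)]): Array[Double] = {
    val row = new Array[Double](nCols)
    cells.foreach { case (j, v) => row(j) = v }
    row
  }

  // Parse one "row_index,column_index,value" line (assumed format, 0-based indices).
  def parseEntry(line: String): (Long, (Int, Double)) = {
    val parts = line.split(",").map(_.trim)
    (parts(0).toLong, (parts(1).toInt, parts(2).toDouble))
  }

  // Group the entries by row index and emit one dense vector per row, in row order.
  // Caveat: a row without any entry in the file produces no output row at all.
  def load(sc: SparkContext, path: String, nCols: Int): RDD[Vector] =
    sc.textFile(path)
      .map(parseEntry)
      .groupByKey()
      .sortByKey()
      .map { case (_, cells) => Vectors.dense(denseRow(nCols, cells)) }
}
```

The resulting RDD has one dense vector per matrix row, i.e. the (number of rows) x (number of columns) layout described above, as long as every row has at least one entry in the file.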

Is the described approach suitable for you?

I have successfully implemented an approach that does exactly that; it seems possible to convert the data internally without aggregating a Vector RDD locally (I can share a code sample tomorrow).

Yet, although the internal representation is now exactly the same as in the given code example, I am still running into an error telling me that the dimensions do not match (for any input that is not of size 4000x18). Reducing my own sample (with different values, and originally stored in COO format) to a 4000x18 matrix works; it thus seems that the expected dimensions are fixed somewhere in the code.

Hi Dennis,

We will analyze the issue with the sizes of the result RDD. 

Hi Dennis,

I have attached an updated version of the SVD sample.
The RDD should now have the proper size: a few partitions with 4000 rows in total.
The other fields of the result should now contain actual values instead of zeroes.


Attachment: sample_svd.zip (application/zip, 3.36 KB)

Thanks so much for the continuous updates!

The good news first: the results on the provided 4000x18 SVD.txt now finally match (roughly) the results I obtain from sklearn's PCA implementation on the same data, and I verified that they are what I expected.

Sadly, the algorithm still fails for any other matrix dimension. I am not sure whether I have to specify the matrix dimensions somewhere, but from my understanding they should be detected automatically, right? The error is still the same:

java.lang.Exception: Number of rows in numeric table is incorrect


Hi Dennis,

Could you provide additional details about the matrix dimensions you use?
The internal structure of the input RDD also matters, such as the number of partitions and their sizes.

In this case I was testing a 100x100 matrix, just to see whether it would work.
I wasn't doing any special pre-processing in terms of partitioning, just calling sc.textFile() on a different file.
If it really is about the partitioning, I wonder why the provided sample does not do anything like that either.

Do you happen to have any documentation on what exactly I have to respect when calling the function?

As described in the SVD distributed processing section of the documentation (https://software.intel.com/en-us/daal-programming-guide-distributed-proc...), each separate block of data should have a number of rows >= the number of columns.

In your case, this means that each partition of the input RDD should contain a sufficient number of rows (>= number of columns).
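A rough way to check this constraint, and to merge partitions when it is violated, is sketched below. It assumes the input is an RDD of MLlib vectors with one vector per matrix row; the names PartitionCheck, maxBlocks, blocksAreValid, rowsPerPartition and withTallPartitions are hypothetical and not part of DAAL or Spark. It would also explain the 100x100 case: if sc.textFile() splits the file into two partitions, as it commonly does for small files, each holds about 50 rows, which is fewer than the 100 columns, whereas 4000 rows spread over a few partitions stay far above 18.

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

object PartitionCheck {
  // Largest number of equally sized blocks such that each block
  // still has at least as many rows as there are columns.
  def maxBlocks(nRows: Long, nCols: Int): Int =
    math.max(1L, nRows / nCols).toInt

  // True if every block satisfies rows >= columns.
  def blocksAreValid(rowsPerBlock: Seq[Int], nCols: Int): Boolean =
    rowsPerBlock.forall(_ >= nCols)

  // Number of rows held by each partition (runs a Spark job).
  def rowsPerPartition(rows: RDD[Vector]): Seq[Int] =
    rows.mapPartitions(it => Iterator(it.size)).collect().toSeq

  // Return an RDD whose partitions all have rows >= columns, merging partitions if needed.
  // coalesce does not guarantee balanced partitions, so the result is re-checked and
  // a single partition is used as the last resort. That only helps if the matrix
  // as a whole has at least as many rows as columns.
  def withTallPartitions(rows: RDD[Vector], nCols: Int): RDD[Vector] =
    if (blocksAreValid(rowsPerPartition(rows), nCols)) rows
    else {
      val target = math.min(maxBlocks(rows.count(), nCols), rows.getNumPartitions)
      val merged = rows.coalesce(target)
      if (blocksAreValid(rowsPerPartition(merged), nCols)) merged
      else rows.coalesce(1)
    }
}
```

For a 100x100 matrix, maxBlocks returns 1, so all rows have to end up in a single partition; for the 4000x18 sample it returns 222. Since the helper triggers several Spark jobs, caching the input RDD beforehand is probably worthwhile.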
