[Acceler8 '12] Scaling fast sequential algorithms using MapReduce

Parallel algorithm vs. work in parallel

As many of the forum posts have shown, fast algorithms for the maximal common substrings problem gave good results on the benchmark but didn't really scale with the number of threads. This is because those sub-quadratic (linear or N*logN) algorithms are hard to parallelize. Generally, when faced with such a situation, one should accept that the algorithm itself can't be parallelized and look for other ways to make the solution scalable.
The most common one is to divide the work: apply the same fast sequential algorithm multiple times to smaller parts of the input. The work can then easily be done in parallel. We applied this work-division scheme and distributed the tasks across worker threads using a MapReduce framework. We didn't obtain the best times (12s on ...035), but we believe this is mostly due to our rough implementation of the MapReduce framework (using OpenMP and badly tuned parameters) and a not-so-fast sequential algorithm (a suffix array built in O(N (logN)^2)), so the parallelization scheme we present here shouldn't be judged by those times alone.

Work in parallel complexity analysis

At first sight, this division of work seems, for multiple maximal common substrings, a worse idea than the sequential O(N^2) dynamic programming approach. Let me explain: suppose the reference sequence has length N, there is only one input sequence of length M, and we divide the reference into A chunks of size C and the input into B chunks of the same size. We then have to solve A*B subproblems, a quadratic number, each of size N/A + M/B, i.e. a size that only decreases linearly. If the solver has sub-quadratic complexity (and this should be the case for this approach), the total amount of work grows: A*B*T(N/A + M/B) > T(N+M).

But this is not really a problem, because A and B (on which the amount of work depends) don't need to be that big. It's enough to generate as many tasks as there are threads available. In that case, taking A = B = sqrt(P), the time (not work) complexity of the approach becomes T((N+M)/sqrt(P)), so we get scalability proportional to the square root of the number of threads (or even better if the complexity T of the solver is super-linear). So yes, the amount of work grows (linearly with the number of threads), but the time needed to solve the problem is divided by sqrt(P). This can get even better if we can also scale the solver a little and use, say, two threads per subproblem, so that all available threads have work while the total amount of work grows less (because we create fewer subproblems).
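
To make this trade-off concrete, here is a small illustrative sketch (not part of our contest code) that plugs a hypothetical solver cost T(n) = n*log2(n) and A = B = sqrt(P) into the formulas above; the sequence lengths and the cost model are assumptions chosen only for illustration:

#include <cstdio>
#include <cmath>

// Hypothetical cost model for the sequential solver: T(n) = n * log2(n).
static double T(double n) { return n * std::log2(n); }

int main() {
    const double N = 1e8, M = 1e8;        // reference and input lengths (illustrative)
    const double sequential = T(N + M);   // cost of solving the whole problem at once

    for (int P = 1; P <= 64; P *= 4) {    // number of available threads
        double A = std::sqrt((double)P);  // reference chunks
        double B = std::sqrt((double)P);  // input chunks, so A*B == P tasks
        double perTask   = T(N / A + M / B);  // each task has size (N+M)/sqrt(P)
        double totalWork = A * B * perTask;   // the total work grows with P ...
        double wallTime  = perTask;           // ... but one task per thread bounds the time
        std::printf("P=%2d  work x%.2f  time x%.2f\n",
                    P, totalWork / sequential, wallTime / sequential);
    }
    return 0;
}

For P = 4 this prints roughly work x1.93 and time x0.48: the total work almost doubles, while the wall time is roughly halved, i.e. reduced by about sqrt(P).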

It must be noted that this analysis covers the worst case, where N and M are comparable. If one of them is considerably larger (let's say N), the other doesn't have to be split at all (B=1), so the amount of work not only doesn't grow, it actually shrinks for super-linear solvers (O(N logN) or O(N (logN)^2), like ours): A*T(N/A) < T(N) whenever T(N) != O(N). For example, with T(N) = N*log2(N) and A = 4, the split costs 4*(N/4)*log2(N/4) = N*(log2(N) - 2) < N*log2(N). Also, the assumption of a single input sequence doesn't affect the result, because multiple input sequences can be solved at once, using the same suffix array, by concatenating them (we implemented this).

MapReduce for our problem

Now that I've hopefully convinced you the approach is worthwhile, let's see how we imagined it. The MapReduce paradigm is quite simple: you split the input into multiple chunks, map each chunk to some key-value pairs of intermediate results, and then reduce two such pairs into a single one until you get the result for the entire input. And this is what we did. We split the reference and the input sequences and created A*B pairs of chunks. We mapped each pair (using the solver) to a list of matches of length at least minMatchLength. We then computed the solution for the initial reference and one of the input sequences as the union of all the mappings containing a chunk belonging to that input (the reduce operation).
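
To show the shape of this pipeline, here is a small self-contained sketch; the brute-force matcher stands in for our suffix-array solver, the reduce is a plain union that ignores the boundary problem discussed next, and all names (Match, mapChunkPair, reduceLists) are illustrative, not our contest code:

#include <cstdio>
#include <string>
#include <vector>

struct Match { size_t refPos, inPos, len; };

// Map: find common substrings of length >= minMatchLength between two chunks
// (brute force here; the contest code used a suffix-array based solver).
std::vector<Match> mapChunkPair(const std::string& refChunk, size_t refOff,
                                const std::string& inChunk,  size_t inOff,
                                size_t minMatchLength) {
    std::vector<Match> out;
    for (size_t i = 0; i < refChunk.size(); ++i)
        for (size_t j = 0; j < inChunk.size(); ++j) {
            size_t l = 0;
            while (i + l < refChunk.size() && j + l < inChunk.size() &&
                   refChunk[i + l] == inChunk[j + l]) ++l;
            if (l >= minMatchLength) out.push_back({refOff + i, inOff + j, l});
        }
    return out;
}

// Reduce: merge two intermediate match lists (plain union here; matches that
// cross a chunk boundary are handled as described in the next paragraph).
std::vector<Match> reduceLists(std::vector<Match> a, const std::vector<Match>& b) {
    a.insert(a.end(), b.begin(), b.end());
    return a;
}

int main() {
    std::string ref = "GATTACAGATTACA", input = "TTGATTACATT";
    size_t chunk = 7, minMatchLength = 5;
    std::vector<Match> all;
    // Split the reference and the input into chunks and map every (ref, input) pair.
    for (size_t r = 0; r < ref.size(); r += chunk)
        for (size_t q = 0; q < input.size(); q += chunk)
            all = reduceLists(std::move(all),
                              mapChunkPair(ref.substr(r, chunk), r,
                                           input.substr(q, chunk), q,
                                           minMatchLength));
    for (const Match& m : all)
        std::printf("ref %zu, input %zu, len %zu\n", m.refPos, m.inPos, m.len);
}

Note that on this toy input the full GATTACA match, which crosses the input chunk boundary, is lost; only its per-chunk pieces of length 5 are reported. This is exactly the boundary problem addressed below.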

The only problem we encountered was, obviously, identifying matches that are split across two or more chunks. A first approach would be to also retrieve from the solver the matches of each suffix shorter than minMatchLength, because these matches may continue in the next chunk. But this is really bad, because most of these shorter matches won't lead to a long enough match once the results from the next chunk of input/reference are added. A smarter approach we identified is to extend each chunk by minMatchLength-1 characters both to the left and to the right, where possible. This way, every match (of length >= minMatchLength) that crosses the boundary between two chunks has a part of length at least minMatchLength in each of the two chunks. An overlap of only minMatchLength characters would also be enough.

With this, the implementation of the reduce step is straightforward. We take two mappings A and B, each representing a pair of a reference chunk and an input chunk mapped to a list of matches of length at least minMatchLength. One chunk type (reference or input) must be the same in A and B, while the chunks of the other type must be overlapping neighbours. We want to reduce them to a single mapping C, representing the pair formed by the common chunk and the union of the two other chunks, mapped to the corresponding matches. Clearly, every match in A or B that does not cross the original common boundary of the overlapping chunks (excluding the minMatchLength-1 added characters) is also a solution in C. For the matches in the overlap area, we basically identified, for each position of the chunk that is common to A and B, the longest match of the ending of A's chunk that finishes there and the longest match of the beginning of B's chunk that starts there, and then added their combinations as solutions for C. This was done by scanning the solutions of A and B that cross the boundary and using a hashtable to keep track of the longest match for each position in the chunk of the other type.
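
A minimal sketch of this chunking with overlap, assuming a hypothetical Chunk struct and helper name (not our actual code):

#include <cstdio>
#include <vector>
#include <algorithm>

struct Chunk { size_t start, end; };  // [start, end) over the original sequence

// Every chunk is extended by minMatchLength-1 characters on both sides (when
// possible), so any match of length >= minMatchLength that crosses an original
// boundary still has a piece of at least minMatchLength inside a chunk.
std::vector<Chunk> makeOverlappingChunks(size_t seqLen, size_t chunkSize,
                                         size_t minMatchLength) {
    std::vector<Chunk> chunks;
    size_t overlap = minMatchLength - 1;
    for (size_t s = 0; s < seqLen; s += chunkSize) {
        size_t start = (s >= overlap) ? s - overlap : 0;           // extend left
        size_t end   = std::min(seqLen, s + chunkSize + overlap);  // extend right
        chunks.push_back({start, end});
    }
    return chunks;
}

int main() {
    // e.g. a sequence of length 100, chunk size 25, minMatchLength 10
    for (Chunk c : makeOverlappingChunks(100, 25, 10))
        std::printf("[%zu, %zu)\n", c.start, c.end);
}

With these parameters the chunks become [0,34), [16,59), [41,84), [66,100), so neighbouring chunks share 2*(minMatchLength-1) = 18 characters around each original boundary.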

As a side note, the reduction step can also be parallelized easily, even though there are dependencies. Given N mappings that need to be reduced, one can do it sequentially, one by one, in O(N) time and O(N) work, or in parallel in O(logN) time with about double the work, O(2*N), using a scheme like: reduce 1-2 and store in 1, reduce 3-4 and store in 3, etc.; then reduce 1-3 and store in 1, reduce 5-7 and store in 5, etc. In the end, the first mapping alone holds the solution.
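
For completeness, a minimal sketch of this pairwise scheme, with a placeholder Mapping type and a placeholder reduce() (the real reduce is the one described in the previous paragraph):

#include <vector>

struct Mapping { std::vector<int> matches; };  // placeholder for a list of matches

// Placeholder reduce: fold b into a (the real reduce also combines the
// matches that cross the chunk boundary).
void reduce(Mapping& a, Mapping& b) {
    a.matches.insert(a.matches.end(), b.matches.begin(), b.matches.end());
    b.matches.clear();
}

// Pairwise tree reduction: log2(n) rounds; the pairs inside one round are
// independent, so each round can run as a parallel loop.
void tree_reduce(std::vector<Mapping>& m) {
    long n = (long)m.size();
    for (long step = 2; step / 2 < n; step *= 2) {  // rounds: step = 2, 4, 8, ...
        #pragma omp parallel for
        for (long i = 0; i < n - step / 2; i += step)
            reduce(m[i], m[i + step / 2]);          // the result accumulates in m[0]
    }
}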

Implementation

To sum up, here is roughly what our MapReduce implementation for maximal common substrings looked like, using a kind of tree structure for the reductions:


// Build one task (a Tree node) for every (reference chunk, input chunk) pair
for (each chunk refChunk of the reference sequence) {
    for (each input sequence i) {
        for (each chunk inputChunk of the input sequence i) {
            Tree[i][refChunk][inputChunk].ref_start   = Chunks[0][refChunk].start;
            Tree[i][refChunk][inputChunk].ref_end     = Chunks[0][refChunk].end;
            Tree[i][refChunk][inputChunk].input_start = Chunks[i][inputChunk].start;
            Tree[i][refChunk][inputChunk].input_end   = Chunks[i][inputChunk].end;
            q.push_back(&Tree[i][refChunk][inputChunk]);
        }
    }
}

// Map phase: solve every task; solver_scalability threads work on one task
solver_scalability = 1;
omp_set_num_threads(num_threads / solver_scalability);
#pragma omp parallel for
for (each task in q) {
    omp_set_num_threads(solver_scalability);
    solve(task);
}

// Reduce phase 1: for each input sequence and each reference chunk, tree-reduce
// along the input chunks until Tree[i][refChunk][0] holds the matches of
// refChunk against the whole input sequence i
omp_set_num_threads(num_threads/4);
#pragma omp parallel for
for (each input sequence i) {
    for (each chunk refChunk of the reference sequence) {
        for (logn_step = 2; logn_step/2 < number_chunks[i]; logn_step = logn_step << 1) {
            omp_set_num_threads(4);
            #pragma omp parallel for
            for (inputChunk = 0; inputChunk + logn_step/2 < number_chunks[i]; inputChunk += logn_step) {
                reduce(&Tree[i][refChunk][inputChunk], &Tree[i][refChunk][inputChunk + logn_step/2]);
            }
        }
    }
}

// Reduce phase 2: tree-reduce along the reference chunks until Tree[i][0][0]
// holds the matches of the whole reference against input sequence i
#pragma omp parallel for
for (each input sequence i) {
    for (logn_step = 2; logn_step/2 < number_chunks[0]; logn_step = logn_step << 1) {
        omp_set_num_threads(4);
        #pragma omp parallel for
        for (refChunk = 0; refChunk + logn_step/2 < number_chunks[0]; refChunk += logn_step) {
            reduce(&Tree[i][refChunk][0], &Tree[i][refChunk + logn_step/2][0]);
        }
    }
}


// After the reductions, Tree[i][0][0] holds all matches for input sequence i
for (each input sequence i) {
    print_solutions(Tree[i][0][0].solutions);
}



Of course, an important part of our solution was also the solver itself, but that is not the subject of this article. I hope you liked our approach to parallelization and will consider modelling your own solutions with MapReduce in the future.
