How to make this reduction in Cilk Plus?

How to make this reduction in Cilk Plus?

Hello,

I have code that is structured like this:

float A[3], X[M], Y[M], Z[M], OUTX[N], OUTY[N], OUTZ[N];

for (i = 0; i < N; i++) {
  // Use other arrays and i as an index to these arrays to initialize A[0], A[1], A[2]
  for (j = 0; j < M; j++) {
    // Calculate new values for A[0], A[1], A[2]
    // using more arrays where i and/or j are used as indexes

    X[j] += A[0];
    Y[j] += A[1];
    Z[j] += A[2];
  }
  OUTX[i] = A[0];
  OUTY[i] = A[1];
  OUTZ[i] = A[2];
}

I have successfully parallelized the outer loop using OpenMP, making the array A private and adding the atomic directive before the updates to the elements of X, Y and Z (using critical was actually worse). But now I would like to try this code out using Cilk Plus.

Although I have read all the documentation about reducers and reduction operations in Cilk Plus, I still cannot formulate in my mind how the above code could be implemented in Cilk Plus. I would like to replace the outer loop with a cilk_for and have some way to make the reductions for the elements in arrays X, Y and Z. Could anyone direct me towards the right solution?

Thank you,

Ioannis E. Venetis

11 posts / 0 new
Last post
For more complete information about compiler optimizations, see our Optimization Notice.

I don't see a motivation for making a reduction of this, but your comments don't agree with your C-ish code.  

Apparently, you could write Cilk(tm) Plus array notation

 X[0:M] += M*A[0];

...

OUTX[0:N] = A[0];

...

and expect Intel C++ compiler (but not gcc cilkplus) to optimize by fusion.

If you meant sum reductions of those arrays, you might have something like

OUTX[0:N] = __sec_reduce_add(X[0:M]);

...

if you accepted the performance penalty.

For a while (before the name was changed from CEAN), Intel C++ sometimes required Intel(r) Cilk(tm) Plus Array Notation for optimization, while Cilk(tm) Plus reducers tended to hinder optimization (and, in my limited view, don't improve readability).  Now, in the absence of a performance boost, you would want to find some value in expressiveness or maintainability for the use of Cilk(tm) Plus.

 

Dear Tim,

Thank you for taking the time to look into this. I am afraid, however, that I don't understand why my comments don't agree with the pseudocode I have sent. Maybe it was not clear that the values in A[0], A[1] and A[2] change during each repetition of the inner loop. So, it is not possible to simply do X[0:M] += M*A[0]; I have to add A[0] after each repetition of the inner loop to the corresponding position in X (the same holds for Y and Z). As far as I can tell, this is a reduction on a specific position in an array. But the documentation I have seen for Cilk Plus only provides examples to do this kind of operation on simple variables, not positions in arrays.

I hope it is now more clear where I am stuck.

Best regards,

Ioannis E. Venetis

The first thing you're going to have to do is to move the declaration of A into the outer loop like this:

float X[M], Y[M], Z[M], OUTX[N], OUTY[N], OUTZ[N];
 
cilk_for (i = 0; i < N; i++) {
  float A[3];
  // Use other arrays and i as an index to these arrays to initialize A[0], A[1], A[2]
  for (j = 0; j < M; j++) {
    // Calculate new values for A[0], A[1], A[2]
    // using more arrays where i and/or j are used as indexes
 
    X[j] += A[0];
    Y[j] += A[1];
    Z[j] += A[2];
  }
  OUTX[i] = A[0];
  OUTY[i] = A[1];
  OUTZ[i] = A[2];
}

This gives each strand a private copy of A to work with so you're not racing.

    - Barry

Dear Barry,

Indeed, A has to move into the first loop. That's quite straightforward, as it corresponds to using the private directive for A in OpenMP (which I have already done, as I have mentioned).

Unfortunately, the difficult part (at least for me) still remains. More strands can access at the same time position X[j] and I need some way to synchronize at that point. Any idea on this part would be highly appreciated.

Ioannis E. Venetis

An array reducer is one solution.  Conceptually, each Cilk worker needs its own view.  The code below defines type WideAddend to be suitable as a view.  Here the default constructor serves as the identity element and method reducer serves as the reduction operation, which is the convention expected by cilk::monoid_with_a_view.   More details can be found in the extensive comments in header <cilk/reducer.h>.

- Arch

#include <cassert>
#include <cstring>
#include <cilk/cilk.h>
#include <cilk/reducer_opadd.h>

template<typename T, int M>
struct WideAddend {
    T X[M], Y[M], Z[M];
    WideAddend() {
        X[0:M] = 0;
        Y[0:M] = 0;
        Z[0:M] = 0;
    }
    typedef WideAddend value_type;
    void reduce( const WideAddend* rhs ) {
        X[0:M] += rhs->X[0:M];
        Y[0:M] += rhs->Y[0:M];
        Z[0:M] += rhs->Z[0:M];
    }
};

int main() {
    const int N = 1000;
    const int M = 100000;
    cilk::reducer<cilk::monoid_with_view<WideAddend<float,M> > > a;
    cilk_for( int i=0; i<N; ++i )
        for( int j=0; j<M; ++j ) {
            a.view().X[j] += 1;
            a.view().Y[j] += 1;
            a.view().Z[j] += 1;
        }
    for( int j=0; j<M; ++j ) {
        assert( a.view().X[j]==N );
        assert( a.view().Y[j]==N );
        assert( a.view().Z[j]==N );
    }
}

 

Arch's solution is a special case of what we often call a (dense) array reducer.  One can construct a generic array reducer along the lines of what Arch described, and re-use it in other, similar circumstances.  An array reducer would be used like this:

template <typename Monoid, std::size_t SZ>
class array_reducer { ... };

array_reducer<op_add<float>, M>    X, Y, Z;
...
    cilk_for (int i = 0; i < N; ++i) {
        float A[3] = { ... };
        for (int j = 0; j < M; ++j) {
            X[j] += A[0];
            Y[j] += A[1];
            Z[j] += A[2];
        }
        OUTX[i] = A[0];
        OUTY[i] = A[1];
        OUTZ[i] = A[2];
    }

If anybody wants to contribute such an array reducer to the cilklib library, we'd be happy to consider it.

However, the overhead of the reductions has to be balanced against the amount of work done in each iteration.  On average, you should expect about log2(P) reductions, where P is the number of workers (cores) in your system.  If M is large, each reduction will be expensive because each reduction involves traversing an entire array of M elements.  Dense array reducers are best used for small values of M and large values of N.

(Aside: Note that I needed to privatize j, to avoid a race.  The original code did not show this privatization.)

You could parallelize your code without reducers the same way that OpenMP does by using atomic operations. Instead of OpenMP atomics, you would use C11 or C++11 atomic operations.  However, this approach has several problems, not the least of which is likely to be poor performance in both OpenMP and Cilk Plus.  A better approach might be to avoid the determinacy race entirely.  The indeterminate operations are the increments of X[j], Y[j], and Z[j] because multiple iterations of the outer loop could modify the items at the same j at the same time.  One way to avoid that problem would be parallelize the inner loop instead of the outer loop. Depending on the values of M and N, you might not lose too much parallelism by doing that, and it would eliminate the need for atomic operations or reducers. Another approach would be to invert the loops -- i.e. put the j loop on the outside and parallelize that instead of parallelizing the i loop.  Based on the skeletal code that you provided, I don't know how feasible the latter approach is.  If I were to take your code at face value, I might parallelize it by creating a separate loop to compute OUTX, OUTY, and OUTZ before computing X, Y, and Z, in order to avoid redundant computations (although that might not actually be helpful):

float X[M], Y[M], Z[M], OUTX[N], OUTY[N], OUTZ[N];

// Pre-compute OUTX, OUTY, and OUTZ
cilk_for (size_t i = 0; i < N; ++i) {
    float A[3] = { ... };
    OUTX[i] = A[0];
    OUTY[i] = A[1];
    OUTZ[i] = A[2];
}

// Now compute X, Y, and Z, inverting the original loops
cilk_for (size_t j = 0; j < M; ++j) {
    #pragma simd
    for (size_t i = 0; i < N; ++i) {
        float A[3] = { OUTX[i], OUTY[i], OUTZ[i] };

        X[j] += A[0];
        Y[j] += A[1];
        Z[j] += A[2];
    }
}

The inner (simd) loop could also be replaced by:

    X[j] = __sec_reduce_add(OUTX[0:N]);
    Y[j] = __sec_reduce_add(OUTY[0:N]);
    Z[j] = __sec_reduce_add(OUTZ[0:N]);

The above code will not scale well if M is small (less than 10*P), in which case the array reducer might work better. There is one more solution, which involves a more complicated transformation: replace the nested loops with a recursive traversal of the 2-dimensional MxN space in such a way that, for each j, no parallel operations try to modify X[j] at the same time. Start by dividing the entire traversal into four "quadrants" as follows:

    i-> 0          N/2          N
  j   0 +-----------+-----------+
  |     |           |           |
  V     |     A     |     B     |
        |           |           |
    M/2 +-----------+-----------+
        |           |           |
        |     C     |     D     |
        |           |           |
      M +-----------+-----------+

Note that none of the points in quadrant A share either i or j coordinates with any of the points in quandrant D. This means that a modification to X[j] when i and j are in quadrant A will never conflict with a modification to X[j] when i and j are in quadrant D. Similarly, quadrants B and C have disjoint sets of i and j coordinates. We can take advantage of this quality to compute quadrants A and D in parallel, then (when A and D are complete) compute quadrants B and C in parallel. This gives us a parallelism of only 2, but we can recursively decompose quadrant A into four sub-quadrants. We can continue recursing until we have a small rectangle that we want to compute serially (with vector operations). With this approach, not only do you achieve race-free parallelism, but you also achieve good cache locality. This kind of algorithm is called "cache oblivious" because, for any reasonably-sized base case (say 16x16), the computation will use cache almost optimally without the algorithm's knowing the target machine's cache size.

The full code for the cache-oblivious divide-and conquer algorithm would be too lengthy to show here (especially since I don't have real code to start with). There are a number of corner cases to watch out for, such as handling round-off when M and N are not powers of two and the fact that, unless M == N, one of the dimentions will be divided down to 1 before the other. Here is some pseudo-code, however:

float X[M], Y[M], Z[M], OUTX[N], OUTY[N], OUTZ[N];

// Pre-compute OUTX, OUTY, and OUTZ
cilk_for (size_t i = 0; i < N; ++i) {
    float A[3] = { ... };
    OUTX[i] = A[0];
    OUTY[i] = A[1];
    OUTZ[i] = A[2];
}

void compute(size_t i, size_t j, size_t num_i, size_t num_j)
{
    if (num_i and/or num_j are small) {
        size_t end_i = i + num_i;
        size_t end_j = i + num_j;
        for (size_t j = 0; j < end_j; ++j) {
            #pragma simd
            for (size_t i = 0; i < end_i; ++i) {
                float A[3] = { OUTX[i], OUTY[i], OUTZ[i] };

                X[j] += A[0];
                Y[j] += A[1];
                Z[j] += A[2];
            }
        }
        return;
    }

    // Note: The following is psuedo-code.  We Cannot just use num_i / 2
    // or num_j / 2 without accounting for round-off:

    // compute quadrants A and D in parallel
    cilk_spawn compute(i, j, num_i / 2, num_j / 2);              // A
    compute(i + num_i / 2, j + num_j / 2, num_i / 2, num_j / 2); // D
    cilk_sync;

    // compute quadrants B and C in parallel
    cilk_spawn compute(i + num_i / 2, j, num_i / 2, num_j / 2); // B
    compute(i, j + num_j / 2, num_i / 2, num_j / 2);            // C
    cilk_sync;
}

A variation of the above is to divide the SMALLER of num_i and num_j in half, leaving the other one alone.  But be careful: when dividing num_i in half, the two halves must be run SERIALLY, to avoid parallel operations racing on the same value of j.

I hope this has helped.

- Pablo

Dear Arch and Pablo,

Thank you so much for your help and detailed descriptions. I think that I have a solid base now from where I can start and study the problem further.

In the meantime I also thought about using the C++11 atomic operations. I will try this solution first, as it should be the easiest to implement. On the other hand, using a dense array reducer might prove more expensive in the end. N is around one million and M around 200000. Judging from your comments maybe it is not a good idea. But only trying it out will tell us.

Finally, exchanging the order of the i and j loops is most probably the worst scenario. It would require a major rewrite of the rest of the code, which is not something that I am willing to do right now. The order of the loops was actually chosen so as to make things easier to parallelize. I should have mentioned that, as it would help you to give a more specific answer (and would also save you a lot of time to think and write about this possibility).

Thank you again. I will look into this and if anything else arises I will come back.

Ioannis E. Venetis

I very strongly advise against using facilities from two different parallel mechanisms in the same program (in this case, CilkPlus and C++11 threading).  They may work together or they may conflict - and, if they conflict, they could produce the most horrible misbehaviour.  Equally badly, how they interact is quite likely to be very different between different implementations.  Also, as with much of shared-memory parallelism, a simple test will often seem to work but your real code will fail, sometimes.

The exception is, of course, when one mechanism clearly and explicitly states that it can be used together with the other, as is the case with the CilkPlus and simd mechanisms in the above.

Someone who knows more may say that CilkPlus and C++11 threading work together but, until they do, I would not mix them.

Oops.  I notice that Pablo, who knows a lot more about this than I do, has said that they work together.  Pablo, could you possibly confirm whether this is a required part of CilkPlus, or something that is true in the current implementations, including gcc?

I assume using CilkPlus and explicit C++11 thread management is still a bad idea, because it would be thoroughly confusing at best, and lead to conflict at worst.  Of course, I am talking about C++11 and that might change if CilkPlus gets merged into the C++ standard.

As you know, I have the writing of a CilkPlus course on a back burner, and my practice is to teach only 'bulletproof' programming techniques, where there is any possibility of doing so.

Assuming your reduction is commutative (or in the case of floating-point addition, close enough that you aren't worried about the difference), you may be able to use a commutative reducer array.

I just added an initial version of commutative reducer array into Cilkpub, a library of contributed code that is available on the Cilk Plus community website.
https://www.cilkplus.org/sites/default/files/contributions/cilkpub_v104.tar.gz
See "samples/creducer_opadd_array.cpp" for an example program, and "include/creducer_opadd_array.h" for the header file.

For the example you had above, I believe the code would look something like the following.
 

#include <cilkpub/creducer_opadd_array.h>

float X[M], Y[M], Z[M], OUTX[N], OUTY[N], OUTZ[N];

// Create reducers arrays that are based on the arrays above.
cilkpub::creducer_array<float> cred_X(X, M);
cilkpub::creducer_array<float> cred_Y(Y, M);
cilkpub::creducer_array<float> cred_Z(Z, M);

cilk_for (i = 0; i < N; i++) {
    float A[3];
    // Use other arrays and i as an index to these arrays to initialize A[0], A[1], A[2]
    for (j = 0; j < M; j++) {
        // Calculate new values for A[0], A[1], A[2]
        // using more arrays where i and/or j are used as indexes
        cred_X[j] += A[0];
        cred_Y[j] += A[1];
        cred_Z[j] += A[2];
    }

    OUTX[i] = A[0];
    OUTY[i] = A[1];
    OUTZ[i] = A[2];
}

// Move the final data out into the output array.
cred_X.move_out(X);
cred_Y.move_out(Y);
cred_Z.move_out(Z);

 

Roughly speaking, a commutative reducer array creates a local copy of the array for each possible worker thread.   The "move_out" command will trigger a merge of all the local copies back into the original output array.   This approach will generally use less space than an array of normal reducers for modest to large size arrays, but for P cores, it still requires P times as much space as the serial code or a version that uses C++ atomics.   Having local copies of the array for each worker may avoid contention on atomic operations and ping-ponging of cache lines however.

I am not an expert on C++ atomics, but I don't know of any reason why they should be incompatible with Cilk Plus.

Finally, to add the appropriate caveats, this commutative reducer array is *not* a reducer that is supported product.  
But I would be interested to know whether it works well for your problem.

Cheers,

Jim
 

Leave a Comment

Please sign in to add a comment. Not a member? Join today