Branching pipelines

I'm just getting started with TBB, and am having problems trying to model something in this framework. Suppose I have an input sequence of data items consisting of a mixture of fruit, e.g. apples, pears and oranges. The processing I need to do on each of these fruit depends on the kind of fruit. Furthermore, I'd like to batch up the fruit into "boxes", each containing just one kind of fruit, so that I only process a box at a time. Without TBB I'd write a thread that read in the fruit one at a time, sorted them into buckets based on their type, and parceled them up into boxes. I'd then write separate threads for each fruit type, and use a producer/consumer model to pass boxes of apples to the apple thread, and similarly for all the other fruit. If these threads produced any results, e.g. how much we sold each box for, then I'd merge these results back into a stream for subsequent processing.

Now my first thoughts were to model all this using TBB pipelines. But I couldn't see anything equivalent to a user-controlled multiway branch feeding separate pipeline stages, and a merge back from these independent stages to a single stream. Is there some more elegant way of modelling this in TBB, or some feature of the framework I'm overlooking?

Thanks.

Let me first ask you about the nature of your input sequence. Is it an array of items, a forward-iterable sequence, or do the items arrive one at a time from some external source as soon as they become available?

The items arrive one at a time from an external source, e.g. packets of work read from a network. So the input sequence is unbounded in length.

Well, both TBB algorithms and the TBB lower-level tasking model have been developed with CPU-intensive applications in mind, while your scenario contains a stage that represents a typical blocking-style use case. But this does not mean that there is nothing you can do with TBB here.

Actually I believe that your idea about using a pipeline is not that hopeless. You could use a serial input filter to read packets and parcel boxes. As soon as a box is ready, you mark it with the type of fruit in the given parcel and pass it to the second filter, which is parallel. The label I've just mentioned could be a pointer to a member function of the second filter that processes the specific type of fruit. The last filter will again be a serial one. If the second filter does not produce anything, it just passes NULL to the last one, which ignores it. What do you think?

The only problem with this approach may be caused by the fact that the input filter can block waiting for the next packet. Since it is executed by a thread from the TBB pool, it will temporarily exclude this thread from doing useful work (e.g. helping with processing already parcelled boxes). In other words, the system will be undersubscribed.

If your packets arrive frequently, and your system has 8 or more cores, you probably could ignore the underutilization.
Otherwise you may initialize TBB scheduler with one extra thread:

tbb::task_scheduler_init init(tbb::task_scheduler_init::default_num_threads() + 1);

This leads to temporary oversubscription, but as long as the oversubscription periods are rare enough, overall performance will be improved.

Probably we could work out some other design as well. Let me ask a few more questions to have a better picture of your requirements:

  • Does processing one box provide enough work to be worth parallelizing (and is it possible to parallelize it at all)?
  • How many boxes per second (approximately, of course) are generated in a typical case?
  • Do you need to ensure that the results of box processing arrive at the final stage in the same order as the boxes were parcelled?

> Actually I believe that your idea about using a pipeline is not that hopeless. You could use a serial input filter to read packets and parcel boxes. As soon as a box is ready, you mark it with the type of fruit in the given parcel and pass it to the second filter, which is parallel. The label I've just mentioned could be a pointer to a member function of the second filter that processes the specific type of fruit. The last filter will again be a serial one. If the second filter does not produce anything, it just passes NULL to the last one, which ignores it. What do you think?

I'd thought about such an approach, but it looked like the parallel second stage would need to be constrained a bit. In my actual problem there are many different kinds of fruit, potentially hundreds of them, and each of them can be processed independently. So the parallelism is at the level of being able to process a box of apples at the same time as a box of oranges. The processing of each box is largely sequential in nature, or at least at present the small amount of parallelism available is not worth exploiting. I definitely need to process each apple in a box in the same order it originally arrived. The processing step on each fruit is configurable though, so I suppose at some point I could find there is a small amount of parallelism that is exploitable at this level.

Each box of apples must be processed in the order the apple boxes were produced, and the same for each of the other fruits. So if I simply classify fruit into boxes in the first stage and then pass them to the second filter, I may end up processing multiple boxes of apples in parallel. I can't simply set the number of tokens to be the number of different kinds of fruit, as that still doesn't prevent the problem. But if it sounds like the pipeline approach is the best way to go in TBB, then presumably I'll need to figure out how to implement something like the token mechanism, but at a finer level, to ensure that if I pass an apple box into the second filter I can't pass in another apple box until the first one has come out the other end. If the number of fruit types becomes too large I can always aggregate them, e.g. introducing an AppleOrOrange hybrid, but at the cost of constraining the potential parallelism.

> The only problem with this approach may be caused by the fact that the input filter can block waiting for the next packet. Since it is executed by a thread from the TBB pool, it will temporarily exclude this thread from doing useful work (e.g. helping with processing already parcelled boxes). In other words, the system will be undersubscribed.

Yes, that was a concern, but oversubscribing when the number of cores is small sounds like a good plan. Although my current machine has 2*4 cores, I was worried about what would happen on a dual-core machine with one task blocking frequently. I'll try out your suggestion.

> Does processing one box provide enough work to be worth parallelizing (and is it possible to parallelize it at all)?

Probably not. The purpose of introducing fruit and the boxing mechanism was to separate out useful work that can be done in parallel; two boxes of the same fruit must be processed in the order they arrived (as must the fruit within a box). And the box size is chosen so that the amount of computation on each box is large enough that the cost of interacting with the rest of the world to get the next box is negligible.

> How many boxes per second (approximately of course) are generated in a typical case?

Tens or hundreds of thousands of fruit can arrive per second. The boxes-per-second rate can vary, as it depends on how big each box is. For my non-TBB example, I had boxes with 100 fruit in each one. Below that, the synchronization time required to get each box started to become noticeable compared to the time required to process each box. But the time to process each fruit may vary, depending on what I need to do in each scenario I'm considering. So I'm expecting to have to adjust the box size for different scenarios.

> Do you need to ensure that the results of box processing arrive at the final stage in the same order as the boxes were parcelled?

Not for different fruit. But for the same fruit the order of the output should match the corresponding input order. This presumably makes it difficult to use a serial filter, as that would impose an unnecessarily strict order on the output. Well, I'm not entirely sure how a serial filter connected downstream of a parallel one actually works; from the descriptions in the documents it sounds like it does more than just process the items one at a time in the order they were generated by the parallel filter. But I'm guessing a serial filter isn't quite right if it does more than process the outputs from the parallel filter in the order they are produced. That's why I'd originally been thinking of a dispatch mechanism to a collection of fruit-specific pipelines, as it seemed easier to visualize the various ordering constraints.

Thanks for the details. Your case indeed presents a nontrivial problem. But the fact that you have many different kinds of fruit gives a chance that the following approach could probably work out. First I outline the overall design, and then we consider a few suggestions regarding its implementation.

1. We use the pipeline with three serial filters, and the number of tokens is three.

[To the note in your letter: serial filters in the pipeline are ordered. The ordering is established by the first serial filter, and all the subsequent serial filters (even if they are separated by parallel stages) process tokens in the same order. Thus if your input filter is serial, all other serial filters will heed the order established by it.]

2. The input filter reads packets and parcels them into boxes in a loop.

3. As soon as a box is filled, it is not sent to the processing stage immediately, but rather is inserted into the vector of lists of boxes. Each element of the vector is a list of boxes of a particular fruit kind. A list of boxes of the given fruit kind occurs only once in the given vector instance. This means that when the next box is inserted, the vector has to be searched first (we'll talk about the details later).

4. Once the vector reaches some reasonably big size (which is sufficient for efficient parallelization) it is sent to the processing stage, which is executed in parallel with the retrieving stage. This means that there is some latency period after the retrieval loop starts, but as soon as the first processing is launched, retrieval will happen in parallel with processing.

5. The processing stage allocates a vector for processing results of the same size as input vector of lists of boxes, and then it runs tbb::parallel_for.

6. The body of the parallel_for serially processes a range of input vector elements (that is, a range of lists of fruit boxes), and accumulates the processing results in the output vector. This ensures that boxes of fruit of the same kind are processed serially and in order. The fact that there are many different kinds of fruit, and consequently each vector contains many lists, makes it possible to achieve good load balance at this stage even if the distribution of fruit by kind is substantially non-uniform (like 20 oranges per 10 apples per 2 kiwi per 1 pear ...).

7. After the parallel_for finishes, the results of the processing stage are sent to the post processing stage.

Now some implementation details. Likely you can easily figure all this out on your own, but for the benefit of less experienced readers I may be annoyingly verbose. I also list several options sometimes, but which one is the most suitable for you will depend on your particular data and probably will require experimenting.

The best way to implement boxes seems to be to introduce an IFruitBox interface that exposes a fruit box processing method. This is the only method that is necessary at the second stage. This method should accept a processing result structure as its out argument. The implementation classes (CAppleBox, CGrapeBox) will implement the IFruitBox interface, and will contain (or inherit from) a container for individual fruit instances. If the individual instances of fruit are represented by data structures of different types, you may abstract them too, though it is not necessary.

The vector of lists of boxes would look like std::vector< std::list<IFruitBox*> >. The tricky part is to find a criterion for its optimal filling. I can see three main options. The input filter keeps adding new boxes until:

  • the vector contains FRUIT_KIND_LIMIT elements (looks like the optimal one but may not work if the distribution by fruit kind is very nonuniform, or significantly varies in time)
  • the vector contains BOX_LIMIT boxes of any fruit kind
  • the vector contains both BOX_LIMIT boxes and FRUIT_KIND_LIMIT_2 elements

(FRUIT_KIND_LIMIT_2 should be smaller than the FRUIT_KIND_LIMIT of option 1 in order to cope with nonuniformity of the distribution by fruit kind).

Each of these criteria can be also combined (in a whichever-happens-earlier way) with

  • the processing stage signals that it's done

Finally, additional load balancing can be provided by putting off some of the boxes until the next round, based on the expected or historical distribution. But this is definitely an option for version 2 of the product :)

As I noted in step 3 above, insertion into the vector requires a search. Thus the IFruitBox interface should also have a comparison method. And here we have two options:

  • the vector is sorted by fruit type (IFruitBox needs a "less" method)
  • the vector is unsorted (IFruitBox needs an "equals" method)

Probably an unsorted vector will work fine for you, though you may still want to experiment.

Hope something of what I wrote will be helpful for you.

Thanks for the great response, Andrey. It's given me a lot to think about. I think I need to spend a bit of time digesting all the details. I'll let you know how I get on.

Kevin

> Each of these criteria can be also combined (in a whichever-happens-earlier way) with the processing stage signals that it's done

So I grasp the general strategy you have outlined, but there's one aspect that is still puzzling me. We start with the input filter's operator() method being called, which continues reading packets off a link, and building boxes of fruit, until the vector satisfies some "size" criteria. This vector is then returned as the result of the method, and the processing stage can then start processing the fruit, hopefully doing a lot of this work in parallel. Given that the number of tokens is three then the input filter's operator() method will immediately be called again, from another task, to start accumulating the next batch of fruit. But what you are suggesting, which makes sense intuitively, is that when the processing stage finishes then the input filter should consider finishing the production of the vector of fruit boxes prematurely, even if it isn't big enough to satisfy our criteria, to avoid the processing stage being stalled. But how does the code inside the input filter's operator() method know that the processing stage has finished? Or have I misunderstood your suggestion? As with all this stuff, I can see how I could implement all this manually, but is there something in TBB that assists here? Maybe I've still not fully grasped how TBB's version of pipelining works.

Hi, Kevin.

You seem to have understood everything just perfectly, even the thing I failed to convey explicitly enough - that it is difficult to exactly predict beforehand, which of the possible design variants will give the best result.

So, about that additional condition. TBB itself does not provide a way to learn when any other stage in the pipeline completes. As I mentioned before, TBB's primary goal is to handle situations where there is plenty of work to do, while in our case the work can flow in at varying speed.

But we could do this easily enough by passing a pointer to a flag from the input filter to the processing filter as part of the token. It looks like you've already thought about it, but anyway. The corresponding data structure could look like this:

struct TokenToProcess {
    std::vector< std::list<IFruitBox*> > m_input_data;
    std::vector<CResult> m_result_data;  // CResult stands for your result structure
    bool *m_completion_flag;
};

m_completion_flag would point to a bool member of the input filter (using a member of the filter class for this purpose is possible since our input and processing filters are serial). The input filter could check this flag from time to time (not too frequently). Doing the check upon each box completion seems to be a reasonable choice.

Now a bit more about load balance. I believe that preventing the processing stage from starvation by feeding it data of even suboptimal size (let's call it variant 1) will be beneficial, because if we instead wait until the input vector becomes large enough (variant 2), the time to process the larger vector will be no less than the time to process the leaner one.

Thus, if the density of inbound stream of packets is constantly low, both variants will give us the same throughput - all the work will be done, and only the profile of work distribution will be different (more or less uniform undersubscription for variant 1, and serial periods alternating with full utilization for variant 2).

However if the density of inbound stream of packets changes from low to very high then back and so on, variant 1 should provide better average utilization.

Actually I've just realized (thanks to Alexey Kukanov for pointing it out) that we should use the completion flag not only to prematurely break the boxes collection, but also, just the reverse, to continue parcelling boxes until the currently pending processing stage is finished.

Well, now that I've got to this point, it looks like we do not need any size criteria at all. As long as the processing stage is idle (as our completion flag indicates), the input stage sends it whatever it has at the moment (well, if it has nothing, then it should wait until the first box is packaged; maybe the size of the box could be made smaller too?). This way the whole system should balance itself depending on the intensity of the inbound work flow.

If you manage to build anything useful out of this, it would be great if you shared with us how well it worked out, and of course if you have more questions (or suggestions for TBB improvement), you are always welcome.

That sounds like it might work well. Initially the system would construct fairly small vectors of fruit boxes, and the processing filter would quickly process these. So there wouldn't be much scope for parallelism, but we wouldn't need it either. But if some of the processing became more intensive, because different fruit may require different amounts of work, then the vectors would have time to grow in size, and so the potential for parallel processing within the processing stage would also grow. So with a bit of luck the system might be quite adaptive, and there's probably no need for a lower bound on vector size. In this approach I may not even need the concept of boxes of fruit; I can just store lists of individual fruit in the vector. Originally I'd introduced boxes to make sure the time associated with the communication overheads of passing fruit from one stage to the next didn't form a noticeable percentage of the overall processing time. But it sounds like the pipeline overheads in TBB are appreciably smaller than in a traditional thread-based pipeline. And even if I did need some kind of minimum size to justify the overheads of passing a vector to the processing filter, a simple constraint on the overall number of fruit, rather than fruit boxes, would now seem sufficient.

For generality there's possibly still a need for an upper bound on the vector size for the case where the data is being read from a disk, rather than a network, to prevent the input filter getting carried away and constructing vector elements at a faster rate than the processing filter can handle them.

Thanks for all your help.

Kevin
