TBB design patterns/architecture question

Sorry for the long post but I hope some of you may find this unusual application of TBB interesting.

I am trying to build an event processing framework using TBB. Based on helpful comments received in another thread, I have a few questions about the right design for this system and, indeed, about whether TBB is the right solution for this problem at all. I appreciate your suggestions.

The system consists of a graph of between 10 and 50 Event Processors (EPs). Each EP receives an event, updates its state, and optionally emits another event. An event is a C++ struct (no methods) consisting of a vector of n doubles, where n is not constant. An EP can subscribe to one or more EPs and can publish its results to one or more subscribers. The only exceptions are Initial EPs (which don't subscribe to anything, e.g. random number generators) and Terminal EPs (which don't publish anything, e.g. they write results to a text file). The system can have more than one initial/terminal EP. We can loosely group EPs into Stages, e.g. Initial, intermediate1, intermediate2, ..., terminal.

We are looking at processing millions of events. The number of events generated at each successive stage is progressively lower (since we "reduce" at each EP e.g. sample every 10th event and emit it). To summarize: EPs are:
1. stateful/non-reentrant.
2. short-running and fast.
3. implemented as functors.
4. block often (since they cannot do anything until the next event is in).

To make life more complex, some EPs are synchronous, i.e. they cannot process the next incoming event until ALL successor EPs have processed the last event emitted. Others are asynchronous, i.e. they process an event as soon as it comes in, emit it to all their subscribers (who enqueue it until they are ready), and wait for the next event. We do not know whether an EP is synchronous until the program starts running. An EP can have a mix of sync and async subscribers. Async EPs need to handle overflows, but let's assume we truncate the queue on overflow.

Most important of all, EPs can (and should, for performance reasons) run in parallel - if an EP publishes to three subscribers, all of them can run simultaneously. But there are situations where all of them need to be notified in sequence and as you guessed, this is not known until runtime. The silver lining is this is decided at startup and doesn't change during execution.

The initial solution looked like this and had horrible performance:
1. Each EP was implemented as an object with a TBB queue that blocks on a pop.
2. Start all non-initial EPs - all of which block waiting for an input event - in a parallel_for.
3. Start all initial EPs in a parallel_for.
4. Each EP pops the concurrent (input event) queue in an infinite loop, processes it, dynamically creates an output event and notifies the subscribers i.e. enqueues the event pointer to each subscriber's queue. The loop ends when a special END event is sent.
5. For both sync and async processing, the EP waits on an ack queue. If the subscriber is sync, it won't push the ack until processing is complete. Async subscribers push an ack immediately before continuing with processing the event.

I understand that parallel_for is a bad idea here. An alternative I can think of is to have a thread pool for each Stage. Because we create millions of events dynamically, I believe I should use the scalable allocator.

What we have is workflow parallelism, not HPC-style parallel-matrix manipulation. My questions are:
1. Is TBB the right approach for this system?
2. If yes, what can and should be done to improve the performance?


This problem seems loosely related to something I've done. You can download my paper from here: http://www.yetisim.org/images/2/2a/YetiSimPaper-SCS2008.pdf

If you feel the problem is similar, then I might be able to help you.

I probably missed something, but why do the EPs block? Can't they just skip a turn if no event is pending?

"I understand that parallel_for is a bad idea." I don't?

Quoting - tbbnovice

Each EP receives an event, updates its state and optionally emits another event.

How much time does this processing take (preferably, in cycles)? I.e., the pure processing time of a single event?

Quoting - Dmitriy V'jukov

How much time does this processing take (preferably, in cycles)? I.e., the pure processing time of a single event?

Between 50 and 500 floating point operations to process a single event. Each EP is unique.

Quoting - Raf Schietekat

I probably missed something, but why do the EPs block? Can't they just skip a turn if no event is pending?

"I understand that parallel_for is a bad idea." I don't?

From Dmitriy's post in another thread:

parallel_for is really not the best way to spawn long-running constantly blocking tasks.

In this system, the EPs stay alive until all events are processed, but the time to process each event is quite short. So they are long-running and constantly blocking, yet execute only briefly on each invocation.

Quoting - Raf Schietekat

I probably missed something, but why do the EPs block? Can't they just skip a turn if no event is pending?

Let me try to clarify this. Each EP communicates with the next EP(s) using concurrent queues. A simple example: an EP has two subscriber EPs. The first one runs really fast, so it finishes processing, but the publisher EP cannot send the next event until BOTH subscribers finish. Because of the pop() on the concurrent queue, the first EP blocks.

The reason we use queues is that we don't want to run two instances of an EP's functor simultaneously, because EPs are stateful. Each EP runs an event loop popping its event queue (this notion of using queues between stages is loosely comparable to the SEDA approach).

Quoting - AJ

This problem seems loosely related to something I've done. You can download my paper from here: http://www.yetisim.org/images/2/2a/YetiSimPaper-SCS2008.pdf

If you feel the problem is similar, then I might be able to help you.

AJ-thanks! I will go through the paper tonight and get back to you.

"Let me try to clarify this." I still don't see a need for blocking: just loop on parallel_for over the EPs, with each one actually firing if and only if input and synchronous outputs are both ready, neglecting for now the effect on relative timing between fast and slow EPs. There will still be blocking because not all worker threads finish at the same time, but it should be less if relative speeds are not too disparate. Another concern is that EPs perhaps mostly don't fire. Or it might just run great like this.

Quoting - tbbnovice

Between 50 and 500 floating point operations to process a single event.

Well... The overhead per task, given non-realized parallelism, is about 500-600 cycles in TBB. If the parallelism is realized (i.e. you are really crossing thread boundaries), multiply that overhead by 10. And when I put TBB's concurrent_queue under heavy contention on a quad-core, a single enqueue/dequeue operation takes up to 12,000 cycles.

So threads are not an option for your task. Period.

TBB tasks (or any incarnation of them, like parallel_for) are not an option either.

What I would suggest is to aggregate EPs into 'partitions', persistently or temporarily, so that you can process several events (i.e. a 'parent' event, its 'child' events, and possibly their 'child' events too) with direct function calls (no tasks! no enqueueing!). This way you can raise the granularity. This also helps with synchronous messages (function calls are indeed synchronous) and with serial, ordered processing of child events (function calls are indeed serial and ordered).

However, how to provide load balancing and how to divide EPs into partitions are open questions.

Also, you can employ specialized synchronization primitives. For example, for messaging between partitions you can use multi-producer/single-consumer (MPSC) queues instead of TBB's multi-producer/multi-consumer (MPMC) queues. An MPSC queue can be implemented far more efficiently than an MPMC queue.

Quoting - Dmitriy V'jukov

Well... The overhead per task, given non-realized parallelism, is about 500-600 cycles in TBB. If the parallelism is realized (i.e. you are really crossing thread boundaries), multiply that overhead by 10. And when I put TBB's concurrent_queue under heavy contention on a quad-core, a single enqueue/dequeue operation takes up to 12,000 cycles.

Someone correct me if I'm wrong, but floating-point operations are already pipelined/vectorized in hardware (and will be even more so in the next few years). So I think 50-500 operations isn't nearly enough work to amortize threading overhead.

"single enqueue/dequeue operation takes up to 12,000 cycles" Nobody told me that at the shop... But really, can't we do better than that, for a single-producer/single-consumer queue that doesn't have to block? What's the cost of a spin mutex cycle again, especially if it was previously held by the same thread?

Speaking of the same thread, that was my lingering concern: that parallel_for() would interfere with cache locality (I must admit that I did not anticipate the ideal grain size for the parallel_for to be the whole EP graph). So my back-up idea was something like the partitions that Dmitriy mentioned, either running continuously or somehow exploiting task-to-thread affinity support. Looping on parallel_for would only be a poor man's FIFO, anyway.

Hmm, I wonder... if EPs are so low-level, how about the dual approach of staying with the event flow, using parallel_do? Not just with the individual events, of course: if a synchronous consumer is not ready, control would switch to execute it if required. Or is that prohibitively expensive as well, e.g., because EPs would have to be shared?

Very depressing... maybe you should use a FPGA instead.

Quoting - Raf Schietekat

"Let me try to clarify this." I still don't see a need for blocking: just loop on parallel_for over the EPs, with each one actually firing if and only if input and synchronous outputs are both ready, neglecting for now the effect on relative timing between fast and slow EPs. There will still be blocking because not all worker threads finish at the same time, but it should be less if relative speeds are not too disparate. Another concern is that PEs perhaps mostly don't fire. Or it might just run great like this.

Thanks a lot for all the ideas. I am attaching a sample graph to illustrate my points. A,B,..J are EPs. The arrows show the event flows. A and B are Initial while F and J are terminal.

I appreciate Dmitriy's point about partitions. Because the load of each EP is fairly stable and because we can partition before startup, it is a feasible approach. Assume we did that and all EPs are "heavy" e.g. take over 10k cycles.

Because EPs are stateful, they need to "stay alive" until the last event. Function calls are indeed synchronous, but obviously I cannot run two instances of an EP's functor simultaneously, because that would corrupt its state. So if both A and B emit events eA and eB simultaneously, C can only process one event at a time. I can implement a concurrent queue inside C and let C dequeue in an infinite loop (which costs blocking/unblocking overhead AND creates a two-producer/one-consumer queue), OR I can put a lock around C (which creates contention). Raf's idea of using a parallel_for wouldn't work if I have a long-running task, but I don't know whether locking is a better idea, for an important reason: for async processing, C needs a concurrent queue anyway; as soon as an event hits C, it is enqueued to C's own queue so that A and/or B can continue as usual. So if I go with the locking route, I would pay the overhead of locking AND of blocking/unblocking.

Why do I think this is an interesting problem? Because it is clearly parallelizable, and if we can get it to work on one machine, perhaps I can use MPI to run bigger graphs on a cluster and gain massive scalability. Given that TBB is likely to be available on Larrabee, I think this idea has great potential. I am sure there is a cleaner solution than what I implemented, because the overheads are too high as of now.

Sorry, can't figure out how to upload a file. The image is at http://i36.tinypic.com/16j4ps6.jpg

tbbnovice,
Having some of your EPs synchronous and others asynchronous is an unusual but not necessarily unique requirement. Examples would be parallel-safe functions using asynchronous EPs and legacy parallel-unsafe code using synchronous EPs.
I am in the process of building my own threading library (should be entering alpha test phase soon if you are interested).
The design goal of my library was low overhead and fully asynchronous task queuing through a thread pool, for both C++ and Fortran programmers. Although synchronous task queues are not in the current design, they could be incorporated with perhaps an hour of additional programming.
One of the current features in the system is the concept of a completion task, which is run when a task or list of tasks completes (tasks can construct dependency trees if they wish). The change to the current design to implement a synchronous queue is relatively simple. A potential design might permit:
// sample test object
struct EPobject
{
public:
	int i;
};
typedef void voidFnEP(EPobject* pEP);


class SynchronousFnEP
{
public:
	SynchronousFnEP() { _ASSERT(false); }
	SynchronousFnEP(voidFnEP* fn) : myfn(fn) {}
	void Queue(EPobject* pObject) {
		qtControl.QueueWork(myfn, pObject); }
private:
	voidFnEP* myfn;	// function to run for each queued event
	qt::SynchronousControlStructure qtControl;
};


// declare your function
void Foo(EPobject* ep);
SynchronousFnEP SynchronousFoo(Foo);

EPobject ep; // must live outside scope of aTest()

void aTest()
{
	ep.i = 123456789;
	// somewhere in your code
	SynchronousFoo.Queue(&ep);
}

void Foo(EPobject* ep)
{
	printf("In foo %d\n", ep->i);
}
The above works using the asynchronous control queuing. Adding a synchronous control object would be straightforward.
You can contact me off line if you wish to work as an alpha tester (jim_dempsey@ameritech.net).
Jim Dempsey

As always, I'll mention the IRC channel on freenode. #tbb. I lurk on there, and would be willing to talk about your idea in real-time...

Did my paper give you any ideas?

Quoting - AJ

As always, I'll mention the IRC channel on freenode. #tbb. I lurk on there, and would be willing to talk about your idea in real-time...

Did my paper give you any ideas?

Jim, thanks for the offer. Let me figure out whether TBB is the right solution for my problem; if it is not, I will certainly get in touch with you.

AJ, I am half-way through your paper and yes, there are parallels! I will get in touch with you in a day or two. Thanks!

Quoting - AJ

As always, I'll mention the IRC channel on freenode. #tbb. I lurk on there, and would be willing to talk about your idea in real-time...

Did my paper give you any ideas?

Off-topic: AJ, thanks for linking the paper; it was an interesting read indeed. Using a UML state graph to define the threading of an app, and defining the state machine like that. I use the state-machine approach myself, so it gave me a few insights there.

You can get a generic "execution graph" template from the TBB Community svn repo (there are other goodies in there). On Linux, you can do this:

svn co http://svn.tbbcommunity.org/svn/tbbcommunity

Also there are some blogs on how I designed the construct here:

http://blogs.yetisim.org/2008/01/24/tbb-state_machine-part-1-basic-ideas...

Remember, this is NOT a state machine (initially it was, but it evolved)... it's a method of execution, where you can change things a bit at runtime if you want. I've been researching how to build a static version with boost.proto, but that's a different story... YetiSim is at a bit of a stand-still while I investigate certain data structure issues, and optimize the execution graph concept. You can browse the code at websvn.yetisim.org.

Quoting - AJ

On Linux, you can do this:

svn co http://svn.tbbcommunity.org/svn/tbbcommunity

It can appear surprising, but on Windows you can do exactly the same ;)

Quoting - AJ

This problem seems loosely related to something I've done. You can download my paper from here: http://www.yetisim.org/images/2/2a/YetiSimPaper-SCS2008.pdf

If you feel the problem is similar, then I might be able to help you.

AJ, I don't understand one point in the paper. It's about Table 2: A Comparison of Average Execution Times for the Clock Simulation.

As I see it, on 8 cores the speedup is only 1.62 (as opposed to the expected ~7-8). Initially I thought you ran only a single instance of the clock model, which doesn't allow very much parallelism, so a speedup of 1.62 would be logical. But then I read that you run 10^6 completely independent clock models. What is the problem? Or am I missing something? I think 10^6 independent clock models must scale linearly... well, at least up to 10^6 processors :)

And I've noticed that the speedup degrades as more threads are added: the speedup from 1 to 2 threads is 1.2864, from 2 to 4 threads it is 1.1541, and from 4 to 8 threads it is 1.0917. So it seems that beyond 8 cores it will not run any faster at all...

Well, if I am not missing something, it seems that this is not a very successful approach to parallelizing such modelling systems...

Can you provide some comments on this?

Sure. I did not understand at the time why the model did not scale linearly either. It seems like it really should. I believe the fundamental issue is that I did not use cache-optimized data structures; in fact some of them were wrapped with mutexes. I also believe there are some subtle memory issues that have to be addressed. At the time I did not have as much knowledge as I do now (this was submitted for publication a year ago). Also, I believe the parallel_reduce method has some shortcomings that I didn't really understand at the time... for instance, I believe the join step has some ordering to it which is not strictly required in this application.

I am currently writing new data structures, which will support this programming model efficiently and greatly improve the performance. Note that if I had been a bit dishonest in the paper and given each stage more work to do, it would scale perfectly. But this is not the case in general: what if I wanted to use these execution graphs to model a cellular automaton, which basically has no work to do?

Thanks for reading the paper and your insights. Code is available online for that paper, so feel free to browse the repository. Once my data structures are completed, I will be rewriting the library from scratch.

Quoting - AJ

I am currently writing new data structures, which will support this programming model efficiently and greatly improve the performance. Note that if I had been a bit dishonest in the paper and given each stage more work to do, it would scale perfectly. But this is not the case in general: what if I wanted to use these execution graphs to model a cellular automaton, which basically has no work to do?

Thanks for reading the paper and your insights. Code is available online for that paper, so feel free to browse the repository. Once my data structures are completed, I will be rewriting the library from scratch.

Please post your results here when the rewrite is done (I think that as long as it uses TBB, this forum is the relevant place).
