Blocking pipeline output

I have a pipeline that I'm using to fill a result buffer with processed items. However, I don't want the result buffer to get more than x items. This means that the pipeline somewhere has to "sleep" until someone pulls (takes) an item from the buffer.
Could I do this by using a blocking concurrent_queue::push in the serial output filter? How does the pipeline handle the case when the maximum number of tokens is active? Is it ok to block the output filter (in the same way you can block the input filter)? If the output filter is run as a task, wouldn't this be a problem, since it would block a thread in the tbb threadpool for an unknown amount of time?
My rather less elegant solution right now is that my input filter blocks until it gets a "ticket" from the output filter. The problem with this is that I get a rather ugly dependency between the input and output filter. Also, not every processed item gets put into the result buffer, which means the "ticket" system needs to take this into account.

www.casparcg.com

If the alternative is to block the input filter, you might as well block the output filter: either one deprives TBB of a worker thread (that's not to say there isn't room for improvement!). You might be surprised to learn that the maximum number of tokens in flight is based on a ticket system itself (you say ticket, TBB says token, let's...), and that a pipeline has internal queues before non-input serial filters, but it's not trivial to plug into those. You might also decide to set the pipeline's maximum number of tokens in flight (quite the mouthful!) to x and do away with your own concurrent_queue, if that is appropriate for the situation. It's far from ideal, but this should get you... unblocked, so to say.
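Roughly, the x-tokens idea looks like this (just a minimal sketch with placeholder filters, not the poster's actual code; the point is that pipeline::run(x) already bounds how many items can be in flight):

#include <cstdio>
#include "tbb/pipeline.h"
#include "tbb/task_scheduler_init.h"

// Produces the numbers 0..99, one token per invocation; NULL signals end of input.
class InputFilter : public tbb::filter {
    int next_;
public:
    InputFilter() : tbb::filter(tbb::filter::serial_in_order), next_(0) {}
    void* operator()(void*) {
        if (next_ >= 100) return NULL;
        return new int(next_++);
    }
};

// Middle stage; may run on any worker thread.
class ProcessFilter : public tbb::filter {
public:
    ProcessFilter() : tbb::filter(tbb::filter::parallel) {}
    void* operator()(void* item) {
        *static_cast<int*>(item) *= 2;
        return item;
    }
};

// Serial output stage; because run() below limits live tokens to 8,
// at most 8 items exist anywhere in the pipeline at any moment.
class OutputFilter : public tbb::filter {
public:
    OutputFilter() : tbb::filter(tbb::filter::serial_in_order) {}
    void* operator()(void* item) {
        int* p = static_cast<int*>(item);
        std::printf("%d\n", *p);
        delete p;
        return NULL;
    }
};

int main() {
    tbb::task_scheduler_init init;
    InputFilter in; ProcessFilter mid; OutputFilter out;
    tbb::pipeline p;
    p.add_filter(in); p.add_filter(mid); p.add_filter(out);
    p.run(8);   // the "x": maximum number of tokens in flight
    return 0;
}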

I was not aware that the input filter is run on a tbb taskpool thread. I had the impression that the input filter was run on the same thread that pipeline::run was called on. If this is not the case then yes, I might as well block in the output filter. This is a problem for me since in my application I could end up blocking all of the tbb taskpool threads... how would I solve that?
EDIT: Could I move the blocking filter (output) to be executed on its own thread, and avoid blocking a tbb taskpool thread? thread_bound_filter?

www.casparcg.com

" was not aware that the input filter is run on a tbb taskpool thread. I had the impression that the input filter was run on the same thread that pipeline::run was called. If this is not the case then yes, I might aswell block in the outputfilter."
I consider the pipeline::run() thread to be part of the pool, assuming that you even know on which thread it's running. It is one of the 1+(N-1) threads that participate on an approximately equal basis in processing and stealing on a system with N hardware threads, if automatic initialisation is not overridden (by default this creates N-1 additional threads), which is why I'm not sure there's even a meaningful distinction between master and worker. So if you block any filter, only N-1 threads are available, and it's not really relevant whether the original master is among them or not. The situation will probably change somewhat when tasks get segregated by master thread (I don't know yet how that's going to work out), but for now that's what you have.

"This is a problem for me since in my application I could end up blocking all of the tbb taskpool threads... how would I solve that?"
Exactly: it would only work if TBB only has that pipeline to run, or if you require a certain level of parallelism, which is why I called this "far from ideal".

"EDIT:
Could I move the blocking filter (output) to be executed on its own thread, and avoid blocking a tbb taskpool thread? thread_bound_filter?"
That's not what a thread_bound_filter does (unless I'm mistaken), but you could perhaps assume that this filter is almost always in a blocked state and initialise TBB with an additional thread, if you want to go that way. But then you'll incur additional overhead related to thread scheduling, I'm afraid (right?).

But I don't know the overall setup...

This is a rather big problem for me.
Basically what I've got is a producer/consumer setup, where the producer reads in a file and processes several "packets" in different steps. These packets are then consumed by the consumer, which runs on its own thread (legacy code) and is NOT part of the pipeline, and thus does not take part in the "token system". This means that I have to block the last filter (or the first, as I did previously) in the pipeline when the consumer is slower, in order to not get too many active tokens.
The big problem is that I can have several of these producer/consumer setups running at the same time, which means that it is very probable that I will end up blocking the entire tbb threadpool, and since I don't know how many of these will run I cannot start tbb with N + X threads. Not to mention that it could deadlock the entire application if the consumer starts using tbb tasks.
What I'd like is to be able to call pipeline::run on a thread I explicitly create and then somehow have the output filter run on this thread (or maybe another thread I explicitly create). If the output filter blocks then maybe there would be no problem, since the blocking thread is not part of the tbb thread pool.
EDIT: The input filter is also reading from a file, which would also be a good idea to have on its own thread?

www.casparcg.com

I've had another look at thread_bound_filter, and it does seem like you could run each pipeline in a separate thread and run all the output filters on the same thread (the one that starts the pipelines). But running an unbounded number of pipelines would lead to oversubscription, and avoiding that is one of the reasons to turn to TBB...

"on the same thread (the one that starts the pipelines)"

The documentation says "The thread that services a thread_bound_filter must not be the thread that calls pipeline::run()". I remember seeing somewhere something like pipeline.start(), pipeline.end(), but I can't find anything like that in the documentation or the source files. According to the reference I would need an additional thread for servicing a thread_bound_filter.
Could I have a single thread that all output filters run on and that does all the blocking? Using something like "WaitForMultipleObjects" - is this possible with tbb?

www.casparcg.com

Sorry, I meant "the one that starts the threads that run the pipelines" (really, I did), but even without the oversubscription it would have to be a polling solution (for both pipelines and concurrent_queues, with sort of a poor man's rendez-vous for each pair), which doesn't seem very appealing.

Also, I'm wondering how to even tune TBB to coexist with user threads.

I think I'll bench myself on this one, for now anyway.

Quoting nagy: Could I move the blocking filter (output) to be executed on its own thread, and avoid blocking a tbb taskpool thread? thread_bound_filter?

Yes, a thread-bound filter is for executing some filter on its own thread.

However, I am not sure why you wouldn't just use a regular pipeline, with the limit on the maximal number of tokens set to the value you need. At a glance, it should do exactly what you want.

Edited: Oh I see, you want to run several pipelines at the same time. Indeed, this is not well supported in TBB at the moment, because pipeline.run() is a blocking call, so you'd have to start each pipeline in its own thread. I will think whether there is a better solution to suggest to you.

Hi, initially when I just read the post I thought that putting a token into a bounded queue with a preset capacity during the last pipeline stage was some sort of a design feature. But then you're saying that you just want to limit the number of tokens being passed through the pipeline to avoid memory overuse. This is exactly what the argument to pipeline::run(num_of_tokens) is for, and it seems like the ideal solution to the problem you're trying to solve would be to pull the "consumer" functionality right into the pipeline and make it the last stage. Or else, if this is some untouchable legacy code we're talking about, you could wrap it with a "consumer" proxy that talks directly to the consumer and provides it with tokens in a synchronous manner, one by one, and add the consumer proxy as the last stage of the pipeline.

Quoting nagy: This is a rather big problem for me. Basically what I've got is a producer/consumer setup, where the producer reads in a file and processes several "packets" in different steps. These packets are then consumed by the consumer, which runs on its own thread (legacy code) and is NOT part of the pipeline, and thus does not take part in the "token system". This means that I have to block the last filter (or the first, as I did previously) in the pipeline when the consumer is slower, in order to not get too many active tokens. The big problem is that I can have several of these producer/consumer setups running at the same time, which means that it is very probable that I will end up blocking the entire tbb threadpool, and since I don't know how many of these will run I cannot start tbb with N + X threads. Not to mention that it could deadlock the entire application if the consumer starts using tbb tasks. What I'd like is to be able to call pipeline::run on a thread I explicitly create and then somehow have the output filter run on this thread (or maybe another thread I explicitly create). If the output filter blocks then maybe there would be no problem, since the blocking thread is not part of the tbb thread pool. EDIT: The input filter is also reading from a file, which would also be a good idea to have on its own thread?

Let me ask some questions (and maybe more will follow after your answers). What is the existing (sequential?) way to do the job? Does your code run several producer/consumer patterns at the same time? If so, does every producer run in its own thread, or do they all run in a single thread?

That got me thinking. I'm unsure if it would work to do the consumer as a pipeline stage, since the consumer in my case could sleep between packets. The feature I'm writing for our application is a video player, and the "consumer"/"display" thread has to sleep to sync the framerate, which is why I have a bounded queue "in front" of the consumer. Sorry to bring new details into play, but I try to keep it as simple as possible most of the time.
However, instead of blocking the output filter I believe I should simply stop the input filter tasks from being spawned while the buffer to the consumer is full. I would need some way to externally control the pipeline's active token count (one option to control this is, as we said earlier, a blocking thread_bound_filter, at the cost of over-subscription). Maybe some kind of special last filter stage, though I'm unsure how that would work? Or maybe I could manually control how the input filter tasks are spawned? So basically I would need a way to control how the pipeline spawns input filter tasks.

www.casparcg.com

Hi, in my understanding, if the allowed number of tokens does not change during the application runtime, then you don't need to be able to control it externally during runtime - just set it once, when you start up the pipeline.
Unless I'm missing something here, it totally makes sense to pull the consumer into the pipeline as the last stage now. First, you get rid of the over-subscription by removing this additional "consumer" thread that you have now. Second, you implement this last consumer stage as a serial_in_order stage and put all of the frame rate sync code inside operator(). Then, when you start the pipeline, you just specify "x" as the input argument to the pipeline.run() function. This turns the pipeline into the functionality that you want, plus the queue that you want right in front of the last "consumer" stage. Does that make sense?
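Just to illustrate, such a last stage could look roughly like this (not working code from the project; VideoFrame and display() are placeholders for whatever the legacy consumer actually does):

#include "tbb/pipeline.h"
#include "tbb/tick_count.h"
#include "tbb/tbb_thread.h"

struct VideoFrame { /* placeholder for the real frame type */ };
void display(const VideoFrame&);   // placeholder for the legacy display call

// Serial, in-order last stage; the frame-rate sync sleep happens inside operator(),
// so no separate consumer thread is needed. pipeline.run(x) bounds the buffering.
class ConsumerStage : public tbb::filter {
    tbb::tick_count last_;
    double interval_;                               // seconds per frame
public:
    explicit ConsumerStage(double fps)
        : tbb::filter(tbb::filter::serial_in_order),
          last_(tbb::tick_count::now()), interval_(1.0 / fps) {}

    void* operator()(void* item) {
        VideoFrame* frame = static_cast<VideoFrame*>(item);
        double elapsed = (tbb::tick_count::now() - last_).seconds();
        if (elapsed < interval_)                    // sleep to hold the frame rate
            tbb::this_tbb_thread::sleep(tbb::tick_count::interval_t(interval_ - elapsed));
        display(*frame);
        last_ = tbb::tick_count::now();
        delete frame;
        return NULL;
    }
};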

To me, it does make sense - but only as long as there is just one pipeline. With multiple such things (which were mentioned once), it might not work.

Side note: I am sorry for setting "1 star" rating to your above post; it was an accidental click.

This is starting to sound pretty good. However, there is one last issue. If I put the consumer into the pipeline, then the last stage in the pipeline will occasionally sleep/block to sync the framerate, thus still taking up an entire tbb taskpool thread. If I run several pipelines this would be a problem? The only way I can see of avoiding this is with external active token control, or have I missed something?

www.casparcg.com

#12 "Does that make sense?"
That was basically my suggestion in #1 (use existing flow control and queue), but as things are, to avoid taking worker threads away from TBB, I think you'll have to poll with a thread_bound_filter or dedicate a new thread (with all the associated static and dynamic overhead), unless you have another idea that uses TBB as-is? (I still have a few ideas, but they would require some non-trivial changes, and it would still be unclear how they would work out in the uncontrolled environment that has those user-thread consumers.)

It's a question with broader significance: how does TBB behave if it doesn't "own" the whole machine? Even two separate processes both using TBB don't make any attempt to coordinate their efforts to avoid oversubscription, so what does that do for performance?

I'm unsure if it's allowed to modify any of the tbb files. Also, I can only read the header files (are the source files available somewhere so I can see how the pipeline works?).
But what I could do is modify pipeline.h and expose two methods on the pipeline and filter classes called "inc_token_count" and "dec_token_count" which modify the pipeline's "token_counter". When the output filter pushes an item into the buffer it would call "inc_token_count" (to negate the dec that the pipeline does internally), and when the consumer pulls an item from the buffer a callback would be called which calls "dec_token_count". The "token_counter" seems to be atomic; would such an operation be supported by the pipeline?
EDIT: swapped inc and dec in a few places

www.casparcg.com

The token_counter does something else, and it's not as simple as modifying the other atomic.

Indeed, I found the correct download with the source code.
EDIT: Been reading through the source; unsure if I will be able to modify it to suit my needs. I will at least try if there are no better solutions? Any hints or tips are appreciated.

www.casparcg.com

I second what Raf is suggesting. If you're OK with pulling the consumer into the pipeline, but don't want to take out one of the TBB threads on frame rate sync, just make that last stage an in-order thread_bound_filter and have it work on an external thread. Setting "x" as the argument to pipeline.run() will set your memory limits, and if the consumer filter thread sleeps and is too slow, the TBB threads will build up an internal queue of exactly x tokens before TBB decides to pause the input filter.
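In outline (borrowing the pattern from the Tutorial's thread_bound_filter example; InputFilter, ProcessFilter and consume() are placeholders, not the project's code):

#include "tbb/pipeline.h"
#include "tbb/tbb_thread.h"

void consume(void* frame);   // placeholder: hand a finished frame to the display code

// In-order thread-bound last stage: its operator() only runs when some thread
// calls process_item()/try_process_item() on it.
class OutputStage : public tbb::thread_bound_filter {
public:
    OutputStage() : tbb::thread_bound_filter(tbb::filter::serial_in_order) {}
    void* operator()(void* item) {
        consume(item);
        return NULL;
    }
};

void run_pipeline(tbb::pipeline* p) { p->run(8 /* "x", the memory limit */); }

void producer_consumer() {
    InputFilter in; ProcessFilter mid; OutputStage out;   // first two as in the earlier sketch
    tbb::pipeline p;
    p.add_filter(in); p.add_filter(mid); p.add_filter(out);

    // run() must not be called on the thread that services the thread-bound filter,
    // so start the pipeline on another thread...
    tbb::tbb_thread runner(run_pipeline, &p);

    // ...and service the output stage here, on the external (non-TBB) thread.
    while (out.process_item() != tbb::thread_bound_filter::end_of_stream)
        continue;
    runner.join();
}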

Ok, this is probably the best solution for now. Thank you all for the help.
I'd like to request a reserve/unreserve token feature, with the properties I've suggested in this thread, in a future version of the pipeline.

www.casparcg.com

"Let me ask some questions (and maybe more will follow after your answers). What is the existing (sequential?) way to do the job? Does your code run several producer/consumer patterns at the same time? If so, does every producerrun in its own thread, or all in a single thread?"
I missed this post, sorry for the late answer.
The way this was done before I started rewriting the code to tbb, it worked the following way.
A producer runs in its own thread, and runs a loop which reads a packet from a file, processes it sequentially in different steps (aka pipeline filters), and then puts it into a blocking bounded queue (this needs to block since I do not want the entire file in memory while the consumer pulls packets). This is repeated until all packets are read from the file.
The consumer runs in its own thread. It is connected to one or several producers by receiving shared pointers to their queues. It then pulls items from the queues and sleeps in between to sync the rate at which packets are consumed.
Every consumer and producer runs in its own thread, even if there are several producers or consumers.
This design suffers from heavy over-subscription and limited scalability, which is why I'm currently rewriting it.
EDIT:
How do I quote in these forums? At first I thought the post I'm replying to is automatically quoted, but that doesn't seem to be the case. And the "blockquote" button doesn't seem to do anything.

www.casparcg.com

I have solved the problem, without the need to create an additional thread for blocking. I create a ConsumerProxy (as suggested) implementing a thread_bound_filter (as suggested). This way I can let the consumer pull from the proxy, and the legacy consumer thread blocks if there are no items available, thus avoiding both the problem with an additional blocking thread and blocking the tbb taskpool. It looks something like this.

class ConsumerAdapter : tbb::thread_bound_filter
{
    VideoFramePtr get_frame() // buffering is not included for simplicity
    {
        while(frameQueue_.empty() && !eof_)
            eof_ = process_item() == tbb::thread_bound_filter::end_of_stream;

        if(frameQueue_.empty())
            return nullptr;

        VideoFramePtr result = frameQueue_.front();
        frameQueue_.pop();
        return result;
    }

    void* operator()(void* item)
    {
        if(item != nullptr)
            frameQueue_.push(VideoFramePtr(static_cast<VideoFrame*>(item)));
        return nullptr;
    }
};

What happens is that as soon as the consumer needs a frame it calls get_frame; if there are no frames available it calls the thread_bound_filter function process_item, which potentially pulls an item from the pipeline. Works like a charm. Seems to work. Thanks for all the help.
EDIT: updated the code
EDIT2: Did some more testing. It seems that a blocking thread_bound_filter has very high overhead, even when used in the way demonstrated in the tutorial document. My original solution with blocking tasks had about 30% cpu utilization, while if I use a thread_bound_filter I get up to 80% cpu utilization; something is wrong.

www.casparcg.com

It seems a good idea to lend thread_bound_filter the use of a thread only when needed (but then of course you know more about that legacy code than you've told us).

Unfortunately for performance, process_item() busy-waits.

Quoting nagy: EDIT2: Did some more testing. It seems that a blocking thread_bound_filter has very high overhead, even when used in the way demonstrated in the tutorial document. My original solution with blocking tasks had about 30% cpu utilization, while if I use a thread_bound_filter I get up to 80% cpu utilization; something is wrong.

Yes, as Raf said, process_item() busy-waits, and we actually need to fix it (I am sorry, the problem was just lost from our radars, so to say).

Meanwhile, the recommendation is to use try_process_item() (which returns one more type of value, to indicate absence of work) and some external blocking mechanism, such as a Windows event, or a condition_variable, or a semaphore.
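For example (just a sketch: it assumes the stage right before the thread-bound filter notifies a condition variable after forwarding each item, and uses std::condition_variable for brevity; the bounded wait is a safety net against missed notifications):

#include <chrono>
#include <condition_variable>
#include <mutex>
#include "tbb/pipeline.h"

std::mutex mtx;
std::condition_variable item_maybe_ready;

// Assumed hook: the filter immediately upstream of the thread-bound filter
// calls this at the end of its operator(), after handing an item downstream.
void notify_servicing_thread() {
    item_maybe_ready.notify_one();
}

// Servicing loop, run on the external (non-TBB) thread instead of process_item().
void service(tbb::thread_bound_filter& tbf) {
    for (;;) {
        tbb::thread_bound_filter::result_type r = tbf.try_process_item();
        if (r == tbb::thread_bound_filter::end_of_stream)
            break;
        if (r == tbb::thread_bound_filter::item_not_available) {
            // Nothing to do yet: block until upstream hints that an item may be ready.
            std::unique_lock<std::mutex> lock(mtx);
            item_maybe_ready.wait_for(lock, std::chrono::milliseconds(5));
        }
    }
}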

Yes, under further investigation I noticed it does a __TBB_Yield() loop. I hope you will be able to fix this as soon as possible. Meanwhile I will use tbb::interface5::condition_variable.

www.casparcg.com

Quoting nagy: Meanwhile I will use tbb::interface5::condition_variable.

The recommendation is to use it as std::condition_variable.

The tbb::interface5 namespace is an implementation detail that can change over time (e.g. if the implementation is reworked in an incompatible way it will go into tbb::interfaceX), while the interface represented by std::condition_variable is expected to be backward-compatible.

I didn't want to create another thread, but there seems to be an error in the reference document.

If there is other work to do while the pipeline is running, the call to method pipeline::run can be replaced by a pair of calls pipeline::start_run and pipeline::finish_run, and the calling thread can do other work between the calls. The example in Section 3.9.7 has an example.

This does not seem to be accurate?

www.casparcg.com

Yes, it's inaccurate. I apologize for that. In prototypes of the thread-bound filter, we tried to provide the ability to bind it to the same thread that starts the pipeline; this note in the Reference is an artifact of that effort. The methods were never provided as a supported feature.

If you do not want to start another thread for TBF but instead use the current one, I have a suggestion. The common idea is to fire a special task to start the pipeline, which will be taken and executed by a TBB worker thread. This way, the worker becomes blocked by pipeline::run(), while your thread can serve TBF or do something else. I can suggest two ways for implementing that:
- use class task_group. It is pretty convenient to fire a task for asynchronous execution, then do some other stuff, then wait for completion of the task. Its disadvantage is that if no worker thread is available (e.g. the program runs on a single core), the task may not get executed, so the pipeline won't start at all.
- use the new method task::enqueue() available in the most recent stable releases. It is easy enough to start a task, though not as convenient as with task_group, and much less convenient to wait for task completion (there is no direct support for waiting on enqueued tasks; if you need it you have to do it manually, either with an event/semaphore/condition variable, or with an otherwise unnecessary parent task). Its advantage is that it guarantees execution by a worker thread, i.e. it does not have the problem of the first approach.

The common shortcoming of both approaches, however, is that the worker thread that takes the task and starts the pipeline will busy-wait when the pipeline is empty but not finished.
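The first option, sketched (placeholder names, and subject to the single-core caveat above; it assumes the output stage is the thread-bound filter being serviced):

#include "tbb/pipeline.h"
#include "tbb/task_group.h"

void run_and_serve(tbb::pipeline& p, tbb::thread_bound_filter& out, size_t n_tokens) {
    tbb::task_group tg;

    // A TBB worker picks this task up and blocks inside pipeline::run()...
    tg.run([&p, n_tokens]{ p.run(n_tokens); });

    // ...while the current (master) thread stays free to service the
    // thread-bound filter instead of starting a dedicated thread for it.
    while (out.process_item() != tbb::thread_bound_filter::end_of_stream)
        continue;

    tg.wait();   // join with the task that ran the pipeline
}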

I've been experimenting a bit and I still can't get rid of the cpu overhead of thread_bound_filter. It's better than when I had the busy wait, but I'm still at 70% cpu compared to 30% with blocking tbb tasks.
As suggested, I'm using try_process_item with my own blocking. My current setup is something like this. This is not my intended final solution, but I need to solve the overhead problem first.

input_filter -> process_filter -> output_filter -> dummy_filter

input_filter: reads from a file
process_filter: different processing filters
output_filter: implements a blocking "ticket system"

concurrent_bounded_queue<FramePtr> queue_;

FramePtr output_filter::GetFrame()
{
    FramePtr pFrame;
    queue_.pop(pFrame);
    return pFrame;
}

void* output_filter::operator()(void* item)
{
    /* Does what's necessary... */
    queue_.push(pFrame); // should never block
    return nullptr;
}

dummy_filter: a thread_bound_filter that does nothing and is used to control the pipeline token count.

Then I start the pipeline like this.

tbb::tbb_thread t([&]{ pipeline.run(n_tokens); });

do
{
    FramePtr pFrame = output_.GetFrame(); // blocking pop, wait until there is a finished frame
    if(pFrame)
        frameBuffer_.push_back(pFrame); // blocking push, wait until there is room for another frame
}
while(dummy_.try_process_item() != tbb::thread_bound_filter::end_of_stream); // release token count

t.join();

Any ideas as to what I might be doing wrong?
EDIT: Does the pipeline also busy-wait when waiting for the thread_bound_filter to release a token? As far as I can see from the pipeline source, internal_process_item doesn't result in any new tasks being spawned, or in any other notification other than low_token being incremented.

www.casparcg.com

It seems this discussion died out. I will do one last try. I've looked a bit at pipeline_root_task, which looks somewhat like this (I have removed the segment-scanning part since no non-thread_bound_filter follows my thread_bound_filter; I am not 100% sure I can do this, but from what I understood from the code comments it shouldn't do anything in my case):

/*override*/ task* pipeline_root_task::execute() {
    if( !my_pipeline.end_of_input )
        if( !my_pipeline.filter_list->is_bound() )
            if( my_pipeline.input_tokens > 0 ) {
                recycle_as_continuation();
                set_ref_count(1);
                return new( allocate_child() ) stage_task( my_pipeline );
            }
    if( !my_pipeline.end_of_input ) {
        recycle_as_continuation();
        return this;
    }
    return NULL;
}

From my understanding this task will keep recycling and re-executing itself while a thread_bound_filter holds tokens (blocking), causing input_tokens == 0. I believe this is a busy-wait? Wouldn't it be a better solution if the pipeline_root_task wasn't recycled in the case of being starved by a thread_bound_filter? The thread_bound_filter would then respawn the pipeline_root_task once it releases one (or several?) tokens, if the pipeline_root_task has died out.

www.casparcg.com

Your analysis is basically correct. Yes, it's a busy wait. Indeed the TBF implementation suffers from these, and should be improved somehow, by sleeping internally.

I will think some more what can be done to solve your case, and get back with ideas later today, or tomorrow.

Best Reply

Quoting nagy: This is a rather big problem for me. Basically what I've got is a producer/consumer setup, where the producer reads in a file and processes several "packets" in different steps. These packets are then consumed by the consumer, which runs on its own thread (legacy code) and is NOT part of the pipeline, and thus does not take part in the "token system". This means that I have to block the last filter (or the first, as I did previously) in the pipeline when the consumer is slower, in order to not get too many active tokens. The big problem is that I can have several of these producer/consumer setups running at the same time, which means that it is very probable that I will end up blocking the entire tbb threadpool, and since I don't know how many of these will run I cannot start tbb with N + X threads. Not to mention that it could deadlock the entire application if the consumer starts using tbb tasks. What I'd like is to be able to call pipeline::run on a thread I explicitly create and then somehow have the output filter run on this thread (or maybe another thread I explicitly create). If the output filter blocks then maybe there would be no problem, since the blocking thread is not part of the tbb thread pool. EDIT: The input filter is also reading from a file, which would also be a good idea to have on its own thread?

I think it makes sense to step back and build on your use case, which seems to be well described in the above quote.
You want to use a bounded (and so blocking) queue as the interface between the pipeline-based producer and the thread-based legacy consumer. It is expected that the producer fills the queue faster than the consumer drains it; in this case, the pipeline should block and wait. On the other hand, there might be several pipelines working at the same time, so blocking a TBB worker thread is undesirable. Ideally, the master thread that started the pipeline should be the one that blocks.
Unfortunately, the current TBB pipeline implementation has problems with idle spinning here and there, in particular with thread-bound filters. Basically, the only way to avoid idle spinning in situations when the pipeline can neither proceed nor finish is to make the master thread do a blocking call, and let worker threads go to sleep due to the absence of available tasks.
Now what I can suggest is to recognize which thread executes the last pipeline stage that should push an item into the queue, and do different things depending on that. In a sense, it's like converting a non-thread-bound filter to act like a thread-bound one. To understand which thread runs the filter, you might use tbb_thread::id (see sections 12.2 and 12.3.1 in the TBB 2.2 Reference Manual) - I hate to say this, as we always argue for "thread-agnostic" parallel programming, but after all it's TBB issues that require such a workaround. If the master thread executes the filter, it uses the blocking push() method of the queue, so that it blocks when the queue is full. If a worker thread executes the filter, it should use the non-blocking try_push() method. The question is what to do when try_push fails. So far, there is no way to tell the pipeline that the filter failed to process the current token and that it should be re-attempted. So the solution I see is to use an intermediate queue in the filter for such items. As the last filter should be serial, std::queue would work there, without any additional locking.
In the pseudocode, the last filter I described looks like this:

if the filter is executed by the master // can block
    // push pending items first
    while intermediate queue is not empty
        pop an item from the top on intermediate queue
        push this item into the bounded output queue
    push the item received as the argument into the bounded output queue
else // the filter executed by a worker; should not block
    // push pending items first
    while intermediate queue is not empty
        read an item from the top of intermediate queue (but not pop it)
        try_push this item into the bounded output queue
        if try_push succeeded
            pop from the top of intermediate queue
        else
            break the while loop
    // process the item received as the argument
    if intermediate queue is empty // i.e. no more pending items
        try_push the received item into the bounded output queue
        if try_push succeeded
            return
    push the item into intermediate queue
return

The non-blocking section might be simplified if, first of all, the received item is unconditionally pushed to the intermediate queue; then you can just process the queue in the loop. It's a little bit suboptimal execution-wise, but the code will be simpler.
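For concreteness, a rough C++ rendering of that pseudocode (FramePtr and the way items arrive through void* are assumptions about the surrounding code; the filter must be constructed on the master thread, i.e. the one that will later call pipeline::run()):

#include <queue>
#include "tbb/pipeline.h"
#include "tbb/tbb_thread.h"
#include "tbb/concurrent_queue.h"

// FramePtr: placeholder for a shared pointer to a frame.
class LastFilter : public tbb::filter {
    tbb::tbb_thread::id master_id_;                    // the thread that may block
    tbb::concurrent_bounded_queue<FramePtr>& out_;     // bounded queue to the legacy consumer
    std::queue<FramePtr> pending_;                     // items a worker failed to push
public:
    explicit LastFilter(tbb::concurrent_bounded_queue<FramePtr>& out)
        : tbb::filter(tbb::filter::serial_in_order),
          master_id_(tbb::this_tbb_thread::get_id()),  // constructed on the master thread
          out_(out) {}

    void* operator()(void* item) {
        FramePtr frame = *static_cast<FramePtr*>(item);   // assumption about how items are carried
        if (tbb::this_tbb_thread::get_id() == master_id_) {
            // Master thread: blocking pushes are acceptable here.
            while (!pending_.empty()) {
                out_.push(pending_.front());
                pending_.pop();
            }
            out_.push(frame);
        } else {
            // Worker thread: never block; park whatever does not fit.
            while (!pending_.empty() && out_.try_push(pending_.front()))
                pending_.pop();
            if (!pending_.empty() || !out_.try_push(frame))
                pending_.push(frame);   // keep order: push directly only when nothing is pending
        }
        return NULL;
    }
};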

Sounds like a solution. I was not aware of the concept of master threads though. Previously I thought that the task scheduler only uses its own worker threads to avoid oversubscription. Where can I find more information explaining this? What if the last item is not executed on the master thread and previous items failed to push into the destination buffer?
EDIT: I guess the thread calling wait() (the master?) is also included in the task scheduler's worker threads?

www.casparcg.com

Any user thread that calls wait_for_all(), either directly or via parallel algorithms including pipeline::run(), becomes the "master" thread that supplies at least the initial set of tasks for TBB workers, and it participates in processing of the tasks.

The task scheduler by default allocates one less SW thread than there are HW threads available, to account for the user thread that starts the work.

Yes, there should be special handling for the last items that can pile up in the intermediate queue; probably it can be done right after returning from pipeline::run(), or alternatively in the destructor of the filter.
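For instance, reusing the placeholder names from the sketch above (and declaring flush() as a member there), the cleanup could be as simple as:

// Drains items that workers parked in the intermediate queue; safe to block here,
// since this runs on the master thread after pipeline::run() has returned.
void LastFilter::flush() {
    while (!pending_.empty()) {
        out_.push(pending_.front());
        pending_.pop();
    }
}

// usage on the master thread:
//   p.run(n_tokens);
//   last.flush();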
