Concurrent_queue push when full behavior

Ender1618

I'm completely new to TBB, so please forgive my ignorance.

Is there any way to have concurrent_queue drop the oldest item in the queue (if it has reached its maximum size) on a push instead of blocking? And, for a different use case, is there a way for the queue to simply not add the new item on a push if it has reached its maximum size?

The idea is a producer-consumer throttling scheme. My data is a stream, and I can afford drops if the consumer can't keep up with the producer, since the producer can produce at a variable rate (typically steady, but it can spike).

I use this method in my own concurrent queue, with native threads running the producer and consumer. I just wondered whether this would be possible using the TBB concurrent_queue, as I am considering replacing most of my native threading code with TBB, and I need these behaviors.

Thanks

Ender1618

Forgive me, I meant concurrent_bounded_queue, not concurrent_queue.

jimdempseyatthecove

Have you considered writing your own specialized ring buffer? (IOW - push when full implies pre-pop and discard)
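To make the "pre-pop and discard" idea concrete, here is a minimal sketch of an overwriting ring buffer. It is only an illustration: the class name OverwritingRingBuffer and its members are hypothetical, it uses a plain std::mutex rather than anything lock-free, and it assumes a capacity greater than zero.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical illustration: a mutex-protected ring buffer whose push()
// never blocks. When full, it discards the oldest item to make room.
// Assumes capacity > 0 and a default-constructible, copyable T.
template <typename T>
class OverwritingRingBuffer {
public:
    explicit OverwritingRingBuffer(std::size_t capacity)
        : slots_(capacity), head_(0), count_(0) {}

    // Returns true if an old item had to be discarded.
    bool push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        bool dropped = false;
        if (count_ == slots_.size()) {
            // Full: "pre-pop" the oldest item by advancing the head.
            head_ = (head_ + 1) % slots_.size();
            --count_;
            dropped = true;
        }
        slots_[(head_ + count_) % slots_.size()] = item;
        ++count_;
        return dropped;
    }

    // Returns false instead of blocking when the buffer is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0) return false;
        out = slots_[head_];
        head_ = (head_ + 1) % slots_.size();
        --count_;
        return true;
    }

private:
    std::mutex mutex_;
    std::vector<T> slots_;
    std::size_t head_;   // index of the oldest item
    std::size_t count_;  // number of items currently stored
};
```

A single lock keeps the discard and the insert atomic with respect to consumers, at the cost of serializing producers and consumers.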

Jim Dempsey

www.quickthreadprogramming.com
robert-reed (Intel)

Does it have to be the oldest entry?  You could switch the push() for a try_push() and discard the newest on failure.  Or if not, how about a wrapper that does something like a "while (!try_push(item)) {pop(discard)}"?
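The discard-newest variant can be sketched roughly as follows. So that the example compiles without TBB, it uses a hypothetical stand-in class, BoundedQueueStandIn, which only mimics the try_push()/try_pop() member names of tbb::concurrent_bounded_queue; the helper push_or_drop_newest is likewise a made-up name.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in that mimics the try_push/try_pop names of
// tbb::concurrent_bounded_queue so the sketch builds without TBB.
template <typename T>
class BoundedQueueStandIn {
public:
    explicit BoundedQueueStandIn(std::size_t capacity) : capacity_(capacity) {}

    // Returns false instead of blocking when the queue is full.
    bool try_push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) return false;
        items_.push_back(item);
        return true;
    }

    // Returns false instead of blocking when the queue is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<T> items_;
    std::size_t capacity_;
};

// Discard-newest mode: when the queue is full the new item is dropped.
// Returns true if the item was enqueued, false if it was discarded.
template <typename Queue, typename T>
bool push_or_drop_newest(Queue& queue, const T& item) {
    return queue.try_push(item);
}
```

With a real tbb::concurrent_bounded_queue the same helper should apply unchanged once set_capacity() has been called, since it relies only on try_push().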

Ender1618

Quote:

jimdempseyatthecove wrote:

Have you considered writing your own specialized ring buffer? (IOW - push when full implies pre-pop and discard)

Jim Dempsey

How would you create a specialized ring buffer that has the same concurrent, lockless properties as TBB's concurrent_bounded_queue? Is there a way to specialize concurrent_bounded_queue?

Ender1618

Quote:

robert-reed (Intel) wrote:

Does it have to be the oldest entry?  You could switch the push() for a try_push() and discard the newest on failure.  Or if not, how about a wrapper that does something like a "while (!try_push(item)) {pop(discard)}"?

I need the ability to select a discard-oldest or discard-newest mode. Discard-oldest is the most important mode right now, since the oldest item is the least relevant in my circumstances (live streaming, where newer items are most relevant). Would I be able to iterate and erase the oldest item (essentially the item at the front of the queue, next to be processed) in a safe, quick manner?

Raf Schietekat

Quote:

Ender1618 wrote:

Would I be able to iterate and erase the oldest item (essentially the item at the front of the queue, next to be processed) in a safe, quick manner?

Robert provided suggestions for both back and front, respectively (the "or if not" apparently applies to the second sentence, not to the question).

That means there is probably no need for new functionality. However, the second suggestion is vulnerable to race conditions: another thread may empty the queue between the failed try_push() and the pop(), which would then block. So you should probably replace the pop() with a (void)try_pop() to avoid getting stuck or possibly deadlocked, however unlikely.
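Put together, the discard-oldest wrapper with the try_pop() correction might look like the sketch below. BoundedQueueStandIn is a hypothetical stand-in that only borrows the try_push()/try_pop() names of tbb::concurrent_bounded_queue so the example builds without TBB, and push_drop_oldest is an illustrative name, not a TBB function.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in that mimics the try_push/try_pop names of
// tbb::concurrent_bounded_queue so the sketch builds without TBB.
template <typename T>
class BoundedQueueStandIn {
public:
    explicit BoundedQueueStandIn(std::size_t capacity) : capacity_(capacity) {}

    bool try_push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) return false;
        items_.push_back(item);
        return true;
    }

    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<T> items_;
    std::size_t capacity_;
};

// Discard-oldest mode: keep removing the front item until the push succeeds.
// try_pop() is used instead of pop() so the producer never blocks if a
// consumer drains the queue between the failed try_push() and the pop.
// Assumes T is default-constructible and the queue capacity is nonzero.
template <typename Queue, typename T>
void push_drop_oldest(Queue& queue, const T& item) {
    T discarded;
    while (!queue.try_push(item)) {
        (void)queue.try_pop(discarded);
    }
}
```

Note that the loop has no upper bound on retries, so with several competing producers a thread could in principle spin for a while.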

Alexey Kukanov (Intel)

The solution suggested by Robert (with Raf's correction) seems to be as good as we could do in a special method for concurrent_bounded_queue, if not even better.

For a special method that replaces the oldest item, we would need to decide how that method interoperates with the usual blocking push(). That is, if there are waiting push() calls, what should the new method do: keep them waiting, let them proceed first (in order to keep FIFO), or maybe abort them? And I am not sure whether any of these solutions can be implemented at low cost while keeping a consistent state and the guaranteed properties of the queue.

In your app, you might just not bother about such subtleties and require that blocking push() is never used directly.

One more correction you might consider (in the case of multiple producers pushing to the queue) is to not retry putting a new item indefinitely, but only as many times as the queue's size limit. The reasoning is that if a thread has popped as many items as the queue size and still cannot push its own item, there must have been just as many "newer" items pushed by concurrent producers, which made the thread's item "outdated".
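A bounded-retry version of the wrapper could be sketched like this. As before, BoundedQueueStandIn is a hypothetical stand-in that only mimics the try_push()/try_pop() names of tbb::concurrent_bounded_queue, and push_drop_oldest_bounded and its capacity parameter are illustrative assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in that mimics the try_push/try_pop names of
// tbb::concurrent_bounded_queue so the sketch builds without TBB.
template <typename T>
class BoundedQueueStandIn {
public:
    explicit BoundedQueueStandIn(std::size_t capacity) : capacity_(capacity) {}

    bool try_push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) return false;
        items_.push_back(item);
        return true;
    }

    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<T> items_;
    std::size_t capacity_;
};

// Discard-oldest with a retry limit: pop at most `capacity` items.
// If the push still fails after that, other producers have filled the
// queue with newer items, so this item is treated as outdated and dropped.
// Returns true if the item was enqueued, false if it was given up on.
template <typename Queue, typename T>
bool push_drop_oldest_bounded(Queue& queue, const T& item,
                              std::size_t capacity) {
    if (queue.try_push(item)) return true;
    T discarded;
    for (std::size_t popped = 0; popped < capacity; ++popped) {
        (void)queue.try_pop(discarded);
        if (queue.try_push(item)) return true;
    }
    return false;
}
```

The caller passes the same limit it configured on the queue, and a false return tells the producer that its item was superseded rather than enqueued.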
