Concurrent_queue push when full behavior

Ender1618

I'm completely new to TBB, so please forgive my ignorance.

Is there any way to have concurrent_queue drop the oldest item in the queue (if it has reached its max size) on a push instead of blocking? And, for a different usage, is there a way for the queue to simply not add the new item on a push if it has reached its max size?

The idea is to have a producer-consumer throttling method. My data is a stream, and I can afford drops if the consumer can't keep up with the producer, since the producer can produce at a variable rate (typically steady, but it can spike).

I use this method in my own concurrent queue, with native threads running the producer and consumer. I just wondered whether this would be possible using the TBB concurrent_queue, as I am considering replacing most of my native threading code with TBB, and I need these behaviors.

Thanks

Ender1618

Forgive me, I meant concurrent_bounded_queue, not concurrent_queue.

jimdempseyatthecove

Have you considered writing your own specialized ring buffer? (IOW - push when full implies pre-pop and discard)

Jim Dempsey

www.quickthreadprogramming.com
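
For illustration, the "push when full implies pre-pop and discard" idea could look roughly like the sketch below. It is guarded by a plain mutex, so it is not lock-free and does not claim the properties of any TBB container. The class name `OverwritingRing` and its methods are hypothetical, and the element type is assumed to be default-constructible.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical fixed-capacity ring buffer. push() never blocks: when the
// buffer is full it discards the oldest element before storing the new one.
// A mutex protects all state, so this is NOT a lock-free structure.
template <typename T>
class OverwritingRing {
public:
    explicit OverwritingRing(std::size_t capacity)
        : slots_(capacity), head_(0), size_(0) {
        assert(capacity > 0);
    }

    // Stores value; returns true if an older element was dropped to make room.
    bool push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        bool dropped = false;
        if (size_ == slots_.size()) {
            head_ = (head_ + 1) % slots_.size();  // pre-pop: discard the oldest
            --size_;
            dropped = true;
        }
        slots_[(head_ + size_) % slots_.size()] = std::move(value);
        ++size_;
        return dropped;
    }

    // Non-blocking pop of the oldest element; returns false if empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (size_ == 0) return false;
        out = std::move(slots_[head_]);
        head_ = (head_ + 1) % slots_.size();
        --size_;
        return true;
    }

private:
    std::mutex mutex_;
    std::vector<T> slots_;
    std::size_t head_;  // index of the oldest stored element
    std::size_t size_;  // number of stored elements
};
```

A producer would call `push()` unconditionally, and a consumer would poll `try_pop()`. A consumer that should wait for data would need something extra, such as a condition variable, which this sketch leaves out.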
robert-reed (Intel)

Does it have to be the oldest entry?  You could switch the push() for a try_push() and discard the newest on failure.  Or if not, how about a wrapper that does something like a "while (!try_push(item)) {pop(discard)}"?

Ender1618

Quote:

jimdempseyatthecove wrote:

Have you considered writing your own specialized ring buffer? (IOW - push when full implies pre-pop and discard)

Jim Dempsey

How would you create a specialized ring buffer that has the same lockless concurrency properties as TBB's concurrent_bounded_queue? Is there a way to specialize concurrent_bounded_queue?

Ender1618

Quote:

robert-reed (Intel) wrote:

Does it have to be the oldest entry?  You could switch the push() for a try_push() and discard the newest on failure.  Or if not, how about a wrapper that does something like a "while (!try_push(item)) {pop(discard)}"?

I need the ability to select a discard-oldest or discard-newest mode. Discard-oldest is the more important mode right now, since the oldest item is the least relevant in my circumstance (live streaming, where newer items are most relevant). Would I be able to iterate and erase the oldest item (essentially the item at the front of the queue, next to be processed) in a safe, quick manner?

Raf Schietekat

Quote:

Ender1618 wrote:

Would I be able to iterate and erase the oldest item (essentially the item at the front of the queue, next to be processed) in a safe, quick manner?

Robert provided suggestions for both back and front, respectively (the "or if not" apparently applies to the second sentence, not to the question).

That means that there is probably no need for new functionality, although the second suggestion is vulnerable to race conditions, so you should probably replace the pop() with a (void)try_pop() to avoid getting stuck or possibly deadlocked, however unlikely.
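
Robert's two suggestions, with the try_pop() substitution, could be sketched roughly as follows. The helper names `push_or_drop_newest` and `push_dropping_oldest` are hypothetical. `StandInBoundedQueue` is only a mutex-based stand-in so that the sketch compiles without TBB; the helpers are templates and rely only on `try_push(const T&)` and `try_pop(T&)` returning `bool`, which is the shape `tbb::concurrent_bounded_queue` offers.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Stand-in (NOT TBB): mimics only the try_push/try_pop shape of
// tbb::concurrent_bounded_queue so the helpers below can be exercised.
template <typename T>
class StandInBoundedQueue {
public:
    explicit StandInBoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }
    bool try_push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) return false;  // full: refuse
        items_.push_back(item);
        return true;
    }
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }
private:
    std::mutex mutex_;
    std::deque<T> items_;
    std::size_t capacity_;
};

// Discard-newest mode: if the queue is full, the new item is dropped.
// Returns true if the item was queued.
template <typename Queue, typename T>
bool push_or_drop_newest(Queue& queue, const T& item) {
    return queue.try_push(item);
}

// Discard-oldest mode: evict from the front until the new item fits.
// try_pop() is used rather than a blocking pop(), so the loop does not
// stall if a consumer empties the queue between the failed try_push()
// and the eviction. Assumes T is default-constructible.
template <typename Queue, typename T>
void push_dropping_oldest(Queue& queue, const T& item) {
    T discarded;
    while (!queue.try_push(item)) {
        (void)queue.try_pop(discarded);
    }
}
```

The discard mode then becomes a choice of which helper the producer calls, provided nothing else in the application uses the blocking push() on the same queue.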

Alexey Kukanov (Intel)

The solution suggested by Robert (with Raf's correction) seems to be as good as we could do in a special method for concurrent_bounded_queue, if not even better.

For a special method to replace the oldest item we would need to solve how that method interoperates with the usual blocking push(), i.e. if there are waiting push() calls, what should the new method do - keep them waiting, let them proceed first (in order to keep FIFO), or maybe abort them? And I am not sure if any of these solutions can be implemented with low cost while keeping consistent state and guaranteed properties of the queue.

In your app, you might just not bother about such subtleties and require that blocking push() is never used directly.

One more correction you might consider (in the case of multiple producers pushing to the queue) is to not retry putting a new item indefinitely, but only as many times as the queue size limit. The reasoning is that if a thread popped as many items as the queue size and still cannot push its own item in, there must have been just as many "newer" items pushed by concurrent producers, which made the thread's item "outdated".
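
One possible reading of this bounded-retry idea is sketched below: try once, then evict and retry at most `capacity` times before giving up on the item. The helper name `push_dropping_oldest_bounded` and its `capacity` parameter are hypothetical, and `StandInBoundedQueue` is again only a mutex-based stand-in exposing the same `try_push`/`try_pop` shape as `tbb::concurrent_bounded_queue`, so that the sketch compiles without TBB.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Stand-in (NOT TBB): only mimics try_push/try_pop of a bounded queue.
template <typename T>
class StandInBoundedQueue {
public:
    explicit StandInBoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }
    bool try_push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) return false;
        items_.push_back(item);
        return true;
    }
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }
private:
    std::mutex mutex_;
    std::deque<T> items_;
    std::size_t capacity_;
};

// Discard-oldest push that gives up after `capacity` evictions.
// Returns true if the item was queued, false if it was abandoned because
// other producers kept refilling the queue (the item is then treated as
// outdated). Assumes T is default-constructible.
template <typename Queue, typename T>
bool push_dropping_oldest_bounded(Queue& queue, const T& item,
                                  std::size_t capacity) {
    if (queue.try_push(item)) return true;
    T discarded;
    for (std::size_t evictions = 0; evictions < capacity; ++evictions) {
        (void)queue.try_pop(discarded);         // make room at the front
        if (queue.try_push(item)) return true;  // retry with the new item
    }
    return false;  // still full after `capacity` evictions: drop the item
}
```

With a single producer the loop should succeed after the first eviction; the bound only matters when several producers compete for the freed slot.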
