updating queue processor using flow graph, and edge messaging

updating queue processor using flow graph, and edge messaging

I'm trying to use the tbb::flow (or pipeline/or task groups?) to cycle over a queue of "intervals": read a few top intervals, bisect each into A and B subintervals, check each half and possibly add them back into the queue; repeat.

The queue is actually a priority queue, managed separately. When an interval is being added, the queue reading access is locked and the right insertion point is found, possibly close to the top. So, I'd like to read the next top interval as late as possible, for example when at least 2 threads have become available to bisect-check it. That I think can be done with the "counting" continue_node.

The queue is regularly purged of low priority intervals. The length of the queue is going to converge to and fluctuate for a while around 0. I'm not sure how to exit the cycle then: say when the queue is empty at some moment I simply have nothing to send over the link explicitly, but how do I prevent the implicit message over the edge upon return from the body's operator()?? Other checking task could be adding intervals at the same time, so the queue may becomes non-empty again. It's when all the threads see the empty queue, the graph should exit. Can I use the graph::wait_for_all to wait for the "no activity" in the graph, that is, no explicit messages and no tasks running?

I'm thinking to use just 3 nodes as follows,

(function_node<continue_msg, interval> to read & bisect the top intervals)
--explicit try_put(A), try_put(B), or NO MESSAGE!?-->
(function_node<interval, continue_msg> to check&add, concurrency=nthreads)
( continue_node with T=2 )
repeat from the top

The flow can be started by populating the queue with nthreads/2 initial intervals and sending nthreads/2 continue_msgs to the first node to keep the the flow saturated.

This would be my first tbb::flow program, am I thinking in the correct terms? My main question then is the link between the two function_nodes above (correct node type?): can I have a full control over the edge messaging, to send either two half-intervals A&B, or NO MESSAGE if the queue is currently empty. If the other tasks send no messages over this edge, it should mean we're done and hopefully that's the time the wait_for_all to return?


2 Beiträge / 0 neu
Letzter Beitrag
Nähere Informationen zur Compiler-Optimierung finden Sie in unserem Optimierungshinweis.

Hi Igor,

I am not sure I fully-understand your description of the solution you propose, but I can talk a bit about the nodes themselves, and about the pipeline.

The pipeline and parallel_pipeline do not allow a stage to split a token into two or more parts.  So splitting an interval into two is not possible unless you do something not strictly within the pipeline (for instance explicitly putting one of the intervals to your priority queue and forwarding the other.)

The pipeline is also structured so it is started, runs until the input stage signals it is finished, and then clears.  If a priority queue feeding the pipeline signaled it was empty, the pipeline would have to be restarted if an item was added.  If your priority queue generally has zero intervals in it, you'll have to issue pipeline.run() pretty frequently.

Of course doing a try_put() to a flow graph node also spawns a task to perform that node's actions.  So that is still happening under the covers.

A graph.wait_for_all() waits for all those tasks to be completed.  It does not signal anything in the graph, but on return from the call the graph is guaranteed to be inactive (as long as no other thread has not done a try_put() to a node in the graph in the interim.)

The function-based nodes (continue_node, function_node, multifunction_node) have input buffers built-in by default, so if your priority queue knows exactly what order the intervals should be processed in, it can try_put() them to the node which, given the available threads, will process the intervals.

I was not sure why you said you need two threads to split an interval.  If you are using a forall or explicit tasks to do the splits, spawning them (or starting the forall) will let them compete with the other tasks created by the graph.

You can limit the concurrency of a function_node with the concurrency parameter to the constructor.

Best Regards,

Melden Sie sich an, um einen Kommentar zu hinterlassen.