flow_graph - A video decoder


I would like to try to rewrite my video decoder with tbb flow graph and thought I'd ask for some advice.

Basically I thought I'd set up my graph as follows:

                                   
                                   VideoDecoder(1 in -> x out)
                                   /             \
Pushing Packet Source -> Broadcast                Join -> Polling Target
                                   \            /
                                   AudioDecoder(1 in -> x out)

The packet source basically sends video and audio packets interleaved with undefined distance/grouping, and only the decoders can decide whether a packet is video or audio. For every valid packet the decoders can output zero or more items.

"Packet Source" can potentially block and should therefore preferably be pushing, so that I can put it on an explicit thread. The problem is that the decoders can produce any number of output tokens (including zero) for each input token. Is that even possible to achieve with tbb flow_graph? Taking the problem a step further: if several output tokens can be generated from a single input token, I would like to compute the extra output tokens only when there is a "poll" on the decoder or when the decoder can "push" into the output (in MS ConcRT and agents this is achieved by calling "send" for every output token).

Furthermore, even when e.g. the audio decoder has an item ready for the join, the video decoder could need y more packets from the source before it has an item ready (due to the undefined interleaving/grouping). It is important, though, that the audio decoder does not miss any packets (even if it already has enough), since there could be audio packets among them.

Then, instead of calling graph.wait_for_all(), I would like to be able to "poll" the graph for items, since the flow rate of the graph should be decided by the target. Although limiting the number of tokens in the graph is good enough, it would be preferable to have some other limit, e.g. the total size of all tokens currently in the graph.

I hoped I could get some advice and direction on possible solutions.

www.casparcg.com

Hello, nagy,

I think you are describing (at least) three problems: how to use flow::graph with a blocking source, how to optionally emit one or more items from one graph node to another, and how to limit parallelism in the graph.

The first is a simple problem. If you have a blocking thread, it can explicitly put (or try_put) items to a node of the graph. So if the first node of your graph is n1, in a thread you can do

for(;;) {
 // wait (possibly blocking) until the next item b is available
 n1.try_put(b);
}

The second problem is interesting. In the programs I have been writing using flow::graph, I have found the multioutput_function_node is the one which lets me be the most flexible with output. (The multioutput_function_node is a preview feature, so the name may change.)

The MOFN resembles a function_node in that it has a body that is executed when an input is received. The interesting aspect is that the body is passed a reference to that input and a reference to a tuple of output ports. The body can try_put anything it likes to any of the ports (subject to type-consistency), or it can choose not to emit anything.

The types of the output ports are defined by an std::tuple< types >. The example below uses dynamic casts for deciding which kind of buffer we have.

class buffer { 
...
};

class audio_buffer : public buffer {
...
};

class video_buffer : public buffer {
};

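typedef multioutput_function_node< buffer, std::tuple< audio_buffer, video_buffer > > input_handler_type;
#define AUDIO_HANDLER 0
#define VIDEO_HANDLER 1

class mofn_body { 
public:
 void operator()(const buffer & in, input_handler_type::output_ports_type &out) {
 // "in" is const, so the casts must yield pointers-to-const
 const audio_buffer *ab;
 const video_buffer *vb;
 if((ab = dynamic_cast<const audio_buffer *>(&in))) std::get<AUDIO_HANDLER>(out).try_put(*ab);
 else if((vb = dynamic_cast<const video_buffer *>(&in))) std::get<VIDEO_HANDLER>(out).try_put(*vb);
 else {
 // neither audio nor video: emit nothing
 }
 }
};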

I use #defines to name the tuple elements (AUDIO_HANDLER for instance) because it reads better. (I haven't compiled the code I just typed in; I think I have the syntax mostly right.)
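For anyone who wants to try the routing idea without a TBB build at hand, here is a minimal sketch of the "decide and route" step in plain standard C++. It is not the flow graph API: output_ports and route are hypothetical stand-ins for the tuple of output ports and the node body, and the data members of the buffer classes are invented for illustration. Later TBB releases call this node type multifunction_node.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-ins for the buffer classes in the post.
// dynamic_cast only works on polymorphic types, so the base class needs
// at least one virtual function; a virtual destructor is the usual choice.
struct buffer {
    virtual ~buffer() {}
};
struct audio_buffer : buffer { int samples = 0; };
struct video_buffer : buffer { int width = 0; int height = 0; };

// Stand-in for the tuple of output ports: one "port" per buffer kind.
struct output_ports {
    std::vector<const audio_buffer*> audio;
    std::vector<const video_buffer*> video;
    std::size_t dropped = 0;
};

// Decide-and-route step: inspect the dynamic type and forward accordingly.
void route(const buffer& in, output_ports& out) {
    if (const audio_buffer* ab = dynamic_cast<const audio_buffer*>(&in)) {
        out.audio.push_back(ab);
    } else if (const video_buffer* vb = dynamic_cast<const video_buffer*>(&in)) {
        out.video.push_back(vb);
    } else {
        ++out.dropped;  // neither kind: emit nothing
    }
}
```

One thing to watch in a real graph: if the node's input type is buffer by value, a derived object is sliced when it is copied into the node and the dynamic_cast can no longer succeed, so passing pointers (for example a shared pointer to buffer) is probably the safer message type.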

The third problem (limiting concurrency) can be done a couple ways: you can limit the concurrency of a node in the graph at construction time, or you can use some resource to limit concurrency. For instance, if you have N buffers, you won't get more than a concurrency of N. Managing the buffers can be done with a queue_node. The best method would depend on how the application is structured.
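The "N buffers" idea can be sketched in plain standard C++ as a small pool: work may only start once a free buffer has been acquired, and the buffer goes back to the pool when the work is finished. The class name buffer_pool and its member functions are made up for this illustration; the sketch is single-threaded and not thread-safe, and in a flow graph a queue_node holding the free buffers would play this role.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical fixed-size pool: at most `count` buffers can be in flight.
class buffer_pool {
public:
    buffer_pool(std::size_t count, std::size_t bytes_each) : capacity_(count) {
        for (std::size_t i = 0; i < count; ++i)
            free_.push_back(std::vector<char>(bytes_each));
    }

    // Take a free buffer; returns false when all buffers are in flight,
    // which is the signal for the producer to stop feeding the graph.
    bool try_acquire(std::vector<char>& out) {
        if (free_.empty()) return false;
        out = std::move(free_.back());
        free_.pop_back();
        return true;
    }

    // Hand a buffer back once the last stage is done with it.
    void release(std::vector<char>&& b) {
        assert(free_.size() < capacity_);  // cannot return more than were taken
        free_.push_back(std::move(b));
    }

    std::size_t available() const { return free_.size(); }

private:
    std::size_t capacity_;
    std::vector<std::vector<char> > free_;
};
```

Because every buffer has a known size, the same pool also bounds the total number of bytes in flight (count times bytes_each), which comes close to limiting by token size rather than by token count.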

I've been experimenting somewhat with graph, which is a bit of a challenge with g++ 4.4.3 (on Ubuntu LTS) and its partial-only C++11 support (tuples!). I should probably free up some disk space and install Intel's compiler for that...

The first problem is not so simple with just a non-blocking API: the user will need to loop with a backoff strategy, possibly at the risk of becoming dependent on internal TBB code that I will not expose here. :-)

The second problem misses the mark because it is only the handlers themselves that can determine what kind of packet it is, so the packet will have to be broadcast first. I have to say that I have my doubts about making this determination in parallel, which would seem to add unnecessary overhead as opposed to splitting up the handlers so that the choice can indeed be made by one node that can then send the packet one way or the other, or choose between continuing work or delegating it. Of course this may trade extra latency on the critical path (sic?) for better throughput... right?

The second-and-a-halfth problem is not addressed here: how to change the cardinality of items flowing through the graph?

I have no opinion yet on the third problem.

The third-and-a-halfth problem is not addressed here: how to poll while the graph is running? It probably mirrors the first problem.

(Added) The documentation seems a bit deficient in describing, e.g., how to concoct one's own node from abstract classes sender and receiver to allow such a change in cardinality, without independent additional analysis.

Thanks for the replies. I will need some time to get through your suggestions.

www.casparcg.com

Hello, Raf,

Yes, the TBB support for tuples is rudimentary if the compiler doesn't include full C++11 tuple support.

I am sorry I didn't specify the node being put-to by the blocking thread; it should be a queue node, another graph node with a built-in queue, or a parallel node; otherwise yes, one would have to check the status returned by the try_put and re-send if it failed.

I may misunderstand the point of the third paragraph. Nagy's question seemed to involve splitting up the handler into a "determine what kind of node and forward to the appropriate processing node" and the actual handlers. That is what I hoped I had described. (The multioutput_function_node is the "decide and route" part.)

I am not sure if I understand your meaning of "cardinality". Does it mean replication of items and forwarding to multiple nodes? This can also be done by a MOFN. It can send the same item to more than one successor.

If my understanding of "cardinality" is correct, it is possible to change the number of items on-the-fly with a MOFN, too. (If you have a MOFN that accepts type A and has a one-element tuple describing the output std::tuple< A >, the body of the MOFN can repeatedly output A as many times as necessary to create the number of duplicates desired. Or output nothing at all to absorb items and not forward them.)
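The "emit as many times as you like" behaviour can be modelled in plain standard C++ by handing the body a callback in place of an output port. This is only a sketch of the pattern, not the flow graph API: packet, emit_fn, duplicate_body and run_body are hypothetical names, and the copies field is invented to drive the example.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical input item: a name plus how many outputs it should produce.
struct packet {
    std::string name;
    int copies;
};

// Stand-in for an output port: the body calls it once per emitted item.
typedef std::function<void(const std::string&)> emit_fn;

// Body invoked once per input; it may emit zero, one, or many outputs.
void duplicate_body(const packet& in, const emit_fn& emit) {
    assert(in.copies >= 0);
    for (int i = 0; i < in.copies; ++i) emit(in.name);
}

// Drive the body over a sequence of inputs and collect whatever it emits.
std::vector<std::string> run_body(const std::vector<packet>& inputs) {
    std::vector<std::string> out;
    emit_fn emit = [&out](const std::string& s) { out.push_back(s); };
    for (const packet& p : inputs) duplicate_body(p, emit);
    return out;
}
```

A packet with copies set to 0 is simply absorbed, and one with copies set to 3 shows up three times downstream, so the number of items leaving the body is decoupled from the number entering it.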

Thanks for the response, Raf. I hope I didn't misunderstand too much (and I'd appreciate help understanding if I misconstrued.)

Regards,
Chris

"Yes, the TBB support for tuples is rudimentary if the compiler doesn't include full C++11 tuple support."
Curiously the biggest obstacle went away after I made a local copy of a const reference to a tuple:

struct merge_body_t {
    packet_t operator()(const std::tuple<packet_t, packet_t> & arg) {
        packet_t l_result;
        std::tuple<packet_t, packet_t> l_arg = arg; // TODO: why is this intermediate copy needed with g++ 4.4.3?
        packet_t p0 (l_arg.get<0>());
        packet_t p1 = l_arg.get<1>();
        l_result.m_name = p0.m_name + " " + p1.m_name;
        std::cout << "Merge " << l_result.m_name << std::endl ;
        return l_result;
    }
};

"I may misunderstand the point of the third paragraph. Nagy's question seemed to involve splitting up the handler into a "determine what kind of node and forward to the appropriate processing node" and the actual handlers. That is what I hoped I had described. (The multioutput_function_node is the "decide and route" part.)"
Both approaches may work, but I was going with "only the decoders can decide if the packets are video or audio".

"I am not sure if I understand your meaning of "cardinality". Does it mean replication of items and forwarding to multiple nodes? This can also be done by a MOFN. It can send the same item to more than one successor."
Maybe I'm using the wrong word, but I meant generating multiple outgoing messages from one incoming message, or the other way around, even in a node with a single incoming edge and a single outgoing edge. I'm having some trouble figuring out how to use the various interfaces because the documentation tends to describe only what they do, not how they are supposed to be used and overridden etc. But maybe I should just slow down and read it again, more carefully.

"If my understanding of "cardinality" is correct, it is possible to change the number of items on-the-fly with a MOFN, too. (If you have a MOFN that accepts type A and has a one-element tuple describing the output std::tuple< A >, the body of the MOFN can repeatedly output A as many times as necessary to create the number of duplicates desired. Or output nothing at all to absorb items and not forward them.)"
That added functionality could be helpful, or just an alternative to a more direct implementation on top of basic classes, I don't know yet.

"If my understanding of "cardinality" is correct, it is possible to
change the number of items on-the-fly with a MOFN, too. (If you have a
MOFN that accepts type A and has a one-element tuple describing the
output std::tuple< A >, the body of the MOFN can repeatedly output
A as many times as necessary to create the number of duplicates
desired. Or output nothing at all to absorb items and not forward
them.)"
It would appear that this functionality has to be presented for the user to be able to output multiple packets for each input packet without substantial reinvention of the wheel. But this does not appear to address the complementary problem of multiple input packets required (1), and there may not even be identifiable times where enough input has been assembled that all the output can be generated and buffered (2). Anyway, even if (1) is addressed without (2), unless the determination can be made externally to the user-provided body code, in the form of a simple number or so, it would appear that the node should have identifiable state, for performance even if the node has access to the input queue (1), or to work at all (2).
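To make the "identifiable state" point concrete, here is a minimal sketch in plain standard C++ of a body that needs several inputs before it can produce anything, and that may produce several outputs from one input. The class frame_assembler and the fixed frame size are assumptions made for the illustration; a real decoder would decide frame boundaries from the bitstream.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stateful body: bytes arrive in packets of arbitrary size,
// and a frame is emitted each time frame_size bytes have accumulated.
class frame_assembler {
public:
    explicit frame_assembler(std::size_t frame_size) : frame_size_(frame_size) {
        assert(frame_size_ > 0);  // a zero frame size would never terminate
    }

    // Feed one packet; returns the frames it completes (possibly none,
    // possibly several). Leftover bytes stay buffered for the next call.
    std::vector<std::string> push(const std::string& packet) {
        pending_ += packet;
        std::vector<std::string> frames;
        while (pending_.size() >= frame_size_) {
            frames.push_back(pending_.substr(0, frame_size_));
            pending_.erase(0, frame_size_);
        }
        return frames;
    }

    // Bytes carried over between calls: this is the node's state.
    std::size_t buffered() const { return pending_.size(); }

private:
    std::size_t frame_size_;
    std::string pending_;
};
```

Since push mutates the carried-over bytes, a graph node wrapping such a body would presumably have to run serially and see its inputs in order, which is exactly the kind of constraint that a stateless body does not impose.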

It would appear that flow graphs are not yet ready for all uses, this being one of them.

Would it be possible to open up the inner workings so that the user can create such functionality? This would require some good technical (re)writing on the documentation side, as well!

(Added) It would appear I'm weaseling. :-)
