join_node Template Class

Summary

A node that creates a tuple<T0,T1, ... > from a set of messages received at its input ports and broadcasts the tuple to all of its successors. The class join_node supports three buffering policies at its input ports: reserving, queueing and tag_matching. By default, join_node input ports use the queueing policy.

Syntax

template<typename OutputTuple, graph_buffer_policy JP = queueing>
class join_node;

Header

#include "tbb/flow_graph.h"

Description

A join_node is a graph_node and a sender< flow::tuple< T0, T1, ... >>. It contains a tuple of input ports, each of which is a receiver<Ti> for each of the T0 .. TN in OutputTuple. It supports multiple input receivers with distinct types and broadcasts a tuple of received messages to all of its successors. All input ports of a join_node must use the same buffering policy. The behavior of a join_node based on its buffering policy is shown in the table below.

Behavior of a join_node based on the buffering policy of its input ports.

Buffering Policy

Behavior

queueing

As each input port is put to, the incoming message is added to an unbounded first-in first-out queue in the port. When there is at least one message at each input port, the join_node broadcasts a tuple containing the head of each queue to all successors. If at least one successor accepts the tuple, the head of each input port's queue is removed; otherwise, the messages remain in their respective input port queues.

reserving

As each input port is put to, the join_node marks that an input may be available at that port and returns false. When all ports have been marked as possibly available, the join_node will try to reserve a message at each port from their known predecessors. If it is unable to reserve a message at a port, it un-marks that port, and releases all previously acquired reservations. If it is able to reserve a message at all ports, it broadcasts a tuple containing these messages to all successors. If at least one successor accepts the tuple, the reservations are consumed; otherwise, they are released.

tag_matching

As each input port is put to, a user-provided function object is applied to the message to obtain its tag. The message is then added to a hash table at the input port, using the tag as the key. When there is message at each input port for a given tag, the join_node broadcasts a tuple containing the matching messages to all successors. If at least one successor accepts the tuple, the messages are removed from each input port's hash table; otherwise, the messages remain in their respective input ports.

Rejection of messages by successors of the join_node and failed gets from predecessors of the input ports are handled using the protocol described in the Message Passing Protocol.

The function template input_port simplifies the syntax for getting a reference to a specific input port.

OutputTuple must be a flow::tuple<T0,T1, ... > where each element is copy-constructible and assignable.

Example

#include<cstdio>
#include "tbb/flow_graph.h"
 
using namespace tbb::flow;
 
int main() {
   graph g;
   function_node<int,int>
       f1( g, unlimited, [](const int &i) { return 2*i; } );
   function_node<float,float>
       f2( g, unlimited, [](const float &f) { return f/2; } );
 
   join_node< tbb::flow::tuple<int,float> > j(g);
 
   function_node< flow::tuple<int,float> >
       f3( g, unlimited,
           []( const flow::tuple<int,float> &t ) {
               printf( "Result is %f\n",
                       std::get<0>(t) + std::get<1>(t));
           } );
 
   make_edge( f1, input_port<0>( j ) );
   make_edge( f2, input_port<1>( j ) );
   make_edge( j, f3 );
 
   f1.try_put( 3 );
   f2.try_put( 3 );
   g.wait_for_all( );
   return 0;
}

In the example above, three function_node objects are created: f1 multiplies an int i by 2, f2 divides a float f by 2, and f3 receives a flow::tuple<int,float> t, adds its elements together and prints the result. The join_node j combines the output of f1 and f2, and then forwards the resulting tuple to f3. This example is purely a syntactic demonstration since there is very little work in the nodes.

Members

namespace tbb {
namespace flow {
 
enum graph_buffer_policy {
    rejecting, reserving, queueing, tag_matching };
 
template<typename OutputTuple, graph_buffer_policy JP = queueing>
class join_node :
    public graph_node, public sender< OutputTuple > {
 
public:
    typedef OutputTuple output_type;
    typedef receiver<output_type> successor_type;
    typedef implementation-dependent-tuple input_ports_type;
 
    join_node( graph &g );
    join_node( const join_node &src );
    input_ports_type &input_ports( );
    bool register_successor( successor_type &r );
    bool remove_successor( successor_type &r );
    bool try_get( output_type &v );
    bool try_reserve( output_type &v );
    bool try_release( );
    bool try_consume( );
 
};
 
//
// Specialization for tag_matching
//
 
template<typename OutputTuple>
class join_node<OutputTuple, tag_matching> :
    public graph_node, public sender< OutputTuple > {
 
public:
 
    // Has the same methods as previous join_node,
    // but has constructors to specify the tag_matching
    // function objects
 
    template<typename B0, typename B1>
    join_node( graph &g, B0 b0, B1 b1 );
   
    // Constructors are defined similarly for
    // 3 through 10 elements ...
};
 
}
}
The following table provides additional information on the members of this template class.
Member Description
join_node( graph &g ) Constructs a join_node that will spawn tasks using the root task in g.
template < typename B0, typename B1, ... > join_node( graph &g, B0 b0, B1 b1, ... )

A constructor only available in the tag_matching specialization of join_node.

Creates a join_node that uses the function objects b0, b1, ... , bN to determine that tags for the input ports 0 through N. It will spawn tasks using the root task in g.

Caution

Function objects passed to the join must not throw.

join_node( const join_node &src )

Creates a join_node that has the same initial state that src had at its construction. The list of predecessors, messages in the input ports, and successors are NOT copied.

input_ports_type &input_ports( )

Returns: a flow::tuple of receivers. Each element inherits from tbb::receiver<T> where T is the type of message expected at that input. Each tuple element can be used like any other flow::receiver<T>. The behavior of the ports based on the selected join_node policy is shown in the table above.

bool register_successor( successor_type &r )

Adds r to the set of successors.

Returns: true

bool remove_successor( successor_type &r )

Removes r from the set of successors.

Returns: true

bool try_get( output_type &v )

Attempts to generate a tuple based on the buffering policy of the join_node.

Returns: If it can successully generate a tuple, it copies it to v and returns true. Otherwise it returns false.

bool try_reserve( output_type &v )

Does not support reservations.

Returns: false.

bool try_release( )

Does not support reservations.

Returns: false.

bool try_consume( )

Does not support reservations.

Returns: false.

template<size_t N, typename JNT> typename flow::tuple_element<N, typename JNT::input_ports_type>::type &input_port( JNT &jn )

Calling input_port <N>( jn ) is equivalent to calling std::get<N>( jn.input_ports() )

Returns: the Nth input port for join_node jn.

For more complete information about compiler optimizations, see our Optimization Notice.