Data Flow Graph

In a data flow graph, nodes are computations that send and receive data messages. Some nodes may only send messages, others may only receive messages, and others may send messages in response to messages that they receive.

In the following data flow graph, the left-most node generates the integer values from 1 to 10 and passes them to two successor nodes. One of the successors squares each value it receives and passes the result downstream. The second successor cubes each value it receives and passes the result downstream. The right-most node receives values from both of the middle nodes and adds each value to a running sum as it arrives. When the application runs to completion, sum holds the total of the squares and the cubes of the integers from 1 to 10.
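You can work out the final value of sum without any flow graph. The squares of 1 through 10 add up to 385 and the cubes add up to 3025, so sum should end at 3410. The plain C++ sketch below computes that expected result twice: once with a loop and once with the closed-form formulas. The function names are hypothetical helpers, not part of TBB.

```cpp
#include <cassert>

// Sequential reference for the graph: adds v*v and v*v*v for v = 1..n
// with a plain loop (no flow graph involved).
long long squares_plus_cubes(int n) {
    assert(n >= 0);
    long long total = 0;
    for (long long v = 1; v <= n; ++v) {
        total += v * v;      // what the squarer node contributes
        total += v * v * v;  // what the cuber node contributes
    }
    return total;
}

// Same result from the closed forms:
//   1^2 + ... + n^2 = n(n+1)(2n+1)/6
//   1^3 + ... + n^3 = (n(n+1)/2)^2
long long squares_plus_cubes_closed_form(long long n) {
    long long squares = n * (n + 1) * (2 * n + 1) / 6;
    long long half = n * (n + 1) / 2;
    return squares + half * half;
}
```

For n = 10 both functions return 385 + 3025 = 3410, which is the value the graph implementations below should print.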

Figure: Simple Data Flow Graph

The following code snippet shows an implementation of the Simple Data Flow Graph shown above:

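    #include "tbb/flow_graph.h"
    #include <iostream>

    using namespace tbb::flow;
    using std::cout;

    int main() {
      int sum = 0;
      graph g;
      // squarer and cuber have no side effects, so they may run with unlimited concurrency
      function_node< int, int > squarer( g, unlimited, [](const int &v) {
          return v*v;
      } );
      function_node< int, int > cuber( g, unlimited, [](const int &v) {
          return v*v*v;
      } );
      // summer updates the shared variable sum, so only one invocation may run at a time
      function_node< int, int > summer( g, 1, [&](const int &v ) -> int {
          return sum += v;
      } );
      make_edge( squarer, summer );
      make_edge( cuber, summer );

      // this loop plays the role of node F: each value is sent to both successors
      for ( int i = 1; i <= 10; ++i ) {
        squarer.try_put(i);
        cuber.try_put(i);
      }
      g.wait_for_all();   // block until every message has been processed

      cout << "Sum is " << sum << "\n";
    }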

In the implementation above, the following function_nodes are created:

  • one to square values
  • one to cube values
  • one to add values to the shared sum

Since the squarer and cuber nodes are free of side effects, they are created with unlimited concurrency. The summer node updates sum, a shared variable captured by reference in its lambda, so it is not safe to execute in parallel; it is therefore created with a concurrency limit of 1. Node F from the Simple Data Flow Graph above is implemented as a loop that puts each value to both the squarer and cuber nodes.
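You can see why summer needs a concurrency limit of 1 without using TBB at all. In the sketch below, std::thread and std::mutex stand in for the flow graph runtime. This is an analogy, not how TBB enforces the limit. Two threads play the roles of squarer and cuber, and both deliver results to a single hypothetical serial_summer object. Without the lock, the read-modify-write in sum += v could interleave between the threads and lose updates. With the lock, updates run one at a time, which is the guarantee a concurrency limit of 1 provides.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical model of the summer node: values arrive from several threads,
// but the mutex lets only one update of the running sum execute at a time.
class serial_summer {
    std::mutex m_;
    int sum_ = 0;
public:
    int operator()(int v) {
        std::lock_guard<std::mutex> lock(m_);  // serialize the read-modify-write
        return sum_ += v;
    }
    int total() {
        std::lock_guard<std::mutex> lock(m_);
        return sum_;
    }
};

// A "squarer" thread and a "cuber" thread both feed the same summer.
int run_two_producers(int n) {
    assert(n >= 0);
    serial_summer summer;
    std::thread squarer([&] { for (int v = 1; v <= n; ++v) summer(v * v); });
    std::thread cuber([&] { for (int v = 1; v <= n; ++v) summer(v * v * v); });
    squarer.join();
    cuber.join();
    return summer.total();
}
```

However the two threads interleave, run_two_producers(10) always returns 3410, because no update to the sum is lost.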

A slight improvement over the first implementation is to introduce an additional node type, a broadcast_node. A broadcast_node broadcasts any message it receives to all of its successors.

This lets you replace the two try_put calls in the loop with a single try_put:

    broadcast_node<int> b(g);
    // every value put to b is forwarded to both squarer and cuber
    make_edge( b, squarer );
    make_edge( b, cuber );
    for ( int i = 1; i <= 10; ++i ) {
      b.try_put(i);
    }
    g.wait_for_all();
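The broadcast behavior can be modeled in a few lines of plain C++. The toy_broadcaster class below is hypothetical and much simpler than TBB's broadcast_node. It calls each successor directly on the caller's thread, and it ignores everything the runtime does around that. It does capture the one rule that matters here: each message passed to try_put goes to every registered successor.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical model of broadcast semantics: each value passed to try_put
// is handed, unchanged, to every registered successor in registration order.
template <typename T>
class toy_broadcaster {
    std::vector<std::function<void(const T&)>> successors_;
public:
    void add_successor(std::function<void(const T&)> s) {
        assert(s);  // an empty std::function could not be called
        successors_.push_back(std::move(s));
    }
    void try_put(const T& v) {
        for (auto& s : successors_) s(v);  // same value to every successor
    }
};
```

If you register one successor that accumulates squares and another that accumulates cubes, then put 1 through 10, the two totals come out to 385 and 3025.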

An even better option, which makes the implementation match the Simple Data Flow Graph more closely, is to introduce a source_node. As the name implies, a source_node only sends messages; it does not receive them. Its constructor takes three arguments:

    template< typename Body > source_node( graph &g, Body body, bool is_active=true )

  • g: the graph the node belongs to.
  • body: the body of the source node.
  • is_active: whether the node begins sending messages as soon as a successor is attached to it (true, the default), or must wait until it is explicitly activated with activate() (false).

The body is a function object or lambda expression that provides a function call operator with this signature:

    bool Body::operator()( OutputType &v );
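The contract of this operator can be modeled sequentially. Each call that returns true has written one new output value into v, and the first call that returns false ends the stream. The hypothetical helper drain_source_body below is not part of TBB. It simply keeps calling a body the way the text describes and collects what it produces. It says nothing about how the runtime actually schedules those calls.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, sequential model of how a source body is consumed:
// call it repeatedly, keep every value produced by a call that returned true,
// and stop at the first call that returns false.
template <typename OutputType, typename Body>
std::vector<OutputType> drain_source_body(Body body) {
    std::vector<OutputType> out;
    std::size_t calls = 0;
    OutputType v{};
    while (true) {
        ++calls;
        if (!body(v)) break;   // false: no value produced, the stream ends
        out.push_back(v);      // true: v holds the next output value
    }
    // The body runs once per produced value, plus one final call returning false.
    assert(calls == out.size() + 1);
    return out;
}
```

For example, a lambda that hands out 1 through 10 and then returns false yields exactly those ten values.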

The runtime library will repeatedly invoke a source_node body until it returns false. You can replace the loop in the example with a source_node:

    source_node< int > src( g, src_body(10), false );
    make_edge( src, squarer );
    make_edge( src, cuber );
    src.activate();
    g.wait_for_all();

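Because the runtime library calls the function call operator of src_body until it returns false, src_body must behave like the body of the loop that implemented node F. Each call writes the next value into v and returns true. Once every value up to the limit has been sent, it returns false. The node is created with is_active set to false and activated only after both edges have been made. If it started active, it could begin sending values as soon as squarer was attached, before cuber was connected. The final implementation after all of these changes is shown below: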

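    #include "tbb/flow_graph.h"
    #include <iostream>

    using namespace tbb::flow;
    using std::cout;

    // Body for the source_node: produces 1, 2, ..., my_limit and then stops.
    class src_body {
        const int my_limit;
        int my_next_value;
    public:
        src_body(int l) : my_limit(l), my_next_value(1) {}
        // Called repeatedly by the runtime; returning false ends the stream.
        bool operator()( int &v ) {
            if ( my_next_value <= my_limit ) {
                v = my_next_value++;
                return true;
            } else {
                return false;
            }
        }
    };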

    int main() {
      int sum = 0;
      graph g;
      function_node< int, int > squarer( g, unlimited, [](const int &v) { 
          return v*v; 
      } );
      function_node< int, int > cuber( g, unlimited, [](const int &v) { 
          return v*v*v; 
      } );
      function_node< int, int > summer( g, 1, [&](const int &v ) -> int { 
          return sum += v; 
      } );
      make_edge( squarer, summer );
      make_edge( cuber, summer );
      source_node< int > src( g, src_body(10), false );
      make_edge( src, squarer );
      make_edge( src, cuber );
      src.activate();
      g.wait_for_all();
      cout << "Sum is " << sum << "\n";
    }

This final implementation has all of the nodes and edges from the Simple Data Flow Graph above. In a graph this simple, a source_node offers little advantage over an explicit loop. A source_node can, however, react to the behavior of downstream nodes: if its successors do not accept a value, it holds that value instead of generating more. In more complex graphs this can be used to limit memory use. For more information, see Creating a Token-Based System.
