Intel® Threading Building Blocks flow graph: using streaming_node

The Intel® Threading Building Blocks (Intel® TBB) library provides a set of algorithms that enable parallelism in C++ applications. Since Intel® TBB 4.0, unstructured parallelism, dependency graphs and data flow algorithms can be expressed with flow graph classes and functions. The flow graph interface makes Intel® TBB useful for cases that are not covered by its generic parallel algorithms, while keeping users away from lower-level peculiarities of its tasking API.

Increasingly, systems are becoming heterogeneous and are starting to incorporate not only the power of CPUs but also different kinds of accelerators that are suitable for particular sets of tasks.

In an effort to better support heterogeneous solutions, async_node was added to the flow graph API. It lets a graph hand work to parallel activities that run outside the graph, such as external worker threads (threads that do not belong to the Intel TBB thread pool). The main limitation of async_node is that the result re-enters the graph at the same node that started the activity: you cannot start an async_node activity at one point of the graph and return its result at another.

The problem described above can be resolved with another new Intel® TBB feature: async_msg. This concept is quite similar to the future/promise pair in the C++ standard library, and it allows the result to be returned to the graph at any point. You just need to pass the async message from the node where the async activity was started to the node where the async result is needed.
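The future/promise analogy can be made concrete with standard C++ alone. The following is a minimal sketch, not Intel TBB code: start_activity, forward_handle, consume, and run_pipeline are hypothetical names, and std::future stands in for the role that async_msg plays in the graph. The handle is created where the work starts, passed through an intermediate stage untouched, and only resolved where the value is needed.

```cpp
#include <cassert>
#include <future>
#include <string>
#include <thread>
#include <utility>

// Hypothetical pairing of the external worker and the handle to its result.
struct started_activity {
    std::thread worker;
    std::future<std::string> result;
};

// Stage 1: start the work on an external thread and return immediately.
inline started_activity start_activity(const std::string& input) {
    std::promise<std::string> promise;
    std::future<std::string> result = promise.get_future();
    // The worker owns the promise and fulfils it when the work is done.
    std::thread worker(
        [input](std::promise<std::string> p) { p.set_value("printed: " + input); },
        std::move(promise));
    return started_activity{std::move(worker), std::move(result)};
}

// Stage 2: an intermediate stage forwards the handle without waiting on it.
inline std::future<std::string> forward_handle(std::future<std::string> handle) {
    return handle;
}

// Stage 3: the point where the value is actually needed.
inline std::string consume(std::future<std::string> handle) {
    assert(handle.valid());
    return handle.get();  // blocks until the worker calls set_value
}

inline std::string run_pipeline(const std::string& input) {
    started_activity activity = start_activity(input);
    std::string out = consume(forward_handle(std::move(activity.result)));
    activity.worker.join();
    return out;
}
```

Calling run_pipeline("hello") yields "printed: hello". In the flow graph, the forwarding step corresponds to passing the async message along edges between nodes.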

Moreover, Intel® TBB provides a special node with OpenCL support in it: opencl_node. The details can be found here: https://software.intel.com/en-us/blogs/2015/12/09/opencl-node-overview.

While implementing opencl_node, we found that some concepts are quite generic and can be used for a range of heterogeneous APIs. For example, async_msg was developed as an implementation of the postponed asynchronous result concept for the Intel TBB flow graph. Another generic heterogeneous concept was implemented in the streaming_node class, which is described below.

streaming_node main ideas & workflow

As we look at typical asynchronous and/or heterogeneous usage models, we can find that the models usually include the following steps:

  • Receive input data.
  • Select a device for the kernel execution.
  • Send the kernel arguments to the device.
  • Enqueue the kernel for execution on the device.
  • Get future result handlers from the device and store them.
  • Send a future result object (in practice, an async_msg) to the next graph node.

The workflow looks quite generic and independent of the particular device API. In Intel® TBB, the schema was implemented in the streaming_node class. However, the schema is quite abstract and so to make it usable, we need to select a particular device API. In Intel® TBB, we encapsulate the specifics of the device APIs in Factories. We tried to make the Factory concept as simple as possible.

Let us look at the steps above in terms of who is responsible for each one. Some steps can be implemented by streaming_node itself, some through user-defined functionality, and some through the Factory concept (an abstraction of a device API).

Step                                                           | Owner of responsibility
---------------------------------------------------------------|------------------------------------------------------------------------------------
Receive input data                                             | streaming_node
Select device for kernel execution                             | user (through user-provided functor)
Send kernel arguments to device                                | streaming_node calls factory::send_data and receives dependency handlers back
Enqueue kernel to device, get and store future result handler  | streaming_node calls factory::send_kernel and receives dependency handlers back
Send future result to next node                                | streaming_node creates async_msg that includes dependency handlers

The main streaming_node workflow becomes clear from the table above.
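The division of responsibility can also be sketched as a toy model in standard C++. This is not how streaming_node is implemented: toy_factory, first_device_selector, and toy_streaming_node are hypothetical names, strings stand in for data and kernels, and the returned string stands in for the future result. The sketch only shows which party makes which call.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a device API wrapper; it records what it is asked to do.
struct toy_factory {
    typedef int device_type;
    typedef std::string kernel_type;
    std::vector<std::string> log;

    void send_data(device_type device, const std::string& arg) {
        log.push_back("send_data(dev " + std::to_string(device) + ", " + arg + ")");
    }

    // Returns a placeholder for the future result of the kernel run.
    std::string send_kernel(device_type device, const kernel_type& kernel,
                            const std::string& arg) {
        log.push_back("send_kernel(dev " + std::to_string(device) + ", " + kernel + ")");
        return kernel + "(" + arg + ")";
    }
};

// User-provided functor: the only place where a device is chosen.
struct first_device_selector {
    toy_factory::device_type operator()(toy_factory&) const { return 0; }
};

template <typename Factory, typename Selector>
class toy_streaming_node {
public:
    toy_streaming_node(typename Factory::kernel_type kernel, Selector selector,
                       Factory& factory)
        : my_kernel(kernel), my_selector(selector), my_factory(factory) {}

    // The node receives the input, asks the user functor for a device,
    // and delegates the transfer and the launch to the factory.
    std::string process(const std::string& input) {
        typename Factory::device_type device = my_selector(my_factory);
        my_factory.send_data(device, input);
        return my_factory.send_kernel(device, my_kernel, input);
    }

private:
    typename Factory::kernel_type my_kernel;
    Selector my_selector;
    Factory& my_factory;
};

// Usage:
//   toy_factory f;
//   toy_streaming_node<toy_factory, first_device_selector>
//       node("print", first_device_selector(), f);
//   assert(node.process("hello") == "print(hello)");
```

The node never touches the device directly, which is why the same node logic can be reused with a different Factory.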

Please note that dependency handlers are device API-specific, so only the Factory can know the particular dependency type. In the current implementation, async_msg class cannot store any additional dependencies, so the Factory must provide a dependency_msg class derived from async_msg. As a result, an additional requirement for the Factory concept is that it provides the Factory::async_msg_type type. In addition, the main Factory interfaces must be able to get and update (to store dependencies) Factory::async_msg_type objects:

Factory::send_data(device_type device, Factory::async_msg_type& dependencies[ ])
Factory::send_kernel(device_type device, kernel_type kernel, Factory::async_msg_type& dependencies[ ])

Hello, World!

Let us try to implement asynchronous “Hello World” printing with the streaming_node.

We will use a C++ thread in place of a programmable device.

The following classes and functions are needed to implement it:

  1. A special asynchronous message tailored for this case (derived from async_msg).
  2. A thread with parallel printing in it (our “device”).
  3. A Factory that can work with the “device”.
  4. A simple device_selector.
  5. A main() function with 2 nodes.

Let us implement the components one by one:

hello_world.cpp:  part 1: user_async_msg class
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <cassert>
#include <tuple>

#define TBB_PREVIEW_FLOW_GRAPH_NODES 1
#define TBB_PREVIEW_FLOW_GRAPH_FEATURES 1

#include "tbb/tbb_config.h"
#include "tbb/concurrent_queue.h"
#include "tbb/flow_graph.h"

template<typename T>
class user_async_msg : public tbb::flow::async_msg<T>
{
public:
    typedef tbb::flow::async_msg<T> base;
    user_async_msg() : base() {}
    user_async_msg(const T& input) : base(), mInputData(input) {}
    const T& getInput() const { return mInputData; }

private:
    T mInputData;
};        

 

In the listing there are a few standard includes as well as several Intel TBB flow graph includes and definitions that enable async_msg and streaming_node classes in the Intel TBB headers.

 

The user_async_msg class is quite trivial: it just adds the mInputData field to store the original input value for processing in the asynchronous thread.

hello_world.cpp:  part 2: user_async_activity class
class user_async_activity { // Async activity singleton
public:
    static user_async_activity* instance() {
        if (s_Activity == NULL) {
            s_Activity = new user_async_activity();
        }
        return s_Activity;
    }

    static void destroy() {
        assert(s_Activity != NULL && "destroyed twice");
        s_Activity->myQueue.push(my_task()); // Finishing queue
        s_Activity->myThread.join();
        delete s_Activity;
        s_Activity = NULL;
    }

    void addWork(const user_async_msg<std::string>& msg) {
        myQueue.push(my_task(msg));
    }

private:
    struct my_task {
        my_task(bool finish = true)
            : myFinishFlag(finish) {}

        my_task(const user_async_msg<std::string>& msg)
            : myMsg(msg), myFinishFlag(false) {}

        user_async_msg<std::string> myMsg;
        bool                        myFinishFlag;
    };

    static void threadFunc(user_async_activity* activity) {
        my_task work;
        for(;;) {
            activity->myQueue.pop(work);
            if (work.myFinishFlag)
                break;
            else {
                std::cout << work.myMsg.getInput() << ' ';
                work.myMsg.set("printed: " + work.myMsg.getInput());
            }
        }
    }

    user_async_activity() : myThread(&user_async_activity::threadFunc, this) {}
private:
    tbb::concurrent_bounded_queue<my_task>  myQueue;
    std::thread                             myThread;
    static user_async_activity*             s_Activity;
};

user_async_activity* user_async_activity::s_Activity = NULL;

The user_async_activity class is a typical singleton with two common static interfaces: instance() and destroy().

The class wraps a standard thread (we used the std::thread class), which processes tasks from a task queue (implemented via the tbb::concurrent_bounded_queue class).

Any thread can add a new task to the queue via the addWork() method, while the worker thread is processing the tasks one by one. For every incoming task, it just prints the original input string to the console and uses the async_msg::set interface to return the result back to the graph. The following pseudocode shows the format of the result: Result = ‘printed: ’ | original string, where “|” represents string concatenation.
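For readers who want to see the queue-plus-sentinel pattern without any Intel TBB types, here is a minimal standard-library sketch. The names toy_activity and process_all are hypothetical, a mutex and condition variable replace the blocking pop() of tbb::concurrent_bounded_queue, and results are collected in a vector instead of being returned through async_msg::set.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

class toy_activity {
public:
    explicit toy_activity(std::vector<std::string>& results)
        : my_results(results), my_thread(&toy_activity::run, this) {}

    // The finish task is queued behind all pending work, then the worker is joined.
    ~toy_activity() {
        push(task{std::string(), true});
        my_thread.join();
    }

    void add_work(const std::string& text) { push(task{text, false}); }

private:
    struct task {
        std::string text;
        bool finish;
    };

    void push(const task& t) {
        {
            std::lock_guard<std::mutex> lock(my_mutex);
            my_queue.push(t);
        }
        my_ready.notify_one();
    }

    // Blocks until a task is available.
    task pop() {
        std::unique_lock<std::mutex> lock(my_mutex);
        my_ready.wait(lock, [this] { return !my_queue.empty(); });
        task t = my_queue.front();
        my_queue.pop();
        return t;
    }

    void run() {
        for (;;) {
            task t = pop();
            if (t.finish) break;
            my_results.push_back("printed: " + t.text);
        }
    }

    std::vector<std::string>& my_results;
    std::mutex my_mutex;
    std::condition_variable my_ready;
    std::queue<task> my_queue;
    std::thread my_thread;  // declared last so the queue exists before the thread starts
};

inline std::vector<std::string> process_all(const std::vector<std::string>& inputs) {
    std::vector<std::string> results;
    {
        toy_activity activity(results);
        for (const std::string& s : inputs) activity.add_work(s);
    }  // the destructor waits for the worker, so results is complete here
    return results;
}

// Usage:
//   std::vector<std::string> r = process_all({"hello", "world"});
//   assert(r[0] == "printed: hello" && r[1] == "printed: world");
```

Because a single worker drains a FIFO queue, the results appear in the order in which the work was added.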

hello_world.cpp:  part 3: device_factory class
class device_factory {
public:
    typedef int device_type;
    typedef int kernel_type;

    template<typename T> using async_msg_type = user_async_msg<T>;
    template <typename ...Args>
    void send_data(device_type /*device*/, Args&... /*args*/) {}

    template <typename ...Args>
    void send_kernel(device_type /*device*/, const kernel_type& /*kernel*/, Args&... args) {
        process_arg_list(args...);
    }

    template <typename FinalizeFn, typename ...Args>
    void finalize(device_type /*device*/, FinalizeFn /*fn*/, Args&... /*args*/) {}

private:
    template <typename T, typename ...Rest>
    void process_arg_list(T& arg, Rest&... args) {
        process_one_arg(arg);
        process_arg_list(args...);
    }

    void process_arg_list() {}

    // Retrieve values from async_msg objects

    template <typename T>
    void process_one_arg(async_msg_type<T>& msg) {
        user_async_activity::instance()->addWork(msg);
    }

    template <typename ...Args>
    void process_one_arg(Args&... /*args*/) {}
};

In this example, the implementation of an asynchronous device factory is simple; in fact, it provides a non-trivial definition for only one factory method: send_kernel. The method receives the incoming async messages as a C++ variadic template parameter pack. The implementation therefore only needs to walk through the pack and pass each message to the addWork() method of our asynchronous activity; arguments of any other type are ignored.
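The pack-walking technique can be tried in isolation. The sketch below uses hypothetical names (toy_msg, arg_collector) and no Intel TBB types: a recursive helper peels one argument at a time, an overload for the message type does the real work, and a catch-all template silently skips everything else. Unlike the listing above, the catch-all here takes a single parameter rather than a pack, which is enough for this purpose.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical message type standing in for user_async_msg<T>.
template <typename T>
struct toy_msg {
    T value;
};

class arg_collector {
public:
    template <typename... Args>
    void send_kernel(Args&... args) {
        process_arg_list(args...);
    }

    std::vector<std::string> collected;

private:
    // Peel off the first argument, handle it, then recurse on the rest.
    template <typename T, typename... Rest>
    void process_arg_list(T& arg, Rest&... rest) {
        process_one_arg(arg);
        process_arg_list(rest...);
    }

    // End of recursion: the pack is empty.
    void process_arg_list() {}

    // Chosen for string messages: a non-template overload wins over the template below.
    void process_one_arg(toy_msg<std::string>& msg) { collected.push_back(msg.value); }

    // Catch-all for every other argument type.
    template <typename T>
    void process_one_arg(T&) {}
};

// Usage:
//   arg_collector c;
//   toy_msg<std::string> a{"hello"};
//   int ignored = 42;
//   c.send_kernel(a, ignored);
//   assert(c.collected.size() == 1);
```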

Moreover, the Factory provides the correct async_msg_type for streaming_node, trivial (unused here) types for the device and the kernel, and empty implementations of the expected (but unused here) methods send_data and finalize. In your own Factory, you can implement send_data to upload data to the device before the kernel runs. Additionally, if the next node in the graph can reject incoming messages from streaming_node, the Factory must implement finalize() so that the provided finalization functor is invoked from the device's completion callback.

With all of the above in mind, the Factory concept can be implemented in several dozen lines of code in simple cases.

hello_world.cpp:  part 4: device_selector class
template<typename Factory>
class device_selector {
public:
    typename Factory::device_type operator()(Factory&) { return 0; }
};

In this simple example we have just one device, so the device selector functor is also trivial.
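With more than one device, the selector is where a distribution policy would live. The following is only a sketch under assumptions: multi_device_factory and its device_count member are hypothetical, and it is assumed that the selector is consulted once per incoming message and may be copied by the node. It rotates through the devices in round-robin order.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical factory that exposes several devices numbered 0..device_count-1.
struct multi_device_factory {
    typedef int device_type;
    explicit multi_device_factory(int count) : device_count(count) {}
    int device_count;
};

template <typename Factory>
class round_robin_selector {
public:
    round_robin_selector() : my_next(0) {}

    // std::atomic is not copyable, so the copy starts from the current counter value.
    round_robin_selector(const round_robin_selector& other)
        : my_next(other.my_next.load()) {}

    typename Factory::device_type operator()(Factory& factory) {
        assert(factory.device_count > 0);
        unsigned n = my_next.fetch_add(1);  // safe if called from several threads
        return static_cast<typename Factory::device_type>(
            n % static_cast<unsigned>(factory.device_count));
    }

private:
    std::atomic<unsigned> my_next;
};
```

For a factory with three devices, successive calls return 0, 1, 2, 0, and so on.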

hello_world.cpp:  part 5: main()
int main() {
    using namespace tbb::flow;
    typedef streaming_node< tuple<std::string>, queueing, device_factory > streaming_node_type;

    graph g;
    device_factory factory;
    device_selector<device_factory> device_selector;
    streaming_node_type node(g, 0 /*kernel*/, device_selector, factory);
    std::string final;
    std::mutex final_mutex;

    function_node< std::string > destination(g, unlimited, [&g, &final, &final_mutex](const std::string& result) {
        std::lock_guard<std::mutex> lock(final_mutex);
        final += result + "; "; // Parallel access
        g.decrement_wait_count();
    });

    make_edge(output_port<0>(node), destination);
    g.increment_wait_count(); // Wait for result processing in 'destination' node
    input_port<0>(node).try_put("hello");
    g.increment_wait_count(); // Wait for result processing in 'destination' node
    input_port<0>(node).try_put("world");

    g.wait_for_all();
    user_async_activity::destroy();

    std::cout << std::endl << "done" << std::endl << final << std::endl;
    return 0;
}

In the main() function we create all the required components: a graph object, a factory object, a device selector, and 2 nodes: one streaming_node and one destination function_node, which processes the asynchronous results. make_edge() connects these 2 nodes. By default, the flow graph knows nothing about our async activity and will not wait for its results. That is why manual synchronization (via increment_wait_count() / decrement_wait_count()) is used: the count is incremented before each try_put() and decremented once the destination node has processed the corresponding result. After the graph finishes executing, the worker thread is stopped and the final log string is printed.
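The effect of the manual wait counting can be illustrated with a small standard-library analogy. This is not how the flow graph implements its wait count: wait_counter and run_external_work are hypothetical names. The point is the protocol itself: register each piece of external work before starting it, and sign off only after its result has been processed.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

class wait_counter {
public:
    wait_counter() : my_count(0) {}

    void increment() {
        std::lock_guard<std::mutex> lock(my_mutex);
        ++my_count;
    }

    void decrement() {
        std::lock_guard<std::mutex> lock(my_mutex);
        assert(my_count > 0);
        if (--my_count == 0) my_idle.notify_all();
    }

    // Blocks until every registered piece of work has signed off.
    void wait_for_all() {
        std::unique_lock<std::mutex> lock(my_mutex);
        my_idle.wait(lock, [this] { return my_count == 0; });
    }

private:
    std::mutex my_mutex;
    std::condition_variable my_idle;
    int my_count;
};

// Starts n external workers and returns how many results were visible
// at the moment wait_for_all() returned.
inline int run_external_work(int n) {
    wait_counter counter;
    std::mutex results_mutex;
    int results = 0;
    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i) {
        counter.increment();  // register before the work starts
        workers.emplace_back([&] {
            {
                std::lock_guard<std::mutex> lock(results_mutex);
                ++results;  // "process the result"
            }
            counter.decrement();  // sign off only after processing
        });
    }
    counter.wait_for_all();
    int seen;
    {
        std::lock_guard<std::mutex> lock(results_mutex);
        seen = results;
    }
    for (std::thread& w : workers) w.join();
    return seen;
}
```

If the increment were moved inside the worker, wait_for_all() could return before a late-starting worker had registered, which is the same reason main() increments the count before each try_put().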

The application output:

$ g++ -std=c++11 -I$TBB_INCLUDE -L$TBB_LIB -o hello ./hello_world.cpp -ltbb
$ ./hello
hello world
done
printed: hello; printed: world;

Note: the code needs C++11 support, so the -std=c++11 option (-std=c++0x on older GCC versions) must be passed to the compiler.

Conclusion

This article demonstrates how to implement a simple Factory that works with streaming_node – a new flow graph node in the Intel TBB library. The detailed description of streaming_node can be found in the Intel TBB documentation (see Intel® Threading Building Blocks Developer Reference -> Appendices -> Preview Features -> Flow Graph -> streaming_node Template Class).

Note that this functionality is provided for preview and is subject to change, including incompatible modifications in the API and behavior.

 

If you have any remarks and suggestions about the article, feel free to leave comments. 

 
