In this post, I’m going to describe the hybrid push-pull protocol used by the graph Community Preview Feature in Intel® Threading Building Blocks (Intel® TBB). You can find other graph-related posts here, including an introduction and some examples. You can download the open-source version of Intel® TBB at http://www.threadingbuildingblocks.org and are encouraged to provide feedback about the graph via the forum.
The hybrid push-pull protocol used by tbb::graph biases communication to prevent polling and to reduce unnecessary retries. Understanding the details of this protocol is not necessary to use tbb::graph, but it makes understanding its performance easier.
Nodes in a graph are persistent and exist until a user explicitly destroys them. But unlike some actor systems, a thread is not assigned to each tbb::graph node. Tasks are created on demand to execute node bodies and pass messages between nodes when there is activity in the graph. Consequently, a tbb::graph node does not spin in a loop waiting for messages to arrive. Instead, when a message arrives, a task is created to apply the receiving node’s body to the incoming message.
If nodes always accept incoming messages, this is straightforward to implement. Each time a message arrives at a node, a task can be created to apply the body to that message and to forward the result to the node’s successors. However, some nodes, such as function_nodes or limiter_nodes, can reject an incoming message. For example, a function_node will reject an incoming message if it has reached its maximum allowable concurrency.
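To make the idea of rejection concrete, here is a minimal sketch of a receiver that refuses messages once it reaches its concurrency limit. The class LimitedReceiver and its members are hypothetical names invented for this illustration; this is not a tbb::graph class, and it ignores threading entirely.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a node with limited concurrency.
// Not a TBB class; single-threaded and for illustration only.
class LimitedReceiver {
public:
    explicit LimitedReceiver(std::size_t max_concurrency)
        : limit_(max_concurrency) {}

    // Returns true if the message is accepted, false if it is rejected
    // because the maximum allowable concurrency has been reached.
    bool try_put(int /*message*/) {
        if (in_flight_ >= limit_) return false;
        ++in_flight_;   // a body invocation is now in progress
        return true;
    }

    // Called when one body invocation finishes, freeing a slot.
    void body_finished() {
        assert(in_flight_ > 0);
        --in_flight_;
    }

    std::size_t in_flight() const { return in_flight_; }

private:
    std::size_t limit_;
    std::size_t in_flight_ = 0;
};
```

With a limit of 1, a second try_put made before body_finished is called returns false, which is the rejection the rest of this post is concerned with.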
The challenge is to create a protocol for dealing with message rejection that is efficient and ensures that messages aren’t accidentally dropped. To be efficient in a non-preemptive tasking system like Intel® Threading Building Blocks, it’s important to not create many small tasks or tasks that waste resources by spinning. Creating a new task to retry at each rejection may generate many small useless tasks, and repeatedly issuing sends in a loop until one is accepted is likewise inefficient.
Instead, tbb::graph uses a hybrid push-pull protocol as a more efficient alternative.
Figure 1: A state diagram for the hybrid push-pull protocol used by tbb::graph.
In the state diagram shown in Figure 1, edges dynamically switch between a push and a pull protocol at rejections. An Intel® TBB graph can be described as G = ( V, S, L ), where V is the set of nodes, S is the set of edges that are currently using a push protocol, and L is the set of edges that are currently using a pull protocol. For each edge (Vi, Vj), Vi is the predecessor / sender and Vj is the successor / receiver. When an edge is in the push set S, messages over it are initiated by the sender, which tries to put to the receiver. When it is in the pull set L, messages are initiated by the receiver, which tries to get from the sender. If a message attempt across an edge fails, the edge is moved to the other set. For example, if a put across the edge (Vi, Vj) is rejected, the edge is removed from the push set S and placed in the pull set L.
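The per-edge transitions can be sketched as a two-state machine. This is a toy model of the rule described above, not the tbb::graph implementation. The names EdgeMode, Edge, on_put and on_get are made up for this sketch, and starting each edge in push mode is an assumption.

```cpp
#include <cassert>

// Toy model of a single edge (Vi, Vj). Not the TBB implementation.
enum class EdgeMode { Push, Pull };   // Push: edge is in S, Pull: edge is in L

struct Edge {
    EdgeMode mode = EdgeMode::Push;   // assumption: edges start in the push set S

    // Record the outcome of a put initiated by the sender Vi.
    void on_put(bool accepted) {
        assert(mode == EdgeMode::Push);          // only senders of push edges put
        if (!accepted) mode = EdgeMode::Pull;    // rejected put: move edge from S to L
    }

    // Record the outcome of a get initiated by the receiver Vj.
    void on_get(bool succeeded) {
        assert(mode == EdgeMode::Pull);          // only receivers of pull edges get
        if (!succeeded) mode = EdgeMode::Push;   // rejected get: move edge from L to S
    }
};
```

A successful attempt leaves the edge where it is. Only a failed attempt moves it to the other set.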
This protocol reduces the number of messages across an edge, while maintaining quick response times. If a sender, Vi, generates data at a faster rate than its successor Vj can consume it, the edge will transition into pull mode, eliminating the many rejections that Vi would see if it were to continue to send. Likewise, if a receiver Vj processes data faster than its sender Vi generates it, the edge will stay in push mode, allowing Vi to send data as soon as it is generated.
There are two interesting scenarios in a tbb::graph program when considering this protocol: (1) when there is no buffering between the sender and receiver and (2) when there is buffering between the sender and receiver. Using tbb::graph, there are two ways that one can deal with a node that rejects messages. If no buffering is placed before the node, rejected messages will be dropped. If buffering is placed before the receiver, then messages will be buffered until the receiver can consume them. The protocol in Figure 1 works in both cases to reduce unnecessary rejections.
Let’s first consider the unbuffered case, as shown in Figure 2 below.
Figure 2: A simple sub-graph with two function_nodes.
In Figure 2, we have a function_node, f1, sending its output to a function_node, f2. Node f1 has unlimited concurrency, which allows its body to be applied concurrently to multiple inputs. Node f2, however, is restricted to a concurrency of 1. If f2 is busy applying its body to another message when a new message arrives from f1, it rejects the incoming message. Without a buffer between the two nodes, the message is dropped. Admittedly, it is rare that a user will want messages to be dropped. In fact, in an upcoming update of the graph API, buffering will be added by default to the input of a function_node if it has limited concurrency. After that update, users will have to explicitly choose the dropping behavior.
In any case, let’s say that f2 in Figure 2 rejects a message sent by f1. Because of the protocol in Figure 1, the edge f1->f2 is changed so that f1 will no longer put to f2 and instead f2 must pull from f1. This does not mean that f1 will not apply its body to messages as they arrive at f1, but just that it will no longer send the results of its body to f2. If f1 had other successors that did not reject the message, they would continue to receive subsequent results.
When f2 finishes executing its body, it becomes free to process new messages and tries to pull from f1. Because a function_node does not have input or output buffering, it can never produce output on demand. Thus, it always rejects attempts to pull from it. This rejection returns the edge to push mode, and subsequently f1 will again push to f2.
So what has this accomplished? If f1 and f2 are imbalanced, these switches, from push to pull and then back to push, reduce the number of failed puts sent from f1 to f2. While f2 is busy, f1 stops sending messages that will ultimately be rejected. Only when f2 becomes free is the edge re-established for pushing.
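The saving can be illustrated with a small discrete-time simulation of the unbuffered case. This is a deliberately simplified model, not TBB code. It assumes f1 emits exactly one message per tick, f2 has a concurrency of 1 and needs a fixed number of ticks per message, and everything runs on one thread. The names simulate_unbuffered, Counts and Mode are hypothetical.

```cpp
#include <cassert>

enum class Mode { Push, Pull };

struct Counts {
    int accepted = 0;      // puts accepted by f2
    int failed_puts = 0;   // puts rejected by f2
};

// ticks:   number of time steps; f1 emits one message per tick (assumption).
// service: ticks f2 needs per message; f2 has a concurrency of 1.
// hybrid:  if false, the edge stays in push mode no matter what happens.
Counts simulate_unbuffered(int ticks, int service, bool hybrid) {
    assert(ticks >= 0 && service >= 1);
    Counts c;
    Mode mode = Mode::Push;
    bool busy = false;   // is f2 currently applying its body?
    int free_at = 0;     // tick at which f2 finishes its current message

    for (int t = 0; t < ticks; ++t) {
        if (busy && t >= free_at) {
            busy = false;
            // f2 is free and, if the edge is in pull mode, tries to pull
            // from f1. f1 has no buffer, so the pull is rejected and the
            // edge returns to push mode.
            if (mode == Mode::Pull) mode = Mode::Push;
        }
        if (mode == Mode::Push) {
            if (!busy) {                       // put accepted
                busy = true;
                free_at = t + service;
                ++c.accepted;
            } else {                           // put rejected, message dropped
                ++c.failed_puts;
                if (hybrid) mode = Mode::Pull;
            }
        }
        // In pull mode f1 makes no put at all; the message is dropped
        // without generating a rejection.
    }
    return c;
}
```

Under these assumptions, with 12 ticks and a service time of 4, both variants get 3 messages accepted by f2. The always-push variant sees 9 failed puts, while the hybrid variant sees 3.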
The more common case, and what will become the default in an upcoming update, is when there is buffering between the two nodes as shown in Figure 3.
Figure 3: A simple sub-graph with two function_nodes and a queue between them.
In this case, f1 puts its result to q1, which always accepts. In turn, q1 attempts to forward items to f2 in first-in first-out order. If f2 rejects an item, it remains at the head of the queue in q1, until it can be successfully passed to f2. In this example, the edge f1->q1 will always stay in push mode, since q1 does not reject. However, the edge q1->f2 may change since f2 can reject if it is busy.
Again, let’s assume that at times items arrive in q1 at a faster rate than they can be consumed by f2. If f2 rejects a send from q1, the edge q1->f2 is transitioned to the pull state. Because q1 buffers rejected items, this message is not lost. When f2 finishes executing its body, it tries to pull from q1. Since there is an item buffered in q1, the pull succeeds and the item is given to f2. Since the pull was successful, the edge q1->f2 stays in pull mode. When f2 finishes execution on the item it has just pulled, it will pull again from q1. It will continue to do so, until q1 is empty and therefore rejects f2’s pull request.
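The buffered sequence above can be traced with another small simulation. As before, this is a single-threaded toy model under stated assumptions, not the tbb::graph implementation. It assumes f1 puts one item into q1 per tick and f2 takes a fixed number of ticks per item. The names simulate_buffered, BufferedResult and QueueEdgeMode are hypothetical.

```cpp
#include <cassert>
#include <queue>
#include <vector>

enum class QueueEdgeMode { Push, Pull };   // state of the edge q1->f2

struct BufferedResult {
    std::vector<int> order;     // items in the order f2 started processing them
    int rejected_puts = 0;      // puts from q1 that f2 rejected
    int successful_pulls = 0;   // pulls by f2 that returned an item
    int failed_pulls = 0;       // pulls by f2 that found q1 empty
};

// n_items: f1 puts items 0..n_items-1 into q1, one per tick (assumption).
// service: ticks f2 needs per item; f2 has a concurrency of 1.
BufferedResult simulate_buffered(int n_items, int service) {
    assert(n_items >= 0 && service >= 1);
    BufferedResult r;
    std::queue<int> q1;
    QueueEdgeMode mode = QueueEdgeMode::Push;
    bool busy = false;
    int free_at = 0;

    for (int t = 0; t < n_items || busy || !q1.empty(); ++t) {
        // 1. f2 finishes its current item and, in pull mode, pulls from q1.
        if (busy && t >= free_at) {
            busy = false;
            if (mode == QueueEdgeMode::Pull) {
                if (!q1.empty()) {                 // pull succeeds, stay in pull mode
                    r.order.push_back(q1.front());
                    q1.pop();
                    busy = true;
                    free_at = t + service;
                    ++r.successful_pulls;
                } else {                           // q1 rejects the pull
                    ++r.failed_pulls;
                    mode = QueueEdgeMode::Push;
                }
            }
        }
        // 2. f1 puts a new item to q1, which always accepts.
        if (t < n_items) q1.push(t);
        // 3. In push mode, q1 tries to forward its head item to f2.
        if (mode == QueueEdgeMode::Push && !q1.empty()) {
            if (!busy) {                           // put accepted
                r.order.push_back(q1.front());
                q1.pop();
                busy = true;
                free_at = t + service;
            } else {                               // put rejected, item stays in q1
                ++r.rejected_puts;
                mode = QueueEdgeMode::Pull;
            }
        }
    }
    return r;
}
```

In this model, with 6 items and a service time of 2, every item reaches f2 in first-in first-out order. There is a single rejected put, five successful pulls, and one failed pull when q1 finally runs empty, which returns the edge to push mode.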
As with the previous scenario, the switching of modes from push to pull reduces the number of messages across the edge. Only when f2 is available to do work does it make a request for an item from q1, thereby removing the need for q1 to periodically poll or send items speculatively.