# Using the new concurrent priority queue in Intel® Threading Building Blocks

By Terry Wilmarth (Intel), published on August 1, 2011

We have added a new example that illustrates the use of the concurrent priority queue in Intel® Threading Building Blocks (Intel® TBB). For details on the concurrent priority queue, see this earlier blog post.

For our example, we provide a simple parallelization of the *shortest path* problem from graph theory. This problem asks for a minimally weighted path between two points. For example, consider locations on a road map. These can be represented by *vertices* in a graph. Some of the locations are directly connected by roads. These roads can be represented by *edges* in our graph. The distance along the road that directly connects two locations is an edge *weight* that can be represented by a function. A path from one location *v1* to another location *v2* is thus any sequence of edges that one can traverse to get from *v1* to *v2*. The weight of that path is the sum of the weights of the edges that make up the path. Thus, finding the shortest path in the road map scenario tells us the shortest way to get from one location to another (without off-roading).

A* is a serial heuristic search algorithm that can be used to solve the shortest path problem. A* uses a heuristic function *f*: *V* -> **R** (where **R** is the set of possible path weights) to determine the order in which to visit the vertices of the graph. This function is the sum of two other functions, *g*: *V* -> **R** and *h*: *V* -> **R**, so that *f*(*v*) = *g*(*v*) + *h*(*v*). *g* is the cost of the best known path from the source vertex to the input vertex. *h* is an admissible heuristic estimate (i.e. it must not be an overestimate) of the weight of the path from the input vertex to the goal vertex. For example, in the road map scenario where we are concerned with distance, an admissible heuristic would be the direct distance between two vertices (as if they were directly connected by a perfectly straight road). The *g* and *f* functions are derived as the algorithm proceeds.
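As a rough illustration of the straight-line heuristic, the sketch below computes *f* = *g* + *h* for points in the plane. The excerpt later in this post calls a `get_distance` function without showing its body; the Euclidean definition here is an assumption, and `f_value` is a hypothetical helper that is not part of the example.

```cpp
#include <cmath>

struct point {
    double x, y;
};

// Straight-line (Euclidean) distance between two points.
// Assumed definition: the post's excerpt uses get_distance but does not show it.
double get_distance(const point& p1, const point& p2) {
    return std::hypot(p1.x - p2.x, p1.y - p2.y);
}

// Hypothetical helper: f(v) = g(v) + h(v), where g_v is the cost of the best
// known path from the source to v and h(v) is the straight-line distance
// from v to the goal.
double f_value(double g_v, const point& v, const point& goal) {
    return g_v + get_distance(v, goal);
}
```

Because no road between two locations can be shorter than the straight line joining them, this estimate never exceeds the true remaining distance, which is what admissibility requires.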

A priority queue called *open_set* is used to sort the vertices by *f*. To find the shortest path from a source vertex to a destination vertex, the source vertex is first placed in *open_set* and its *g* and *f* values are initialized (see the code below). Then in a loop, we remove a vertex *u* from the *open_set* to explore. Keep in mind that the *open_set* is a priority queue, so each time we remove a vertex, that vertex has the best (lowest) *f* value of any vertex currently in the queue. If *u* is the destination vertex, then we are done (and we can reconstruct the path from the predecessor array we calculated). Otherwise, we place *u* in the closed set (to indicate that it has already been visited) and then examine each of its adjacent vertices.

For a given adjacent vertex *v*, if *v* has already been explored (i.e. taken from the *open_set* and placed in the closed set) there is no need to revisit it. Otherwise, we calculate a new *g* value for *v* based on the fact that we are arriving at *v* from *u*. If the new *g* value is better than the old value, we update *v*'s predecessor to *u*, its *g* value to the new *g* value, and recalculate its *f* value given the new *g* value. If *v* was previously unexplored, we add it to *open_set*. If the algorithm exits the while loop without reaching the destination vertex, then there is no path from the source to the destination.

`bool shortpath(source, dest)`

`    open_set.push(source)`

`    foreach v in V`

`        g[v] := inf`

`        f[v] := 0`

`        closed[v] := false`

`    end foreach`

`    g[source] := 0.0`

`    f[source] := h(source)`

`    while (!open_set.empty())`

`        u := open_set.pop()`

`        if (u = dest) return true`

`        closed[u] := true`

`        foreach v in edges[u]`

`            if (closed[v]) continue`

`            new_g_v := g[u] + edge_weight(u, v)`

`            if (new_g_v < g[v])`

`                f[v] := new_g_v + h(v)`

`                if (g[v] = inf) open_set.push(v)`

`                predecessor[v] := u`

`                g[v] := new_g_v`

`            end if`

`        end foreach`

`    end while`

`    return false`

`end shortpath`

Note that for this algorithm to perform optimally, the priority queue must allow dynamic updates of the priority values (here, *f*) of its contents, since the priority of an element may change after it has been placed in the queue.
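The serial algorithm can be sketched in C++ as follows. This is only one possible rendering, not the code from the example: it assumes vertices are points in the plane with Euclidean edge weights, and it uses a `std::set` ordered by (*f*, vertex) as the open set, because erasing and reinserting an entry is a simple way to emulate the priority update that `std::priority_queue` does not offer. The name `shortpath_serial` is hypothetical.

```cpp
#include <cmath>
#include <cstddef>
#include <limits>
#include <set>
#include <utility>
#include <vector>

struct point { double x, y; };
typedef std::size_t vertex_id;

// Assumed edge weight and heuristic: straight-line distance.
double get_distance(const point& a, const point& b) {
    return std::hypot(a.x - b.x, a.y - b.y);
}

// Returns true if dst is reachable from src. On return, g holds the best
// known distances from src and predecessor the corresponding tree.
bool shortpath_serial(const std::vector<point>& vertices,
                      const std::vector<std::vector<vertex_id> >& edges,
                      vertex_id src, vertex_id dst,
                      std::vector<vertex_id>& predecessor,
                      std::vector<double>& g) {
    const double inf = std::numeric_limits<double>::infinity();
    const std::size_t n = vertices.size();
    std::vector<double> f(n, inf);
    std::vector<bool> closed(n, false);
    g.assign(n, inf);
    predecessor.assign(n, src);  // placeholder until a vertex is reached

    // Ordered by (f, vertex), so begin() is the entry with the smallest f.
    std::set<std::pair<double, vertex_id> > open_set;
    g[src] = 0.0;
    f[src] = get_distance(vertices[src], vertices[dst]);
    open_set.insert(std::make_pair(f[src], src));

    while (!open_set.empty()) {
        vertex_id u = open_set.begin()->second;
        open_set.erase(open_set.begin());
        if (u == dst) return true;
        closed[u] = true;
        for (std::size_t i = 0; i < edges[u].size(); ++i) {
            vertex_id v = edges[u][i];
            if (closed[v]) continue;
            double new_g_v = g[u] + get_distance(vertices[u], vertices[v]);
            if (new_g_v < g[v]) {
                // Priority update: drop the old entry (if any), then reinsert.
                if (g[v] != inf) open_set.erase(std::make_pair(f[v], v));
                predecessor[v] = u;
                g[v] = new_g_v;
                f[v] = new_g_v + get_distance(vertices[v], vertices[dst]);
                open_set.insert(std::make_pair(f[v], v));
            }
        }
    }
    return false;  // open set exhausted without reaching dst
}
```

The erase-and-reinsert step is the part that has no cheap counterpart in a concurrent setting, which is why the parallel version takes a different approach.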

To parallelize this algorithm, we make a few minor changes. First, we allow multiple threads to remove vertices from the *open_set* and explore them concurrently. Since our concurrent priority queue does not support the expensive priority update operation, we simply allow duplicate insertions of the same vertex, but with different priority values. When a duplicate entry is later removed from the queue, we compare its *f* value with the one currently recorded for that vertex; if the entry's value is worse, the entry is stale and the vertex is not explored again. This approach removes the need for keeping track of which vertices have been visited via a closed set.
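The duplicate-insertion idea can be tried out serially before any threads are involved. The sketch below is an illustration under stated assumptions rather than the example's code: it uses `std::priority_queue` in place of the concurrent queue, explicit edge weights, and a precomputed heuristic table `h`. The names `edge` and `shortpath_lazy` are hypothetical.

```cpp
#include <cstddef>
#include <limits>
#include <queue>
#include <utility>
#include <vector>

typedef std::size_t vertex_id;
typedef std::pair<vertex_id, double> vertex_rec;  // (vertex, f at push time)

struct edge { vertex_id to; double weight; };

// Inverted comparison so that top() is the record with the smallest f.
struct compare_f {
    bool operator()(const vertex_rec& u, const vertex_rec& v) const {
        return u.second > v.second;
    }
};

// Returns the best distance found from src to dst, or infinity if dst is
// unreachable. h[v] is assumed to be an admissible estimate from v to dst.
double shortpath_lazy(const std::vector<std::vector<edge> >& edges,
                      const std::vector<double>& h,
                      vertex_id src, vertex_id dst) {
    const double inf = std::numeric_limits<double>::infinity();
    std::vector<double> g(edges.size(), inf), f(edges.size(), inf);
    std::priority_queue<vertex_rec, std::vector<vertex_rec>, compare_f> open_set;

    g[src] = 0.0;
    f[src] = h[src];
    open_set.push(vertex_rec(src, f[src]));

    while (!open_set.empty()) {
        vertex_rec u_rec = open_set.top();
        open_set.pop();
        vertex_id u = u_rec.first;
        if (u == dst) continue;             // nothing to expand from the goal
        if (u_rec.second > f[u]) continue;  // stale duplicate: prune
        for (std::size_t i = 0; i < edges[u].size(); ++i) {
            vertex_id v = edges[u][i].to;
            double new_g_v = g[u] + edges[u][i].weight;
            if (new_g_v < g[v]) {
                g[v] = new_g_v;
                f[v] = new_g_v + h[v];
                // No priority update: push a fresh record and let the
                // older, worse record be discarded when it is popped.
                open_set.push(vertex_rec(v, f[v]));
            }
        }
    }
    return g[dst];
}
```

Note that this variant keeps running until the queue is empty instead of stopping at the first removal of the destination, which mirrors how the parallel version terminates.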

Below, we show an excerpt of our *shortpath* example, an implementation of the parallel version of this algorithm with Intel® TBB. As you can see, the parallel version makes use of a concurrent priority queue for *open_set*, since the calls to *push* and *try_pop* may happen concurrently from different threads. Reads and writes on the statuses of vertices are handled in critical sections controlled by a `tbb::spin_mutex`. This is a particularly good use of *spin_mutex* because each lock protects a single vertex in the graph, so there are potentially many locks but each has light contention. Thus we expect to get the lock quickly, so spinning is far more efficient than putting the thread to sleep.

The algorithm also makes use of tasking, via a task group, to trigger the exploration of other vertices on additional threads. A given task spawns just enough new tasks to handle the additions to *open_set*, limiting the number of spawned tasks to *max_spawn*, which is a function of the graph size. We limit the number of spawned tasks because it is more efficient for a task to keep looping and handling available work than it is to create new tasks, once the number of tasks has reached the available parallelism. However, since we do not know the available parallelism, we simply specify a *grainsize* in terms of number of vertices for each task to handle and divide the total number of vertices by that *grainsize* to get *max_spawn*. This *grainsize* can then be adjusted. Decreasing it creates more tasks so full use can be made of available parallelism; increasing it creates fewer tasks, with less task creation overhead. If the available parallelism is known in advance, the *grainsize* can be adjusted to create exactly as many tasks as there are threads. The number of spawned tasks is maintained in a `tbb::atomic` variable *num_spawn*.

This algorithm also works with a plain concurrent queue, since it can potentially examine all possibilities exhaustively, but using a concurrent priority queue is more efficient because better paths are explored earlier, causing more suboptimal paths to be pruned.

`struct point {`

`    double x, y;`

`    point() {}`

`    point(double _x, double _y) : x(_x), y(_y) {}`

`    point(const point& p) : x(p.x), y(p.y) {}`

`};`

`typedef vector < point > point_set;`

`typedef size_t vertex_id;`

`typedef std::pair < vertex_id, double > vertex_rec;`

`typedef vector < vector < vertex_id > > edge_set;`

`size_t N; // number of vertices`

`size_t src; // start of path`

`size_t dst; // end of path`

`const double INF=100000.0; // infinity, used to init f and g`

`size_t grainsize; // number of vertices per task on average`

`size_t max_spawn; // max tasks to spawn`

`atomic < size_t > num_spawn; // number of active tasks`

`point_set vertices; // vertices`

`edge_set edges; // edges`

`vector < vertex_id > predecessor; // for recreating path from src to dst`

`vector < double > f_distance; // estimated distances at particular vertex; init to INF`

`vector < double > g_distance; // current shortest distances from src vertex; init to INF`

`vector < spin_mutex > locks; // a lock for each vertex`

`task_group *sp_group; // task group for tasks executing sub-problems`

`class compare_f {`

`public:`

`    // inverted comparison: the record with the smallest f has the highest priority`

`    bool operator()(const vertex_rec& u, const vertex_rec& v) const {`

`        return u.second>v.second;`

`    }`

`};`

`concurrent_priority_queue < vertex_rec, compare_f > open_set; // tentative vertices`

`void shortpath() {`

`    g_distance[src] = 0.0;`

`    f_distance[src] = get_distance(vertices[src], vertices[dst]);`

`    open_set.push(make_pair(src,f_distance[src]));`

`    sp_group->run([]{ shortpath_helper(); });`

`    sp_group->wait();`

`}`

`void shortpath_helper() {`

`    vertex_rec u_rec;`

`    while (open_set.try_pop(u_rec)) {`

`        vertex_id u = u_rec.first;`

`        if (u==dst) continue;`

`        double f = u_rec.second;`

`        double old_g_u;`

`        {`

`            spin_mutex::scoped_lock l(locks[u]);`

`            if (f > f_distance[u]) continue; // prune search space`

`            old_g_u = g_distance[u];`

`        }`

`        for (size_t i=0; i < edges[u].size(); ++i) {`

`            vertex_id v = edges[u][i];`

`            double new_g_v = old_g_u + get_distance(vertices[u], vertices[v]);`

`            double new_f_v;`

`            // "push" lets us move some work out of the critical section below`

`            bool push = false;`

`            {`

`                spin_mutex::scoped_lock l(locks[v]);`

`                if (new_g_v < g_distance[v]) {`

`                    predecessor[v] = u;`

`                    g_distance[v] = new_g_v;`

`                    new_f_v = f_distance[v] = g_distance[v] +`

`                        get_distance(vertices[v], vertices[dst]);`

`                    push = true;`

`                }`

`            }`

`            if (push) {`

`                open_set.push(make_pair(v,new_f_v));`

`                size_t n_spawn = ++num_spawn;`

`                if (n_spawn < max_spawn) {`

`                    sp_group->run([]{ shortpath_helper(); });`

`                }`

`                else --num_spawn;`

`            }`

`        }`

`    }`

`    --num_spawn;`

`}`
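The task-throttling pattern used with *num_spawn* can be looked at in isolation. The following is a minimal sketch, assuming `std::atomic` as a stand-in for `tbb::atomic`; the helper names `compute_max_spawn`, `try_reserve_task`, and `release_task` are hypothetical and do not appear in the example.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical helper: the task limit is the vertex count divided by grainsize.
std::size_t compute_max_spawn(std::size_t num_vertices, std::size_t grainsize) {
    assert(grainsize > 0);
    return num_vertices / grainsize;
}

// Optimistically claim a task slot, then undo the claim if the limit was hit.
// Mirrors the "++num_spawn; if (n_spawn < max_spawn) ... else --num_spawn"
// sequence in shortpath_helper.
bool try_reserve_task(std::atomic<std::size_t>& num_spawn, std::size_t max_spawn) {
    std::size_t n_spawn = ++num_spawn;  // atomic increment, returns the new value
    if (n_spawn < max_spawn) return true;
    --num_spawn;                        // over the limit: give the slot back
    return false;
}

// Called when a task finishes, like the final --num_spawn in shortpath_helper.
void release_task(std::atomic<std::size_t>& num_spawn) {
    --num_spawn;
}
```

Incrementing first and rolling back on failure avoids a separate check-then-increment step, which two threads could otherwise interleave. The cost is that a thread may occasionally be refused a slot while another thread's rollback is still pending, which is harmless here because the refused task simply keeps processing the queue itself.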

## 4 comments

Terry Wilmarth (Intel) said on Aug 24,2011

Ah, there is the comment that disappeared into the ether. Yes, I had to type it all over again from memory. So you get 2 answers instead of 1 :)

Terry Wilmarth (Intel) said on Aug 24,2011

Hi Jose,

I agree, if you are not familiar with TBB, the code excerpts can be very confusing. Unfortunately, the full code is perhaps a bit too long for a blog post.

First, let me show you where you can get the code. Go to this page: http://threadingbuildingblocks.org/file.php?fid=77

Download the latest version (3.0 update 8) and you will find there an examples folder, and in that a concurrent_priority_queue folder which contains the shortpath example. The code is C++.

Second, let me describe a bit of what is happening here. The Intel(R) TBB scheduler will generate the threads for you. By default, it generates a number determined by the hardware configuration, but you can control this with the task_scheduler_init construct. The main function (in the downloaded code) illustrates the use of task_scheduler_init. The next thing to notice in the code above is that we are using the task_group construct. Above, we have a task_group called sp_group. Whenever we call sp_group->run, a task is created and given to the scheduler. The scheduler gives the tasks to threads for execution. As you can see, sp_group->run will generate many tasks inside the loops of the shortpath_helper function above. sp_group->wait will wait until all the tasks created on sp_group have completed execution.

For more information on Intel(R) TBB, the task scheduler and task_groups, please check out the Documentation tab on threadingbuildingblocks.org. Also, we have a forum for TBB that many TBB users and TBB developers read, and if you have any questions, that is the quickest way to get answers: http://software.intel.com/en-us/forums/intel-threading-building-blocks/

I hope that helps to answer your questions!

Cheers!

Terry

Anonymous said on Aug 24,2011

Hi Jose,

I agree, if you are not familiar with TBB, it does look confusing. The full example has a lot more code, too much to post in a blog.

First, let me show you where you can get the full code: the most recent version (update 8) contains the shortpath example, and you can download it from here:

http://threadingbuildingblocks.org/file.php?fid=77

After you download it, look in the examples folder for a concurrent_priority_queue folder. In there, you will find the shortpath example.

But let me also explain what to look for in the code (and yes, it is C++). The code above uses the task_group construct from TBB. The task_group above is called sp_group. Every time you see sp_group->run in the code, a task is being created. The TBB scheduler gives out the tasks to threads. When we call sp_group->wait, we wait for all the tasks that were created on sp_group to complete (and as you can see in shortpath_helper, the code may spawn many of them in those loops).

The threads are generated automatically by TBB. By default, the scheduler gives you a number of threads determined by the hardware configuration, but you can use the task_scheduler_init construct to specify a particular number of threads. The main function (in your downloaded version) for this code shows how that is used.

For more information on Intel(R) TBB and the task scheduler, as well as task_groups, check out the documentation tab on threadingbuildingblocks.org. Also, there is a forum for TBB here:

http://software.intel.com/en-us/forums/intel-threading-building-blocks/

A large number of people who use TBB as well as TBB developers read that, so any questions you have should get answered quickly.

Cheers!

Terry

jose-jesus-ambriz-meza said on Aug 23,2011

Sorry for this question, but I don't know "Intel® Threading Building Blocks (Intel® TBB)". I saw a class definition and a mutex area (Spin_mutex), but when are the threads generated? Is this code in C++?
