# Using the new concurrent priority queue in Intel® Threading Building Blocks

Por Terry Wilmarth, publicado el 1 de agosto de 2011

We have added a new example that illustrates the use of the concurrent priority queue in Intel® Threading Building Blocks (Intel® TBB). For details on the concurrent priority queue, see this earlier blog post.

For our example, we provide a simple parallelization of the *shortest path* problem from graph theory. This problem finds a minimally weighted path between two points. For example, consider locations on a road map. These can be represented by *vertices* in a graph. Some of the locations are directly connected by roads. These roads can be represented by *edges* in our graph. The distance along the road that directly connects two locations is an edge *weight* that can be represented by a function. A path from one location *v1* to another location *v2* is thus any sequence of edges that one can traverse to get from *v1* to *v2*. The weight of that path is the sum of the weights on the edges that comprise the path. Thus, finding the shortest path in the road map scenario tells us the shortest way to get from one location to another (without off-roading).*A** is a serial heuristic search algorithm that can be used to solve the shortest path problem. A* uses a heuristic function *f:V*->**R** (where **R** is the domain of possible edge weights) to determine the order in which to visit the vertices of the graph. This function is the sum of two other functions: *g:V*->**R** and *h:V*->**R**. *g* is the cost of the path from the source vertex to the input vertex. *h* is an admissible heuristic estimate (i.e. it must not be an overestimate) of the weight of the path from the input vertex to the goal vertex. For example, in the road map scenario where we are concerned with distance, an admissible heuristic would be the direct distance between two vertices (as if they were directly connected by a perfectly straight road). The *g* and *f* functions are derived as the algorithm proceeds.

A priority queue called *open_set* is used to sort the vertices by *f*. To find the shortest path from a source vertex to a destination vertex, the source vertex is first placed in *open_set* and its *g* and *f* values are initialized (see the code below). Then in a loop, we remove a vertex *u* from the *open_set* to explore. Keep in mind that the *open_set* is a priority queue, so each time we remove a vertex, that vertex has the optimal *f* value encountered so far. If *u* is the destination vertex, then we are done (and we can reconstruct the path from the predecessor array we calculated). Otherwise, we place *u* in the closed set (to indicate that it has already been visited) and then examine each of its adjacent vertices. For a given vertex *v*, if *v* has already been explored (i.e. taken from the *open_set* and placed in the closed set) there is no need to revisit it. Otherwise, we calculate a new *g* value for *v* based on the fact that we are arriving at *v* from *u*. If the new *g* value is better than the old value, we update *v*’s predecessor to *u*, its *g* value to the new *g* value, and recalculate its *f* value given the new *g* value. If *v* was previously unexplored, we add it to *open_set*. If the algorithm exits the while loop without reaching the destination vertex, then there is no path from the source to the destination. `bool shortpath(source, dest)`

` open_set.push(source)`

` foreach v in V `

` g[v] := inf`

` f[v] := 0`

` closed[v] := false`

` end foreach`

` g[source] := 0.0 `

` f[source] := h(source) `

` while (!open_set.empty())`

` u := open_set.pop()`

` if (u = dest) return true`

` closed[u] := true`

` foreach v in edges[u] `

` if (closed[v]) continue`

` new_g_v := g[u] + edge_weight(u, v)`

` if (new_g_v < g[v])`

` f[v] := new_g_v + h(v)`

` if (g[v] = inf) open_set.push(v)`

` predecessor[v] := u`

` g[v] := new_g_v`

` end if`

` end foreach`

` end while`

` return false`

`end shortpath`

Note that for the optimal performance of this algorithm, the priority queue must allow for dynamic update of the priority values (here, *f*) of its contents, since the priority of an element may change after it has been placed in the queue.

To parallelize this algorithm, we make a few minor changes. First, we allow multiple threads to remove vertices from the *open_set* and explore them concurrently. Since our concurrent priority queue does not support the expensive priority update operation, we simply allow duplicate insertions of the same vertex, but with different priority values. When lower priority vertices are re-explored, we check if they have a less optimal *f* value than what is currently recorded, and if so, they are not explored. This approach removes the need for keeping track of what vertices have been visited via a closed set.

Below, we show an excerpt of our *shortpath* example, an implementation of the parallel version of this algorithm with Intel® TBB. As you can see, the parallel version makes use of a concurrent priority queue for *open_set*, since the calls to *push* and *try_pop* may happen concurrently from different threads. Reads and writes on the statuses of vertices are handled in critical sections controlled by a `tbb::spin_mutex`

. This is a particularly good use of *spin_mutex* because the lock is restricted to a single vertex in the graph, so there are potentially many locks but each has light contention. Thus we expect to get the lock quickly, so spinning is far more efficient than putting the thread to sleep. The algorithm also makes use of tasking, via a task group, to trigger the exploration of other vertices on additional threads. A given task spawns just enough new tasks to handle the additions to *open_set*, limiting the number of spawned tasks to *max_spawn*, which is a function of the graph size. We limit the number of spawned tasks because it is more efficient for a task to keep looping and handling available work than it is to create new tasks, once the number of tasks has reached the available parallelism. However, since we do not know the available parallelism, we simply specify a *grainsize* in terms of number of vertices for each task to handle and divide the total number of vertices by that *grainsize* to get *max_spawn*. This *grainsize* can then be adjusted. Decreasing it creates more tasks so full use can be made of available parallelism; increasing it creates fewer tasks, but has less task creation overhead. If the available parallelism is known in advance, it can be adjusted to create exactly as many tasks as there are threads. The number of spawned tasks is maintained in a `tbb::atomic`

variable *num_spawn*. This algorithm can also be used with a concurrent queue, since it can potentially be used to exhaustively examine all possibilities, but using a concurrent priority queue is more efficient because better paths are explored earlier, causing more suboptimal paths to be pruned. `struct point {`

` double x, y;`

` point() {}`

` point(double _x, double _y) : x(_x), y(_y) {}`

` point(const point& p) : x(p.x), y(p.y) {} `

`};`

`typedef vector < point > point_set;`

`typedef size_t vertex_id;`

`typedef std::pair < vertex_id, double > vertex_rec;`

`typedef vector < vector < vertex_id > > edge_set;`

`size_t N; // number of vertices`

`size_t src; // start of path`

`size_t dst; // end of path`

`const double INF=100000.0; // infinity, used to init f and g`

`size_t grainsize; // number of vertices per task on average`

`size_t max_spawn; // max tasks to spawn`

`atomic < size_t > num_spawn; // number of active tasks`

`point_set vertices; // vertices`

`edge_set edges; // edges`

`vector < vertex_id > predecessor; // for recreating path from src to dst`

`vector < double > f_distance; // estimated distances at particular vertex; init to INF`

`vector < double > g_distance; // current shortest distances from src vertex; init to INF`

`vector < spin_mutex > locks; // a lock for each vertex`

`task_group *sp_group; // task group for tasks executing sub-problems`

`class compare_f {`

`public:`

` bool operator()(const vertex_rec& u, const vertex_rec& v) const {`

` return u.second>v.second;`

` }`

`};`

`concurrent_priority_queue < vertex_rec, compare_f > open_set; // tentative vertices`

`void shortpath() {`

` g_distance[src] = 0.0; `

` f_distance[src] = get_distance(vertices[src], vertices[dst]);`

` open_set.push(make_pair(src,f_distance[src])); `

` sp_group->run([]{ shortpath_helper(); });`

` sp_group->wait();`

`}`

`void shortpath_helper() {`

` vertex_rec u_rec;`

` while (open_set.try_pop(u_rec)) {`

` vertex_id u = u_rec.first;`

` if (u==dst) continue;`

` double f = u_rec.second;`

` double old_g_u;`

` {`

` Spin_mutex::scoped_lock l(locks[u]);`

` if (f > f_distance[u]) continue; // prune search space`

` old_g_u = g_distance[u];`

` }`

` for (size_t i=0; i < edges[u].size(); ++i) {`

` vertex_id v = edges[u][i];`

` double new_g_v = old_g_u + get_distance(vertices[u], vertices[v]);`

` double new_f_v;`

` // “push” lets us move some work out of the critical section below`

` bool push = false;`

` {`

` spin_mutex::scoped_lock l(locks[v]);`

` if (new_g_v < g_distance[v]) {`

` predecessor[v] = u;`

` g_distance[v] = new_g_v;`

` new_f_v = f_distance[v] = g_distance[v] + `

` get_distance(vertices[v], vertices[dst]);`

` push = true;`

` }`

` }`

` if (push) {`

` open_set.push(make_pair(v,new_f_v));`

` size_t n_spawn = ++num_spawn;`

` if (n_spawn < max_spawn) {`

` sp_group->run([]{ shortpath_helper(); }); `

` }`

` else –-num_spawn;`

` }`

` } `

` }`

` --num_spawn;`

`}`