# Using the new concurrent priority queue in Intel® Threading Building Blocks

We have added a new example that illustrates the use of the concurrent priority queue in Intel® Threading Building Blocks (Intel® TBB). For details on the concurrent priority queue, see this earlier blog post.

For our example, we provide a simple parallelization of the shortest path problem from graph theory: finding a minimally weighted path between two points. For example, consider locations on a road map. These can be represented by vertices in a graph. Locations that are directly connected by roads correspond to edges between their vertices, and the distance along the road that directly connects two locations is the weight of the corresponding edge. A path from one location v1 to another location v2 is thus any sequence of edges that one can traverse to get from v1 to v2, and the weight of that path is the sum of the weights of the edges that comprise it. Finding the shortest path in the road map scenario therefore tells us the shortest way to get from one location to another (without off-roading).

A* is a serial heuristic search algorithm that can be used to solve the shortest path problem. A* uses a heuristic function f:V->R (where R is the set of possible path weights) to determine the order in which to visit the vertices of the graph. This function is the sum of two other functions, g:V->R and h:V->R. g(v) is the cost of the best path found so far from the source vertex to v. h(v) is an admissible heuristic estimate (i.e. it must never be an overestimate) of the weight of the path from v to the goal vertex. For example, in the road map scenario where we are concerned with distance, an admissible heuristic would be the direct distance between two vertices (as if they were directly connected by a perfectly straight road). The h function is given as input, while the g and f values are computed as the algorithm proceeds.

A priority queue called open_set is used to sort the vertices by f. To find the shortest path from a source vertex to a destination vertex, the source vertex is first placed in open_set and its g and f values are initialized (see the code below). Then in a loop, we remove a vertex u from the open_set to explore. Keep in mind that the open_set is a priority queue, so each time we remove a vertex, that vertex has the optimal f value encountered so far. If u is the destination vertex, then we are done (and we can reconstruct the path from the predecessor array we calculated). Otherwise, we place u in the closed set (to indicate that it has already been visited) and then examine each of its adjacent vertices. For a given vertex v, if v has already been explored (i.e. taken from the open_set and placed in the closed set) there is no need to revisit it. Otherwise, we calculate a new g value for v based on the fact that we are arriving at v from u. If the new g value is better than the old value, we update v’s predecessor to u, its g value to the new g value, and recalculate its f value given the new g value. If v was previously unexplored, we add it to open_set. If the algorithm exits the while loop without reaching the destination vertex, then there is no path from the source to the destination.

```
bool shortpath(source, dest)
    foreach v in V
        g[v] := inf
        f[v] := 0
        closed[v] := false
    end foreach
    g[source] := 0.0
    f[source] := h(source)
    open_set.push(source)
    while (!open_set.empty())
        u := open_set.pop()
        if (u = dest) return true
        closed[u] := true
        foreach v in edges[u]
            if (closed[v]) continue
            new_g_v := g[u] + edge_weight(u, v)
            if (new_g_v < g[v])
                f[v] := new_g_v + h(v)
                if (g[v] = inf) open_set.push(v)
                predecessor[v] := u
                g[v] := new_g_v
            end if
        end foreach
    end while
    return false
end shortpath
```

Note that for the optimal performance of this algorithm, the priority queue must allow for dynamic update of the priority values (here, f) of its contents, since the priority of an element may change after it has been placed in the queue.

To parallelize this algorithm, we make a few minor changes. First, we allow multiple threads to remove vertices from the open_set and explore them concurrently. Since our concurrent priority queue does not support the expensive priority update operation, we simply allow the same vertex to be inserted multiple times with different priority values. When a vertex is popped, we check whether its f value is worse than the best value currently recorded for that vertex; if so, the entry is stale and the vertex is not explored. This approach also removes the need to track visited vertices in a closed set.

```cpp
#include <cmath>
#include <utility>
#include <vector>
#include "tbb/atomic.h"
#include "tbb/concurrent_priority_queue.h"
#include "tbb/spin_mutex.h"
#include "tbb/task_group.h"

using namespace std;
using namespace tbb;

struct point {
    double x, y;
    point() {}
    point(double _x, double _y) : x(_x), y(_y) {}
    point(const point& p) : x(p.x), y(p.y) {}
};

typedef vector<point> point_set;
typedef size_t vertex_id;
typedef std::pair<vertex_id, double> vertex_rec;
typedef vector<vector<vertex_id> > edge_set;

size_t N;                    // number of vertices
size_t src;                  // start of path
size_t dst;                  // end of path
const double INF = 100000.0; // infinity, used to init f and g
size_t grainsize;            // number of vertices per task on average
size_t max_spawn;            // max tasks to spawn
atomic<size_t> num_spawn;    // number of active tasks
task_group* sp_group;        // task group for shortpath tasks (created elsewhere)

point_set vertices;            // vertices
edge_set edges;                // edges
vector<vertex_id> predecessor; // for recreating path from src to dst

vector<double> f_distance; // estimated distances at particular vertex; init to INF
vector<double> g_distance; // current shortest distances from src vertex; init to INF
vector<spin_mutex> locks;  // a lock for each vertex

double get_distance(const point& p1, const point& p2) {
    double xdiff = p1.x - p2.x, ydiff = p1.y - p2.y;
    return sqrt(xdiff * xdiff + ydiff * ydiff);
}

class compare_f {
public:
    bool operator()(const vertex_rec& u, const vertex_rec& v) const {
        return u.second > v.second;
    }
};

concurrent_priority_queue<vertex_rec, compare_f> open_set; // tentative vertices

void shortpath_helper();

void shortpath() {
    g_distance[src] = 0.0;
    f_distance[src] = get_distance(vertices[src], vertices[dst]);
    open_set.push(make_pair(src, f_distance[src]));
    sp_group->run([]{ shortpath_helper(); });
    sp_group->wait();
}

void shortpath_helper() {
    vertex_rec u_rec;
    while (open_set.try_pop(u_rec)) {
        vertex_id u = u_rec.first;
        if (u == dst) continue;
        double f = u_rec.second;
        double old_g_u;
        {
            spin_mutex::scoped_lock l(locks[u]);
            if (f > f_distance[u]) continue; // prune search space
            old_g_u = g_distance[u];
        }
        for (size_t i = 0; i < edges[u].size(); ++i) {
            vertex_id v = edges[u][i];
            double new_g_v = old_g_u + get_distance(vertices[u], vertices[v]);
            double new_f_v;
            // "push" lets us move some work out of the critical section below
            bool push = false;
            {
                spin_mutex::scoped_lock l(locks[v]);
                if (new_g_v < g_distance[v]) {
                    predecessor[v] = u;
                    g_distance[v] = new_g_v;
                    new_f_v = f_distance[v] = g_distance[v] +
                        get_distance(vertices[v], vertices[dst]);
                    push = true;
                }
            }
            if (push) {
                open_set.push(make_pair(v, new_f_v));
                size_t n_spawn = ++num_spawn;
                if (n_spawn < max_spawn) {
                    sp_group->run([]{ shortpath_helper(); });
                }
                else --num_spawn;
            }
        }
    }
    --num_spawn;
}
```

For more complete information about compiler optimizations, see our Optimization Notice.