Using the new concurrent priority queue in Intel® Threading Building Blocks

We have added a new example that illustrates the use of the concurrent priority queue in Intel® Threading Building Blocks (Intel® TBB). For details on the concurrent priority queue, see this earlier blog post.

For our example, we provide a simple parallelization of the shortest path problem from graph theory. This problem finds a minimally weighted path between two points. For example, consider locations on a road map. These can be represented by vertices in a graph. Some of the locations are directly connected by roads. These roads can be represented by edges in our graph. The distance along the road that directly connects two locations is an edge weight that can be represented by a function. A path from one location v1 to another location v2 is thus any sequence of edges that one can traverse to get from v1 to v2. The weight of that path is the sum of the weights on the edges that comprise the path. Thus, finding the shortest path in the road map scenario tells us the shortest way to get from one location to another (without off-roading).

A* is a serial heuristic search algorithm that can be used to solve the shortest path problem. A* uses a heuristic function f:V->R (where R is the domain of possible edge weights) to determine the order in which to visit the vertices of the graph. This function is the sum of two other functions: g:V->R and h:V->R. g is the cost of the path from the source vertex to the input vertex. h is an admissible heuristic estimate (i.e. it must not be an overestimate) of the weight of the path from the input vertex to the goal vertex. For example, in the road map scenario where we are concerned with distance, an admissible heuristic would be the direct distance between two vertices (as if they were directly connected by a perfectly straight road). The g and f functions are derived as the algorithm proceeds.

A priority queue called open_set is used to sort the vertices by f. To find the shortest path from a source vertex to a destination vertex, the source vertex is first placed in open_set and its g and f values are initialized (see the code below). Then in a loop, we remove a vertex u from the open_set to explore. Keep in mind that the open_set is a priority queue, so each time we remove a vertex, that vertex has the optimal f value encountered so far. If u is the destination vertex, then we are done (and we can reconstruct the path from the predecessor array we calculated). Otherwise, we place u in the closed set (to indicate that it has already been visited) and then examine each of its adjacent vertices. For a given vertex v, if v has already been explored (i.e. taken from the open_set and placed in the closed set) there is no need to revisit it. Otherwise, we calculate a new g value for v based on the fact that we are arriving at v from u. If the new g value is better than the old value, we update v’s predecessor to u, its g value to the new g value, and recalculate its f value given the new g value. If v was previously unexplored, we add it to open_set. If the algorithm exits the while loop without reaching the destination vertex, then there is no path from the source to the destination.

bool shortpath(source, dest)
open_set.push(source)
foreach v in V
g[v] := inf
f[v] := 0
closed[v] := false
end foreach
g[source] := 0.0
f[source] := h(source)
while (!open_set.empty())
u := open_set.pop()
if (u = dest) return true
closed[u] := true
foreach v in edges[u]
if (closed[v]) continue
new_g_v := g[u] + edge_weight(u, v)
if (new_g_v < g[v])
f[v] := new_g_v + h(v)
if (g[v] = inf) open_set.push(v)
predecessor[v] := u
g[v] := new_g_v
end if
end foreach
end while
return false
end shortpath

Note that for the optimal performance of this algorithm, the priority queue must allow for dynamic update of the priority values (here, f) of its contents, since the priority of an element may change after it has been placed in the queue.

To parallelize this algorithm, we make a few minor changes. First, we allow multiple threads to remove vertices from the open_set and explore them concurrently. Since our concurrent priority queue does not support the expensive priority update operation, we simply allow duplicate insertions of the same vertex, but with different priority values. When lower priority vertices are re-explored, we check if they have a less optimal f value than what is currently recorded, and if so, they are not explored. This approach removes the need for keeping track of what vertices have been visited via a closed set.

Below, we show an excerpt of our shortpath example, an implementation of the parallel version of this algorithm with Intel® TBB. As you can see, the parallel version makes use of a concurrent priority queue for open_set, since the calls to push and try_pop may happen concurrently from different threads. Reads and writes on the statuses of vertices are handled in critical sections controlled by a tbb::spin_mutex. This is a particularly good use of spin_mutex because the lock is restricted to a single vertex in the graph, so there are potentially many locks but each has light contention. Thus we expect to get the lock quickly, so spinning is far more efficient than putting the thread to sleep. The algorithm also makes use of tasking, via a task group, to trigger the exploration of other vertices on additional threads. A given task spawns just enough new tasks to handle the additions to open_set, limiting the number of spawned tasks to max_spawn, which is a function of the graph size. We limit the number of spawned tasks because it is more efficient for a task to keep looping and handling available work than it is to create new tasks, once the number of tasks has reached the available parallelism. However, since we do not know the available parallelism, we simply specify a grainsize in terms of number of vertices for each task to handle and divide the total number of vertices by that grainsize to get max_spawn. This grainsize can then be adjusted. Decreasing it creates more tasks so full use can be made of available parallelism; increasing it creates fewer tasks, but has less task creation overhead. If the available parallelism is known in advance, it can be adjusted to create exactly as many tasks as there are threads. The number of spawned tasks is maintained in a tbb::atomic variable num_spawn. This algorithm can also be used with a concurrent queue, since it can potentially be used to exhaustively examine all possibilities, but using a concurrent priority queue is more efficient because better paths are explored earlier, causing more suboptimal paths to be pruned.



struct point {
double x, y;
point() {}
point(double _x, double _y) : x(_x), y(_y) {}
point(const point& p) : x(p.x), y(p.y) {}
};
typedef vector < point > point_set;
typedef size_t vertex_id;
typedef std::pair < vertex_id, double > vertex_rec;
typedef vector < vector < vertex_id > > edge_set;

size_t N; // number of vertices
size_t src; // start of path
size_t dst; // end of path
const double INF=100000.0; // infinity, used to init f and g
size_t grainsize; // number of vertices per task on average
size_t max_spawn; // max tasks to spawn
atomic < size_t > num_spawn; // number of active tasks

point_set vertices; // vertices
edge_set edges; // edges
vector < vertex_id > predecessor; // for recreating path from src to dst

vector < double > f_distance; // estimated distances at particular vertex; init to INF
vector < double > g_distance; // current shortest distances from src vertex; init to INF
vector < spin_mutex > locks; // a lock for each vertex
task_group *sp_group; // task group for tasks executing sub-problems

class compare_f {
public:
bool operator()(const vertex_rec& u, const vertex_rec& v) const {
return u.second>v.second;
}
};

concurrent_priority_queue < vertex_rec, compare_f > open_set; // tentative vertices

void shortpath() {
g_distance[src] = 0.0;
f_distance[src] = get_distance(vertices[src], vertices[dst]);
open_set.push(make_pair(src,f_distance[src]));
sp_group->run([]{ shortpath_helper(); });
sp_group->wait();
}

void shortpath_helper() {
vertex_rec u_rec;
while (open_set.try_pop(u_rec)) {
vertex_id u = u_rec.first;
if (u==dst) continue;
double f = u_rec.second;
double old_g_u;
{
Spin_mutex::scoped_lock l(locks[u]);
if (f > f_distance[u]) continue; // prune search space
old_g_u = g_distance[u];
}
for (size_t i=0; i < edges[u].size(); ++i) {
vertex_id v = edges[u][i];
double new_g_v = old_g_u + get_distance(vertices[u], vertices[v]);
double new_f_v;
// “push” lets us move some work out of the critical section below
bool push = false;
{
spin_mutex::scoped_lock l(locks[v]);
if (new_g_v < g_distance[v]) {
predecessor[v] = u;
g_distance[v] = new_g_v;
new_f_v = f_distance[v] = g_distance[v] +
get_distance(vertices[v], vertices[dst]);
push = true;
}
}
if (push) {
open_set.push(make_pair(v,new_f_v));
size_t n_spawn = ++num_spawn;
if (n_spawn < max_spawn) {
sp_group->run([]{ shortpath_helper(); });
}
else –-num_spawn;
}
}
}
--num_spawn;
}

Contrassegni:
Per informazioni complete sulle ottimizzazioni del compilatore, consultare l'Avviso sull'ottimizzazione