Introduction
Example that uses parallel_while to do parallel preorder traversal of a sparse graph.
Each vertex in the graph is called a "cell". Each cell has a value. The value is a matrix. Some of the cells have operators that compute the cell's value, using other cell's values as input. A cell that uses the value of cell x is called a successor of x.
The algorithm works as follows.
- Compute the set of cells that have no inputs. This set is called root_set.
- Each cell has an associated field ref_count that is an atomic integer. Initialize ref_count to the number of inputs for the Cell.
- Update each cell in root_set, by applying a parallel_while to a stream that iterates over root_set
- After updating a cell, for each of its successors atomically decrement the successor's ref_count
- If the count became zero, add the cell to the set of cells to be updated, by calling parallel_while::add.
- The times printed are for the traversal and update, and do not include time for computing the root_set.
NOTE: It is important to understand that this example is unlikely to show speedup if the cell values are changed to type "float". The reason is twofold.
- The smaller value type causes each Cell to be significantly smaller than a cache line, which leads to false sharing conflicts.
- The time to update the cells becomes very small, and consequently the overhead of parallel_while swamps the useful work
Graph.h
==============================================================================
Copyright 2005-2006 Intel Corporation. All Rights Reserved.
==============================================================================
#include "Matrix.h"
#include "tbb/atomic.h"
#include "vector"
namespace TBB = tbb;
enum OpKind {
// Use Cell's value
OP_VALUE,
// Unary negation
OP_NEGATE,
// Addition
OP_ADD,
// Subtraction
OP_SUB,
// Multiplication
OP_MUL
};
static const int ArityOfOp[] = {0,1,2,2,2};
class Cell {
public:
//! Operation for this cell
OpKind op;
//! Inputs to this cell
Cell* input[2];
//! Type of value stored in a Cell
typedef Matrix value_type;
//! Value associated with this Cell
value_type value;
//! Set of cells that use this Cell as an input
std::vector successor;
//! Reference count of number of inputs that are not yet updated.
TBB::atomic ref_count;
//! Update the Cell's value.
void update();
//! Default construtor
Cell() {}
};
//! A directed graph where the vertices are Cells.
class Graph {
std::vector my_vertex_set;
public:
//! Create a random acyclic directed graph
void create_random_dag( size_t number_of_nodes );
//! Print the graph
void print();
//! Get set of cells that have no inputs.
void get_root_set( std::vector& root_set );
};
|
Graph.cpp
==============================================================================
Copyright 2005-2006 Intel Corporation. All Rights Reserved.
==============================================================================
#include "cstdio"
#include "Graph.h"
void Graph::create_random_dag( size_t number_of_nodes ) {
my_vertex_set.resize(number_of_nodes);
for( size_t k=0; k<number_of_nodes; ++k ) {
Cell& c = my_vertex_set[k];
int op = int((rand()>>8)%5u); if( op>int(k) ) op = int(k);
switch( op ) {
default:
c.op = OP_VALUE;
c.value = Cell::value_type((float)k);
break;
case 1:
c.op = OP_NEGATE;
break;
case 2:
c.op = OP_SUB;
break;
case 3:
c.op = D;
break;
case 4:
c.op = OP_MUL;
break;
}
for( int j=0; j
Cell& input = my_vertex_set[rand()%k];
c.input[j] = &input;
}
}
}
void Graph::print() {
for( size_t k=0; k
printf("Cell %d:",int(k));
for( size_t j=0; j
printf(" %d",int(my_vertex_set[k].successor[j] - &my_vertex_set[0]));
printf("n");
}
}
void Graph::get_root_set( std::vector& root_set ) {
for( size_t k=0; k
my_vertex_set[k].successor.clear();
}
root_set.clear();
for( size_t k=0; k
Cell& c = my_vertex_set[k];
c.ref_count = ArityOfOp[c.op];
for( int j=0; j
c.input[j]->successor.push_back(&c);
}
if( ArityOfOp[c.op]==0 )
root_set.push_back(&my_vertex_set[k]);
}
}
void Cell::update() {
switch( op ) {
case OP_VALUE:
break;
case OP_NEGATE:
value = -(input[0]->value);
break;
case OP_ADD:
value = input[0]->value + input[1]->value;
break;
case OP_SUB:
value = input[0]->value - input[1]->value;
break;
case OP_MUL:
value = input[0]->value * input[1]->value;
break;
}
}
|
Matrix.h
==============================================================================
Copyright 2005-2006 Intel Corporation. All Rights Reserved.
==============================================================================
class Matrix {
static const int n = 10;
float array[n][n];
public:
Matrix() {}
Matrix( float z ) {
for( int i=0; i
for( int j=0; j
array[i][j] = i==j ? z : 0;
}
friend Matrix operator-( const Matrix& x ) {
Matrix result;
for( int i=0; i
for( int j=0; j
result.array[i][j] = -x.array[i][j];
return result;
}
friend Matrix operator+( const Matrix& x, const Matrix& y ) {
Matrix result;
for( int i=0; i
for( int j=0; j
result.array[i][j] = x.array[i][j] + y.array[i][j];
return result;
}
friend Matrix operator-( const Matrix& x, const Matrix& y ) {
Matrix result;
for( int i=0; i
for( int j=0; j
result.array[i][j] = x.array[i][j] - y.array[i][j];
return result;
}
friend Matrix operator*( const Matrix& x, const Matrix& y ) {
Matrix result(0);
for( int i=0; i
for( int k=0; k
for( int j=0; j
result.array[i][j] += x.array[i][k] * y.array[k][j];
return result;
}
};
|
parallel_preorder.cpp
==============================================================================
Copyright 2005-2006 Intel Corporation. All Rights Reserved.
==============================================================================
Example program that shows how to use parallel_while to do parallel preorder
traversal of a directed acyclic graph. */
#include "tbb/parallel_while.h"
#include "tbb/atomic.h"
#include "vector"
#include "algorithm"
#include "Graph.h"
class Body {
tbb::parallel_while& my_while;
public:
Body( tbb::parallel_while& w ) : my_while(w) {};
//------------------------------------------------------------------------
// Following signatures required by parallel_while
//------------------------------------------------------- -----------------
typedef Cell* argument_type;
void operator()( Cell* c ) const {
c->update();
// Restore ref_count in preparation for subsequent traversal.
c->ref_count = ArityOfOp[c->op];
for( size_t k=0; ksuccessor.size(); ++k ) {
Cell* successor = c->successor[k];
if( 0 == --(successor->ref_count) ) {
my_while.add( successor );
}
}
}
};
class Stream {
size_t k;
const std::vector& my_roots;
public:
Stream( const std::vector& root_set ) : my_roots(root_set), k(0) {}
bool pop_if_present( Cell*& item ) {
bool result = kif( result ) item = my_roots[k++];
return result;
}
};
void ParallelPreorderTraversal( const std::vector& root_set ) {
tbb::parallel_while w;
Stream s(root_set);
w.run(s,Body(w));
}
//------------------------------------------------------------------------
// Test driver
//------------------------------------------------------------------------
#include "cctype"
#include "tbb/task_scheduler_init.h"
#include "tbb/tick_count.h"
//! A closed range of int.
struct IntRange {
int low;
int high;
void set_from_string( const char* s );
IntRange( int low_, int high_ ) : low(low_), high(high_) {}
};
void IntRange::set_from_string( const char* s ) {
char* end;
high = low = strtol(s,&end,0);
switch( *end ) {
case ':':
high = strtol(end+1,0,0);
break;
case '0':
break;
default:
printf("unexpected character = %cn",*end);
}
}
//! Number of threads to use.
static IntRange NThread(1,4);
//! If true, then at end wait for user to hit return
static bool PauseFlag = false;
//! Parse the command line.
static void ParseCommandLine( int argc, char* argv[] ) {
int i = 1;
if( argc==1 || i
PauseFlag = true;
++i;
}
if( i
// Command line is garbled.
fprintf(stderr,"Usage: %s ['pause'] [nthread]n", argv[0]);
fprintf(stderr,"where nthread is a non-negative integer, or range of the form low:high [%d:%d]n",NThread.low,NThread.high);
exit(1);
}
if( i
NThread.set_from_string(argv[i++]);
}
int main( int argc, char* argv[] ) {
ParseCommandLine(argc,argv);
// Start Intel(R) Threading Building Blocks scheduler with given number of threads.
for( int p=NThread.low; p<=NThread.high; ++p ) {
tbb::task_scheduler_init init(p);
srand(2);
tbb::tick_count::interval_t interval;
size_t total_root_set_size = 0;
const int ntrial = 50;
for( int trial=0; trial
Graph g;
g.create_random_dag(1000);
std::vector root_set;
g.get_root_set(root_set);
total_root_set_size += root_set.size();
tbb::tick_count t0 = tbb::tick_count::now();
for( int i=0; i<10; ++i ) {
ParallelPreorderTraversal(root_set);
}
tbb::tick_count t1 = tbb::tick_count::now();
interval += t1-t0;
}
printf("%g seconds using %d threads (average of %g nodes in root_set)n",interval.seconds(),p,(double)total_root_set_size/ntrial);
}
return 0;
}
|
Todd Bezenek
565
-Todd