Parallel while


Introduction

Example that uses parallel_while to do parallel preorder traversal of a sparse graph.
Each vertex in the graph is called a "cell". Each cell has a value. The value is a matrix. Some of the cells have operators that compute the cell's value, using other cells' values as input. A cell that uses the value of cell x is called a successor of x.

The algorithm works as follows.

 

  • Compute the set of cells that have no inputs. This set is called root_set.
  • Each cell has an associated field ref_count that is an atomic integer. Initialize ref_count to the number of inputs for the Cell.
  • Update each cell in root_set, by applying a parallel_while to a stream that iterates over root_set
  • After updating a cell, for each of its successors atomically decrement the successor's ref_count
  • If the count became zero, add the cell to the set of cells to be updated, by calling parallel_while::add.
  • The times printed are for the traversal and update, and do not include time for computing the root_set.

 

NOTE: It is important to understand that this example is unlikely to show speedup if the cell values are changed to type "float". The reason is twofold.

  1. The smaller value type causes each Cell to be significantly smaller than a cache line, which leads to false sharing conflicts.
  2. The time to update the cells becomes very small, and consequently the overhead of parallel_while swamps the useful work.

 


Graph.h

==============================================================================

Copyright 2005-2006 Intel Corporation. All Rights Reserved.

==============================================================================

#include "Matrix.h" 

#include "tbb/atomic.h" 

#include "vector" 

namespace TBB = tbb; 

//! Kind of operation a Cell performs. The numeric value is used as an index into ArityOfOp.
enum OpKind {
    OP_VALUE  = 0,  // Use Cell's value
    OP_NEGATE = 1,  // Unary negation
    OP_ADD    = 2,  // Addition
    OP_SUB    = 3,  // Subtraction
    OP_MUL    = 4   // Multiplication
};

//! Number of inputs consumed by each operation, indexed by OpKind.
static const int ArityOfOp[] = {
    0,  // OP_VALUE
    1,  // OP_NEGATE
    2,  // OP_ADD
    2,  // OP_SUB
    2   // OP_MUL
};

//! A vertex of the graph: a value plus the operation (if any) that computes it.
class Cell {
public:
    //! Operation for this cell
    OpKind op;

    //! Inputs to this cell. Only the first ArityOfOp[op] entries are used.
    Cell* input[2];

    //! Type of value stored in a Cell
    typedef Matrix value_type;

    //! Value associated with this Cell
    value_type value;

    //! Set of cells that use this Cell as an input
    std::vector<Cell*> successor;

    //! Reference count of number of inputs that are not yet updated.
    TBB::atomic<int> ref_count;

    //! Update the Cell's value.
    void update();

    //! Default constructor. Leaves op, input, value and ref_count unset.
    Cell() {}
};

//! A directed graph where the vertices are Cells. 

class Graph { 

std::vector my_vertex_set; 

public: 

//! Create a random acyclic directed graph 

void create_random_dag( size_t number_of_nodes ); 

//! Print the graph 

void print(); 

//! Get set of cells that have no inputs. 

void get_root_set( std::vector& root_set ); 

}; 

 


Graph.cpp

==============================================================================

Copyright 2005-2006 Intel Corporation. All Rights Reserved.

==============================================================================

#include "cstdio" 

#include "Graph.h" 

void Graph::create_random_dag( size_t number_of_nodes ) { 

my_vertex_set.resize(number_of_nodes); 

for( size_t k=0; k<number_of_nodes; ++k ) { 

Cell& c = my_vertex_set[k]; 

int op = int((rand()>>8)%5u); 
if( op>int(k) ) op = int(k); 

switch( op ) { 

default: 

c.op = OP_VALUE; 

c.value = Cell::value_type((float)k); 

break; 

case 1: 

c.op = OP_NEGATE; 

break; 

case 2: 

c.op = OP_SUB; 

break; 

case 3: 

c.op = D; 

break; 

case 4: 

c.op = OP_MUL; 

break; 

} 

for( int j=0; j
Cell& input = my_vertex_set[rand()%k]; 

c.input[j] = &input; 

} 

} 

} 

//! Print one line per cell: the cell's index followed by the indices of its successors.
void Graph::print() {
    for( size_t k=0; k<my_vertex_set.size(); ++k ) {
        printf("Cell %d:",int(k));
        // A successor's index is its pointer offset from the start of the vertex array.
        for( size_t j=0; j<my_vertex_set[k].successor.size(); ++j )
            printf(" %d",int(my_vertex_set[k].successor[j] - &my_vertex_set[0]));
        printf("\n");
    }
}

void Graph::get_root_set( std::vector& root_set ) { 

for( size_t k=0; k
my_vertex_set[k].successor.clear(); 

} 

root_set.clear(); 

for( size_t k=0; k
Cell& c = my_vertex_set[k]; 

c.ref_count = ArityOfOp[c.op]; 

for( int j=0; j
c.input[j]->successor.push_back(&c); 

} 

if( ArityOfOp[c.op]==0 ) 

root_set.push_back(&my_vertex_set[k]); 

} 

} 

//! Recompute this cell's value from its inputs, according to its operation.
void Cell::update() {
    // OP_VALUE cells keep the value they already hold, so they need no branch here.
    if( op==OP_NEGATE ) {
        value = -(input[0]->value);
    } else if( op==OP_ADD ) {
        value = input[0]->value + input[1]->value;
    } else if( op==OP_SUB ) {
        value = input[0]->value - input[1]->value;
    } else if( op==OP_MUL ) {
        value = input[0]->value * input[1]->value;
    }
}

 


Matrix.h

==============================================================================

Copyright 2005-2006 Intel Corporation. All Rights Reserved.

==============================================================================

//! Dense square matrix of floats with a fixed dimension of n.
class Matrix {
    //! Number of rows and of columns.
    static const int n = 10;

    float array[n][n];

public:
    //! Construct a matrix whose elements are left uninitialized.
    Matrix() {}

    //! Construct a matrix with z on the diagonal and zero everywhere else.
    Matrix( float z ) {
        for( int i=0; i<n; ++i )
            for( int j=0; j<n; ++j )
                array[i][j] = i==j ? z : 0;
    }

    //! Element-wise negation.
    friend Matrix operator-( const Matrix& x ) {
        Matrix result;
        for( int i=0; i<n; ++i )
            for( int j=0; j<n; ++j )
                result.array[i][j] = -x.array[i][j];
        return result;
    }

    //! Element-wise sum.
    friend Matrix operator+( const Matrix& x, const Matrix& y ) {
        Matrix result;
        for( int i=0; i<n; ++i )
            for( int j=0; j<n; ++j )
                result.array[i][j] = x.array[i][j] + y.array[i][j];
        return result;
    }

    //! Element-wise difference.
    friend Matrix operator-( const Matrix& x, const Matrix& y ) {
        Matrix result;
        for( int i=0; i<n; ++i )
            for( int j=0; j<n; ++j )
                result.array[i][j] = x.array[i][j] - y.array[i][j];
        return result;
    }

    //! Matrix product.
    friend Matrix operator*( const Matrix& x, const Matrix& y ) {
        // Start from the zero matrix, because the loop below accumulates into result.
        Matrix result(0);
        for( int i=0; i<n; ++i )
            for( int k=0; k<n; ++k )
                for( int j=0; j<n; ++j )
                    result.array[i][j] += x.array[i][k] * y.array[k][j];
        return result;
    }
};

 


parallel_preorder.cpp

==============================================================================

Copyright 2005-2006 Intel Corporation. All Rights Reserved.

==============================================================================

/* Example program that shows how to use parallel_while to do parallel preorder

traversal of a directed acyclic graph. */ 

#include "tbb/parallel_while.h" 

#include "tbb/atomic.h" 

#include "vector" 

#include "algorithm" 

#include "Graph.h" 

class Body { 

tbb::parallel_while& my_while; 

public: 

Body( tbb::parallel_while& w ) : my_while(w) {}; 

//------------------------------------------------------------------------ 

// Following signatures required by parallel_while 

//------------------------------------------------------- ----------------- 

typedef Cell* argument_type; 

void operator()( Cell* c ) const { 

c->update(); 

// Restore ref_count in preparation for subsequent traversal. 

c->ref_count = ArityOfOp[c->op]; 

for( size_t k=0; ksuccessor.size(); ++k ) { 

Cell* successor = c->successor[k]; 

if( 0 == --(successor->ref_count) ) { 

my_while.add( successor ); 

} 

} 

} 

}; 

class Stream { 

size_t k; 

const std::vector& my_roots; 

public: 

Stream( const std::vector& root_set ) : my_roots(root_set), k(0) {} 

bool pop_if_present( Cell*& item ) { 

bool result = kif( result ) 
item = my_roots[k++]; 

return result; 

} 

}; 

void ParallelPreorderTraversal( const std::vector& root_set ) { 

tbb::parallel_while w; 

Stream s(root_set); 

w.run(s,Body(w)); 

} 

//------------------------------------------------------------------------ 

// Test driver 

//------------------------------------------------------------------------ 

#include "cctype"
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include "tbb/task_scheduler_init.h"
#include "tbb/tick_count.h"

//! A closed range of int. 

//! A closed range of int, [low, high].
struct IntRange {
    //! Lower bound (inclusive).
    int low;

    //! Upper bound (inclusive).
    int high;

    //! Build the range [lo, hi].
    IntRange( int lo, int hi ) : low(lo), high(hi) {}

    //! Overwrite the range with the one described by string s.
    void set_from_string( const char* s );
};

//! Set the range from a string of the form "value" or "low:high".
/** A lone number sets low and high to that number. Any other character directly after
    the first number is reported on stdout, and the range stays at that single value. */
void IntRange::set_from_string( const char* s ) {
    char* end;
    high = low = strtol(s,&end,0);
    switch( *end ) {
        case ':':
            high = strtol(end+1,0,0);
            break;
        case '\0':
            // The string held a single number; nothing more to parse.
            break;
        default:
            printf("unexpected character = %c\n",*end);
    }
}

//! Range of thread counts to use; main() runs the benchmark once for each count from low to high inclusive.
//! Defaults to 1:4 and may be replaced from the command line by ParseCommandLine.

static IntRange NThread(1,4); 

//! If true, then at end wait for user to hit return.
//! NOTE(review): ParseCommandLine sets this flag, but nothing in the visible source reads it - confirm whether the pause was dropped.

static bool PauseFlag = false; 

//! Parse the command line. 

static void ParseCommandLine( int argc, char* argv[] ) { 

int i = 1; 

if( argc==1 || i
PauseFlag = true; 

++i; 

} 

if( i
// Command line is garbled. 

fprintf(stderr,"Usage: %s ['pause'] [nthread]n", argv[0]); 

fprintf(stderr,"where nthread is a non-negative integer, or range of the form low:high [%d:%d]n",NThread.low,NThread.high); 

exit(1); 

} 

if( i
NThread.set_from_string(argv[i++]); 

} 

int main( int argc, char* argv[] ) { 

ParseCommandLine(argc,argv); 

// Start Intel® Threading Building Blocks scheduler with given number of threads. 

for( int p=NThread.low; p<=NThread.high; ++p ) { 

tbb::task_scheduler_init init(p); 

srand(2); 

tbb::tick_count::interval_t interval; 

size_t total_root_set_size = 0; 

const int ntrial = 50; 

for( int trial=0; trial
Graph g; 

g.create_random_dag(1000); 

std::vector root_set; 

g.get_root_set(root_set); 

total_root_set_size += root_set.size(); 

tbb::tick_count t0 = tbb::tick_count::now(); 

for( int i=0; i<10; ++i ) { 

ParallelPreorderTraversal(root_set); 

} 

tbb::tick_count t1 = tbb::tick_count::now(); 

interval += t1-t0; 

} 

printf("%g seconds using %d threads (average of %g nodes in root_set)n",interval.seconds(),p,(double)total_root_set_size/ntrial); 

} 

return 0; 

} 

 

For more complete information about compiler optimizations, see our Optimization Notice.