Parallel while

Introduction

Example that uses parallel_while to do parallel preorder traversal of a sparse graph.
Each vertex in the graph is called a "cell". Each cell has a value. The value is a matrix. Some of the cells have operators that compute the cell's value, using other cells' values as input. A cell that uses the value of cell x is called a successor of x.

The algorithm works as follows.

• Compute the set of cells that have no inputs. This set is called root_set.
• Each cell has an associated field ref_count that is an atomic integer. Initialize ref_count to the number of inputs for the Cell.
• Update each cell in root_set, by applying a parallel_while to a stream that iterates over root_set
• After updating a cell, for each of its successors atomically decrement the successor's ref_count
• If the count became zero, add that successor to the set of cells to be updated, by calling parallel_while::add.
• The times printed are for the traversal and update, and do not include time for computing the root_set.

NOTE: It is important to understand that this example is unlikely to show speedup if the cell values are changed to type "float". The reason is twofold.

1. The smaller value type causes each Cell to be significantly smaller than a cache line, which leads to false sharing conflicts.
2. The time to update the cells becomes very small, and consequently the overhead of parallel_while swamps the useful work

Graph.h

==============================================================================

==============================================================================

 ```#include "Matrix.h" #include "tbb/atomic.h" #include "vector" namespace TBB = tbb; enum OpKind { // Use Cell's value OP_VALUE, // Unary negation OP_NEGATE, // Addition OP_ADD, // Subtraction OP_SUB, // Multiplication OP_MUL }; static const int ArityOfOp[] = {0,1,2,2,2}; class Cell { public: //! Operation for this cell OpKind op; //! Inputs to this cell Cell* input[2]; //! Type of value stored in a Cell typedef Matrix value_type; //! Value associated with this Cell value_type value; //! Set of cells that use this Cell as an input std::vector successor; //! Reference count of number of inputs that are not yet updated. TBB::atomic ref_count; //! Update the Cell's value. void update(); //! Default construtor Cell() {} }; //! A directed graph where the vertices are Cells. class Graph { std::vector my_vertex_set; public: //! Create a random acyclic directed graph void create_random_dag( size_t number_of_nodes ); //! Print the graph void print(); //! Get set of cells that have no inputs. void get_root_set( std::vector& root_set ); }; ```

Graph.cpp

==============================================================================

==============================================================================

 ```#include "cstdio" #include "Graph.h" void Graph::create_random_dag( size_t number_of_nodes ) { my_vertex_set.resize(number_of_nodes); for( size_t k=0; k>8)%5u); if( op>int(k) ) op = int(k); switch( op ) { default: c.op = OP_VALUE; c.value = Cell::value_type((float)k); break; case 1: c.op = OP_NEGATE; break; case 2: c.op = OP_SUB; break; case 3: c.op = D; break; case 4: c.op = OP_MUL; break; } for( int j=0; j Cell& input = my_vertex_set[rand()%k]; c.input[j] = &input; } } } void Graph::print() { for( size_t k=0; k printf("Cell %d:",int(k)); for( size_t j=0; j printf(" %d",int(my_vertex_set[k].successor[j] - &my_vertex_set[0])); printf("n"); } } void Graph::get_root_set( std::vector& root_set ) { for( size_t k=0; k my_vertex_set[k].successor.clear(); } root_set.clear(); for( size_t k=0; k Cell& c = my_vertex_set[k]; c.ref_count = ArityOfOp[c.op]; for( int j=0; j c.input[j]->successor.push_back(&c); } if( ArityOfOp[c.op]==0 ) root_set.push_back(&my_vertex_set[k]); } } void Cell::update() { switch( op ) { case OP_VALUE: break; case OP_NEGATE: value = -(input[0]->value); break; case OP_ADD: value = input[0]->value + input[1]->value; break; case OP_SUB: value = input[0]->value - input[1]->value; break; case OP_MUL: value = input[0]->value * input[1]->value; break; } } ```

Matrix.h

==============================================================================

==============================================================================

 ```class Matrix { static const int n = 10; float array[n][n]; public: Matrix() {} Matrix( float z ) { for( int i=0; i for( int j=0; j array[i][j] = i==j ? z : 0; } friend Matrix operator-( const Matrix& x ) { Matrix result; for( int i=0; i for( int j=0; j result.array[i][j] = -x.array[i][j]; return result; } friend Matrix operator+( const Matrix& x, const Matrix& y ) { Matrix result; for( int i=0; i for( int j=0; j result.array[i][j] = x.array[i][j] + y.array[i][j]; return result; } friend Matrix operator-( const Matrix& x, const Matrix& y ) { Matrix result; for( int i=0; i for( int j=0; j result.array[i][j] = x.array[i][j] - y.array[i][j]; return result; } friend Matrix operator*( const Matrix& x, const Matrix& y ) { Matrix result(0); for( int i=0; i for( int k=0; k for( int j=0; j result.array[i][j] += x.array[i][k] * y.array[k][j]; return result; } }; ```

parallel_preorder.cpp

==============================================================================

==============================================================================

Example program that shows how to use parallel_while to do parallel preorder

 ```traversal of a directed acyclic graph. */ #include "tbb/parallel_while.h" #include "tbb/atomic.h" #include "vector" #include "algorithm" #include "Graph.h" class Body { tbb::parallel_while& my_while; public: Body( tbb::parallel_while& w ) : my_while(w) {}; //------------------------------------------------------------------------ // Following signatures required by parallel_while //------------------------------------------------------- ----------------- typedef Cell* argument_type; void operator()( Cell* c ) const { c->update(); // Restore ref_count in preparation for subsequent traversal. c->ref_count = ArityOfOp[c->op]; for( size_t k=0; ksuccessor.size(); ++k ) { Cell* successor = c->successor[k]; if( 0 == --(successor->ref_count) ) { my_while.add( successor ); } } } }; class Stream { size_t k; const std::vector& my_roots; public: Stream( const std::vector& root_set ) : my_roots(root_set), k(0) {} bool pop_if_present( Cell*& item ) { bool result = kif( result ) item = my_roots[k++]; return result; } }; void ParallelPreorderTraversal( const std::vector& root_set ) { tbb::parallel_while w; Stream s(root_set); w.run(s,Body(w)); } //------------------------------------------------------------------------ // Test driver //------------------------------------------------------------------------ #include "cctype" #include "tbb/task_scheduler_init.h" #include "tbb/tick_count.h" //! A closed range of int. struct IntRange { int low; int high; void set_from_string( const char* s ); IntRange( int low_, int high_ ) : low(low_), high(high_) {} }; void IntRange::set_from_string( const char* s ) { char* end; high = low = strtol(s,&end,0); switch( *end ) { case ':': high = strtol(end+1,0,0); break; case '0': break; default: printf("unexpected character = %cn",*end); } } //! Number of threads to use. static IntRange NThread(1,4); //! If true, then at end wait for user to hit return static bool PauseFlag = false; //! Parse the command line. 
static void ParseCommandLine( int argc, char* argv[] ) { int i = 1; if( argc==1 || i PauseFlag = true; ++i; } if( i // Command line is garbled. fprintf(stderr,"Usage: %s ['pause'] [nthread]n", argv[0]); fprintf(stderr,"where nthread is a non-negative integer, or range of the form low:high [%d:%d]n",NThread.low,NThread.high); exit(1); } if( i NThread.set_from_string(argv[i++]); } int main( int argc, char* argv[] ) { ParseCommandLine(argc,argv); // Start Intel® Threading Building Blocks scheduler with given number of threads. for( int p=NThread.low; p<=NThread.high; ++p ) { tbb::task_scheduler_init init(p); srand(2); tbb::tick_count::interval_t interval; size_t total_root_set_size = 0; const int ntrial = 50; for( int trial=0; trial Graph g; g.create_random_dag(1000); std::vector root_set; g.get_root_set(root_set); total_root_set_size += root_set.size(); tbb::tick_count t0 = tbb::tick_count::now(); for( int i=0; i<10; ++i ) { ParallelPreorderTraversal(root_set); } tbb::tick_count t1 = tbb::tick_count::now(); interval += t1-t0; } printf("%g seconds using %d threads (average of %g nodes in root_set)n",interval.seconds(),p,(double)total_root_set_size/ntrial); } return 0; } ```

For more complete information about compiler optimizations, see our Optimization Notice.

Top