Pipeline


Text filter that demonstrates class pipeline


text_filter.cpp
Copyright 2005-2006 Intel Corporation. All Rights Reserved.

 

Example program that reads a file of text and changes the first letter of each word to upper case.

#include "tbb/pipeline.h" 

#include "tbb/tick_count.h" 

#include "tbb/task_scheduler_init.h" 

#include "cstring" 

#include "cstdlib" 

#include "cstdio" 

#include "cctype"

using namespace std; 

//! Buffer that holds block of characters and last character of previous buffer. 

class MyBuffer { 

static const size_t buffer_size = 10000; 

char* my_end; 

//! storage[0] holds the last character of the previous buffer. 

char storage[1+buffer_size]; 

public: 

//! Pointer to first character in the buffer 

char* begin() {return storage+1;} 

const char* begin() const {return storage+1;} 
//! Pointer to one past last character in the buffer 
char* end() const {return my_end;} 
//! Set end of buffer. 
void set_end( char* new_ptr ) {my_end=new_ptr;} 
//! Number of bytes a buffer can hold 
size_t max_size() const {return buffer_size;} 
//! Number of bytes appended to buffer. 
size_t size() const {return my_end-begin();} 
}; 
class MyInputFilter: public tbb::filter { 
public: 
static const size_t n_buffer = 8; 
MyInputFilter( FILE* input_file_ ); 
private: 
FILE* input_file; 
size_t next_buffer; 
char last_char_of_previous_buffer; 
MyBuffer buffer[n_buffer]; 
/*override*/ void* operator()(void*); 
}; 
MyInputFilter::MyInputFilter( FILE* input_file_ ) : 
filter(/*is_serial=*/true), 
next_buffer(0), 
input_file(input_file_), 
last_char_of_previous_buffer(' ') 
{ 
} 
void* MyInputFilter::operator()(void*) { 
MyBuffer& b = buffer[next_buffer]; 
next_buffer = (next_buffer+1) % n_buffer; 
size_t n = fread( b.begin(), 1, b.max_size(), input_file ); 
if( !n ) { 
// end of file 
return NULL; 
} else { 
b.begin()[-1] = last_char_of_previous_buffer; 
last_char_of_previous_buffer = b.begin()[n-1]; 
b.set_end( b.begin()+n ); 
return &b; 
} 
} 
//! Filter that changes the first letter of each word from lower case to upper case. 
class MyTransformFilter: public tbb::filter { 
public: 
MyTransformFilter(); 
/*override*/void* operator()( void* item ); 
}; 
MyTransformFilter::MyTransformFilter() : 
tbb::filter(/*ordered=*/false) 
{} 
/*override*/void* MyTransformFilter::operator()( void* item ) { 
MyBuffer& b = *static_cast(item); 
int prev_char_is_space = b.begin()[-1]==' '; 
for( char* s=b.begin(); s!=b.end(); ++s ) { 
if( prev_char_is_space && islower(*s) ) 
*s = toupper(*s); 
prev_char_is_space = isspace((unsigned char)*s); 
} 
return &b; 
} 

//! Filter that writes each buffer to a file. 
class MyOutputFilter: public tbb::filter { 
FILE* my_output_file; 
public: 
MyOutputFilter( FILE* output_file ); 
/*override*/void* operator()( void* item ); 
}; 
MyOutputFilter::MyOutputFilter( FILE* output_file ) : 
tbb::filter(/*is_serial=*/true), 
my_output_file(output_file) 
{ 
} 
void* MyOutputFilter::operator()( void* item ) { 
MyBuffer& b = *static_cast(item); 
fwrite( b.begin(), 1, b.size(), my_output_file ); 
return NULL; 
} 
static int NThread = tbb::task_scheduler_init::automatic; < > static const char* InputFileName = "input.txt"; 
static const char* OutputFileName = "output.txt"; 
void Usage() 
{ 
fprintf( stderr, "Usage:ttext_filter input-file [output-file [nthread]]n"); 
exit(1); 
} 
void ParseCommandLine( int argc, char* argv[] ) { 
// Parse command line 
if( argc> 4 ) Usage(); 
if( argc>=2 ) InputFileName = argv[1]; 
if( argc>=3 ) OutputFileName = argv[2]; 
if( argc>=4 ) { 
NThread = strtol(argv[3],0,0); 
if( NThread<1 ) { 
fprintf(stderr,"nthread set to %d, but must be at least 1n",NThread); 
exit(1); 
} 
} 
} 
int main( int argc, char* argv[] ) { 
ParseCommandLine( argc, argv ); 
// Start task scheduler 
tbb::task_scheduler_init init(NThread); 
FILE* input_file = fopen(InputFileName,"r"); 
if( !input_file ) { 
perror( InputFileName ); 
Usage(); 
} 
FILE* output_file = fopen(OutputFileName,"w"); 
if( !output_file ) { 
perror( OutputFileName ); 
exit(1); 
} 
// Create the pipeline 
tbb::pipeline pipeline; 
// Create file-reading writing stage and add it to the pipeline 
MyInputFilter input_filter( input_file ); 
pipeline.add_filter( input_filter ); 
// Create capitalization stage and add it to the pipeline 
MyTransformFilter transform_filter; 
pipeline.add_filter( transform_filter ); 
// Create file-writing stage and add it to the pipeline 
MyOutputFilter output_filter( output_file ); 
pipeline.add_filter( output_filter ); 
// Run the pipeline 
tbb::tick_count t0 = tbb::tick_count::now(); 
pipeline.run( MyInputFilter::n_buffer ); 
tbb::tick_count t1 = tbb::tick_count::now(); 
// Remove filters from pipeline before they are implicitly destroyed. 
pipeline.clear(); 
fclose( output_file ); 
fclose( input_file ); 
printf("time = %gn",(t1-t0).seconds()); 
} 

 

For more complete information about compiler optimizations, see our Optimization Notice.

1 comment

Top

Add a Comment

Have a technical question? Visit our forums. Have site or software product issues? Contact support.