Pipeline

Submit New Article

March 11, 2009 1:00 AM PDT



Text filter that demonstrates the tbb::pipeline class

text_filter.cpp
Copyright 2005-2006 Intel Corporation. All Rights Reserved.

 

Example program that reads a file of text and changes the first letter of each word to upper case.

#include "tbb/pipeline.h" 
#include "tbb/tick_count.h"
#include "tbb/task_scheduler_init.h"
#include "cstring"
#include "cstdlib"
#include "cstdio"
#include "cctype"
using namespace std;
//! Buffer that holds block of characters and last character of previous buffer.
/** The payload occupies [begin(), end()) and can hold up to max_size() characters.
    One extra slot in front of the payload, reachable as begin()[-1], holds the
    last character of the previous buffer.

    NOTE: my_end points into this object's own storage, so the end() of a copied
    MyBuffer would still refer to the object it was copied from. */
class MyBuffer {
    static const size_t buffer_size = 10000;
    //! One past the last valid payload character.
    char* my_end;
    //! storage[0] holds the last character of the previous buffer.
    char storage[1+buffer_size];
public:
    //! Construct an empty buffer, i.e. end()==begin() and size()==0.
    /** Without this initialization my_end would be indeterminate until set_end()
        is called, and calling size() or end() on a fresh buffer would be undefined. */
    MyBuffer() : my_end(storage+1) {}
    //! Pointer to first character in the buffer
    char* begin() {return storage+1;}
    const char* begin() const {return storage+1;}
    //! Pointer to one past last character in the buffer
    char* end() const {return my_end;}
    //! Set end of buffer.
    void set_end( char* new_ptr ) {my_end=new_ptr;}
    //! Number of bytes a buffer can hold
    size_t max_size() const {return buffer_size;}
    //! Number of bytes appended to buffer.
    size_t size() const {return my_end-begin();}
};
//! Pipeline stage that reads the input file one buffer-sized chunk at a time.
class MyInputFilter: public tbb::filter {
public:
    //! Number of MyBuffer objects that this stage cycles through.
    static const size_t n_buffer = 8;

    //! Create a stage that reads from input_file_.
    MyInputFilter( FILE* input_file_ );

private:
    //! Fill the next buffer in the rotation from the input file.
    /*override*/ void* operator()(void*);

    //! Stream that the chunks are read from.
    FILE* input_file;
    //! Index into buffer[] of the buffer that the next invocation fills.
    size_t next_buffer;
    //! Last character of the most recently read chunk.
    char last_char_of_previous_buffer;
    //! Buffers handed to the following stages, reused in round-robin order.
    MyBuffer buffer[n_buffer];
};
//! Construct an input stage that reads from input_file_.
/** Only the pointer is stored; the constructor does not open the stream.
    The rotation starts at buffer 0, and the "previous character" starts out as
    a space. */
MyInputFilter::MyInputFilter( FILE* input_file_ ) :
    filter(/*is_serial=*/true),
    // Members are listed in their declaration order (input_file, next_buffer,
    // last_char_of_previous_buffer), which is the order in which they are
    // actually initialized regardless of how this list is written.
    input_file(input_file_),
    next_buffer(0),
    last_char_of_previous_buffer(' ')
{
}
//! Read the next chunk of the input file into the next buffer of the rotation.
/** Returns a pointer to the filled MyBuffer, or NULL when fread delivered no
    bytes. The incoming item is ignored. */
void* MyInputFilter::operator()(void*) {
    MyBuffer& chunk = buffer[next_buffer];
    // Advance the rotation up front; this happens even when nothing is read.
    next_buffer = (next_buffer+1) % n_buffer;

    char* const first = chunk.begin();
    const size_t n_read = fread( first, 1, chunk.max_size(), input_file );
    if( n_read==0 ) {
        // end of file
        return NULL;
    }

    // Record the character that preceded this chunk in the slot just before
    // the payload, then remember this chunk's last character for the next call.
    first[-1] = last_char_of_previous_buffer;
    last_char_of_previous_buffer = first[n_read-1];
    chunk.set_end( first+n_read );
    return &chunk;
}
//! Pipeline stage that upper-cases the first letter of each word in a buffer.
class MyTransformFilter: public tbb::filter {
public:
    //! Create the capitalization stage.
    MyTransformFilter();

    //! Capitalize, in place, the buffer that item points to and return it.
    /*override*/void* operator()( void* item );
};
//! Construct the capitalization stage.
/** Passes false to the tbb::filter constructor, where the input and output
    stages of this file pass true for the same argument. */
MyTransformFilter::MyTransformFilter()
    : tbb::filter(/*is_serial=*/false)
{
}
//! Upper-case the first letter of each word in the buffer that item points to.
/** item must point to a MyBuffer. A lower-case letter is converted when the
    character before it is whitespace; for the first character of the buffer the
    predecessor is begin()[-1], the slot that carries the last character of the
    previous buffer. The buffer is modified in place and returned. */
/*override*/void* MyTransformFilter::operator()( void* item ) {
    MyBuffer& b = *static_cast<MyBuffer*>(item);
    // Use isspace for the carried-over character too, so that a word following
    // a newline or tab at a buffer boundary is treated exactly like one that
    // follows whitespace inside a buffer.
    int prev_char_is_space = isspace(static_cast<unsigned char>(b.begin()[-1]));
    for( char* s=b.begin(); s!=b.end(); ++s ) {
        // The <cctype> functions are only defined for values representable as
        // unsigned char (or EOF), so convert before every call.
        const unsigned char c = static_cast<unsigned char>(*s);
        if( prev_char_is_space && islower(c) )
            *s = static_cast<char>(toupper(c));
        // Case conversion never turns a letter into whitespace, so testing the
        // original character is equivalent to testing the stored one.
        prev_char_is_space = isspace(c);
    }
    return &b;
}

//! Filter that writes each buffer to a file.
class MyOutputFilter: public tbb::filter {
FILE* my_output_file;
public:
MyOutputFilter( FILE* output_file );
/*override*/void* operator()( void* item );
};
//! Construct an output stage that writes to output_file.
/** Only the pointer is stored; the constructor does not open the stream. */
MyOutputFilter::MyOutputFilter( FILE* output_file )
    : tbb::filter(/*is_serial=*/true)
    , my_output_file(output_file)
{}
//! Write the payload of the buffer that item points to to the output file.
/** item must point to a MyBuffer. Always returns NULL. A short write is
    reported on stderr instead of being silently ignored; processing continues. */
void* MyOutputFilter::operator()( void* item ) {
    MyBuffer& b = *static_cast<MyBuffer*>(item);
    const size_t n_written = fwrite( b.begin(), 1, b.size(), my_output_file );
    if( n_written!=b.size() ) {
        perror( "fwrite" );
    }
    return NULL;
}
//! Number of threads passed to the task scheduler; overridden by the third command-line argument.
static int NThread = tbb::task_scheduler_init::automatic;
//! Name of the file to read; overridden by the first command-line argument.
static const char* InputFileName = "input.txt";
//! Name of the file to write; overridden by the second command-line argument.
static const char* OutputFileName = "output.txt";
//! Print the command-line synopsis to stderr and terminate with exit status 1.
void Usage()
{
    // The tab and newline escapes had been lost from this string, which left a
    // stray 't' and 'n' in the message and no line terminator.
    fprintf( stderr, "Usage:\ttext_filter input-file [output-file [nthread]]\n");
    exit(1);
}
//! Read the optional input-file, output-file and nthread arguments.
/** Arguments that are absent leave the corresponding default untouched.
    More than three arguments, or an nthread value that is not a complete
    number, prints the usage text and exits; an nthread value below 1 prints a
    diagnostic and exits with status 1. */
void ParseCommandLine( int argc, char* argv[] ) {
    // Parse command line
    if( argc> 4 ) Usage();
    if( argc>=2 ) InputFileName = argv[1];
    if( argc>=3 ) OutputFileName = argv[2];
    if( argc>=4 ) {
        char* parse_end = 0;
        const long requested = strtol(argv[3],&parse_end,0);
        // Reject input such as "" or "4x": either no digits were consumed or
        // something follows the number.
        if( parse_end==argv[3] || *parse_end!='\0' ) Usage();
        if( requested<1 ) {
            fprintf(stderr,"nthread set to %ld, but must be at least 1\n",requested);
            exit(1);
        }
        NThread = static_cast<int>(requested);
    }
}
//! Capitalize the first letter of each word of the input file into the output file.
/** Parses the command line, opens both files, runs a three-stage pipeline
    (read, capitalize, write) and prints the elapsed time of the run.
    Exits with status 1 if a file cannot be opened; returns 1 if closing the
    output file reports an error, 0 otherwise. */
int main( int argc, char* argv[] ) {
    ParseCommandLine( argc, argv );
    // Start task scheduler
    tbb::task_scheduler_init init(NThread);
    FILE* input_file = fopen(InputFileName,"r");
    if( !input_file ) {
        perror( InputFileName );
        Usage();
    }
    FILE* output_file = fopen(OutputFileName,"w");
    if( !output_file ) {
        // Report first so that fclose cannot disturb errno before perror reads it.
        perror( OutputFileName );
        fclose( input_file );
        exit(1);
    }
    // Create the pipeline
    tbb::pipeline pipeline;
    // Create file-reading stage and add it to the pipeline
    MyInputFilter input_filter( input_file );
    pipeline.add_filter( input_filter );
    // Create capitalization stage and add it to the pipeline
    MyTransformFilter transform_filter;
    pipeline.add_filter( transform_filter );
    // Create file-writing stage and add it to the pipeline
    MyOutputFilter output_filter( output_file );
    pipeline.add_filter( output_filter );
    // Run the pipeline, allowing as many buffers in flight as the input stage owns
    tbb::tick_count t0 = tbb::tick_count::now();
    pipeline.run( MyInputFilter::n_buffer );
    tbb::tick_count t1 = tbb::tick_count::now();
    // Remove filters from pipeline before they are implicitly destroyed.
    pipeline.clear();
    int status = 0;
    // Buffered output is flushed by fclose, so a write failure may only show up here.
    if( fclose( output_file )!=0 ) {
        perror( OutputFileName );
        status = 1;
    }
    fclose( input_file );
    printf("time = %g\n",(t1-t0).seconds());
    return status;
}