Jogi Narain
November 9, 2009 6:54 AM PST
Pipe Line Problem
Hi All,
I have created a simple three-stage pipeline. The first stage creates some data, the second processes it, and the third prints it out.

I am having a problem with the first stage. Instead of generating 10 messages and then quitting, it runs forever (well, until I hit Ctrl-C after 10 seconds).

If I substitute the "for" loop with a "while" loop that reads data from a file, it works fine!

Has anyone got any insight into why the "while" works and the "for" fails? Thanks

Regards
->Jogi

###########

void* operator()(void* item)
{
    int i = 0 ;
    while (!_ifs.eof()) {
        i++ ;
        _ifs >> _datap.name ;
        _ifs >> _datap.val_a ;
        _ifs >> _datap.val_b ;
        _ifs >> _datap.val_c ;

        if (_datap.name == "") break ;

        std::cout << "Rm: " << _counter << " " << _datap.name << " " << _datap.val_a << " " << _datap.val_b << " " << _datap.val_c << std::endl ;

        return &_datap ;
    }

    return NULL ;
}

##########
#include <string>
#include <sstream>
#include <vector>
#include <iostream>

#include "tbb/task_scheduler_init.h"
#include "tbb/concurrent_hash_map.h"
#include "tbb/pipeline.h"

typedef struct
{
    std::string name ;
    int val_a ;
    double val_b ;
    int val_c ;
} MsgData;

typedef tbb::concurrent_hash_map<std::string, MsgData> MsgMap ;

class ReadMessage : public tbb::filter
{
public:
    ReadMessage(int count = 10) : tbb::filter(serial_in_order), _count(count) {}

private:

    void* operator()(void* item)
    {
        for (int i = 0 ; i < 10 ; i++) {
            std::stringstream strm ;
            strm << "A" << i ;
            _data_msg.name = strm.str() ;
            _data_msg.val_a = i ;
            _data_msg.val_b = i ;
            _data_msg.val_c = i ;

            //std::cout << i << " " << _data_msg.name << " " << _data_msg.val_a << " " << _data_msg.val_b << " " << _data_msg.val_c << std::endl ;

            return &_data_msg ;
        }

        return NULL ;
    }

    int _count ;
    MsgData _data_msg ;
};

class ProcessMessage : public tbb::filter
{
public:
    ProcessMessage(MsgMap& name_map) : tbb::filter(parallel)
    {
    }

private:

    void* operator()(void* item)
    {
        MsgData* datap = static_cast<MsgData*>(item);

        if (datap != NULL) {
            //std::cout << datap->name << " " << datap->val_a << " " << datap->val_b << " " << datap->val_c << std::endl ;
            MsgMap::accessor a ;
            if (_name_map.find(a, datap->name)) {
                a->second = *datap ;
            }
            else {
                _name_map.insert(a, datap->name) ;
                a->second = *datap ;
            }
        }

        return datap ;
    }

    MsgMap _name_map ;
};


class PublishMessage : public tbb::filter
{
public:
    PublishMessage() : tbb::filter(serial_in_order)
    {
    }

private:

    void* operator()(void* item)
    {
        MsgData* datap = static_cast<MsgData*>(item);

        if (datap != NULL) {
            std::cout << datap->name << " " << datap->val_a << " " << datap->val_b << " " << datap->val_c << std::endl ;
        }

        return NULL ;
    }

};

int main(int argc, char* argv[])
{
    MsgMap names ;
    tbb::task_scheduler_init init ;

    // Create the pipeline
    tbb::pipeline pipeline;

    // Create file-reading writing stage and add it to the pipeline
    ReadMessage input_filter(10);
    pipeline.add_filter(input_filter);

    ProcessMessage process_filter(names);
    pipeline.add_filter(process_filter);

    PublishMessage output_filter;
    pipeline.add_filter(output_filter);

    // Run the pipeline
    pipeline.run(1);

    // Remove all filters from the pipeline
    pipeline.clear();

    return 0;
}



Vivek Rajagopalan
November 9, 2009 6:54 PM PST
#1
>>>>>
void* operator()(void* item)
{
    for (int i = 0 ; i < 10 ; i++) {
        std::stringstream strm ;
        strm << "A" << i ;
        _data_msg.name = strm.str() ;
        _data_msg.val_a = i ;
        _data_msg.val_b = i ;
        _data_msg.val_c = i ;

        //std::cout << i << " " << _data_msg.name << " " << _data_msg.val_a << " " << _data_msg.val_b << " " << _data_msg.val_c << std::endl ;

        return &_data_msg ;
    }

    return NULL ;
}
>>>>>>

Your for loop never gets to return NULL. Every time the pipeline calls the filter, the loop body runs only once (with i == 0) and returns from inside the loop, even though the condition is "i < 10".
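To see this without TBB, here is a minimal sketch that calls a source with the same loop shape from a plain driver loop. The names LoopingSource and drive are hypothetical and not part of TBB. The driver only imitates, as an assumption, how an input stage is invoked repeatedly until it returns NULL, and it caps the number of calls so the demo terminates.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>

struct MsgData {
    std::string name;
    int val_a;
};

// Same loop shape as the ReadMessage stage in the question. The loop
// variable is a local, so it starts again at 0 on every call.
struct LoopingSource {
    MsgData msg;

    void* operator()(void*) {
        for (int i = 0; i < 10; i++) {
            std::stringstream strm;
            strm << "A" << i;
            msg.name = strm.str();
            msg.val_a = i;
            return &msg;  // leaves the function during the first iteration
        }
        return NULL;      // never reached: 0 < 10 holds on every entry
    }
};

// Hypothetical stand-in for the pipeline's input loop (not a TBB API):
// call the source until it returns NULL, giving up after max_calls.
// Returns the number of non-NULL tokens and stores the last name seen.
template <typename Source>
int drive(Source& src, int max_calls, std::string* last_name) {
    assert(last_name != NULL);
    int produced = 0;
    while (produced < max_calls) {
        void* token = src(NULL);
        if (token == NULL) break;
        *last_name = static_cast<MsgData*>(token)->name;
        ++produced;
    }
    return produced;
}
```

Calling drive(src, 25, &last) on a LoopingSource uses up all 25 calls, and every token carries the name "A0". Only the cap stops it, which matches the "runs forever" symptom in the original post.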





Jogi Narain
November 11, 2009 7:25 AM PST
#2 Reply to #1
Hi Vivek, thanks for replying.

I don't understand the answer... I thought the pipeline would call the filter only *once* and the for loop would then run 10 times. Why does the while version work?






Jogi Narain
November 12, 2009 4:02 AM PST
#3 Reply to #2
OK, I think I am getting this: the first stage of the pipeline is called repeatedly until it returns NULL, so the terminating condition cannot reside in the called function. That is why the while version worked: it was reading from an external resource that was counting down the items.

Is my understanding correct?
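One way to refine that reading: the test can still live inside operator(), as long as the state it checks outlives a single call. The while version presumably worked because _ifs is a member whose read position persists between calls. The sketch below applies the same idea to the generated data by keeping the counter in a member. CountingSource, _next and collect_names are hypothetical names. The tbb::filter base class is left out so the sketch compiles without TBB; in the real program the class would still derive from tbb::filter(serial_in_order) as in the question.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

struct MsgData {
    std::string name;
    int val_a;
    double val_b;
    int val_c;
};

// Input stage that emits `count` messages, one per call, then NULL.
class CountingSource {
public:
    explicit CountingSource(int count) : _count(count), _next(0) {
        assert(count >= 0);
    }

    void* operator()(void*) {
        if (_next >= _count) return NULL;  // end of input: stop the stage

        std::stringstream strm;
        strm << "A" << _next;
        _data_msg.name = strm.str();
        _data_msg.val_a = _next;
        _data_msg.val_b = _next;
        _data_msg.val_c = _next;

        ++_next;            // member state survives until the next call
        return &_data_msg;  // one token per call
    }

private:
    int _count;
    int _next;
    MsgData _data_msg;
};

// Hypothetical driver imitating the input loop: call until NULL and
// record the name carried by each token.
std::vector<std::string> collect_names(CountingSource& src) {
    std::vector<std::string> names;
    while (void* token = src(NULL)) {
        names.push_back(static_cast<MsgData*>(token)->name);
    }
    return names;
}
```

Reusing the single _data_msg buffer is only safe while one token is in flight at a time, as with pipeline.run(1) in the original program. With more live tokens, each call would need to hand out its own MsgData.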




