multifun_node try_put() several messages to one successor crashes

Hi,
I'm connecting a multifun_node to a limiter_node, with the intention of sending 0, 1, or more messages to keep the flow well saturated. It works well with the concurrency threshold nparqueue = 1, but throws an exception when nparqueue = 2. Please help!
Igor

typedef multifunction_node<Pair, tuple<Pair, continue_msg>, rejecting> multifun_node;
graph g;
multifun_node processor(g, nparqueue, process_interval);
limiter_node<Pair> limiter(g, nparqueue);
make_edge(output_port<0>(processor), limiter);
make_edge(output_port<1>(processor), limiter.decrement);
priority_queue_node<Pair, ComparePairs> priority_buffer(g);
make_edge(limiter, priority_buffer);
make_edge(priority_buffer, processor);
limiter.try_put(PairStart); // This is how one starts it
g.wait_for_all();

void process_interval(...) {
    get<1>(out).try_put(continue_msg()); // decrement the limiter accepting next task into the buffer
    ...
    while(WorkList != EmptyList) {
        Pair pair = Head(WorkList);
        bool res = get<0>(out).try_put(pair);
        if(res) DelHead(WorkList);
        else break;
    }
}

It fails as follows:

get<0>(out).try_put(pair)
-> task *res = my_successors.try_put_task(i);   (_flow_graph_node_impl.h)
->
    else {  // failed
        if ( (*i)->register_predecessor(*this->my_owner) ) {
            if (!upgraded) {
                l.upgrade_to_writer();
                upgraded = true;
            }
            i = this->my_successors.erase(i);   (_flow_graph_impl.h)
->
    iterator erase(const_iterator _Where)
        {   // erase element at _Where
#if _ITERATOR_DEBUG_LEVEL == 2
        if (_Where._Getcont() != this || _Where._Ptr == this->_Myhead)
            _DEBUG_ERROR("list erase iterator outside range");   (<list>)

I am using the new TBB 4.2 (tbb42_20130725oss).


Hello, Igor,

I am including a sketch of what I think your code looks like.  (I used a queue_node for the WorkList because I think it captures the logic.)  A couple of points:

  • Is the multifunction_node body thread-safe?  One way to test this is to force the node to be serial.  When you set the concurrency to 2, there may be a conflict in accessing the WorkList.
  • Because the multifunction_node both controls the limiter_node and emits work for the priority_queue_node, is there a reason to have the limiter?  The multifunction_node can simply count the number of Pairs it emits (and decrement the count for each Pair received).  This only works if there is no source or sink of Pairs other than the WorkList (once you have injected a Pair to start the graph).
  • I assume this is a small sketch of the code you actually have.  If there is a function_node that processes Pairs and then forwards something to the multifunction_node, it could forward just a continue_msg rather than a Pair, since you drop the Pair in the multifunction_node and don't use its value.  That would also make the code conceptually more in line with what seems to be going on: the Pair received by the multifunction_node merely indicates that a Pair has been processed.
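The second bullet, counting in-flight Pairs inside the node instead of wiring up a limiter_node, could be sketched roughly as follows. This is a standalone illustration, not TBB code; `InFlightGate` and its methods are hypothetical names, and the actual flow-graph wiring is omitted:

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical helper (not a TBB type): counts Pairs currently in
// flight so the node body itself enforces the limit, replacing the
// limiter_node and its decrement port.
class InFlightGate {
    std::atomic<std::size_t> in_flight{0};
    std::size_t threshold;
public:
    explicit InFlightGate(std::size_t t) : threshold(t) {}

    // Call before try_put'ing a Pair to port 0; false means the
    // limit is reached and the Pair should stay in the WorkList.
    bool try_emit() {
        std::size_t cur = in_flight.load();
        while (cur < threshold)
            if (in_flight.compare_exchange_weak(cur, cur + 1)) return true;
        return false;
    }

    // Call at the top of the node body when a Pair is received back.
    void on_received() { --in_flight; }
};
```

The compare-and-swap loop keeps the count exact even when several node invocations emit concurrently, which is the situation where the limiter-based version crashes.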

If making the multifunction_node serial does not help, please let me know.

Regards,
Chris

Chris, thank you for your response. Are you saying that the problem is NOT due to TBB's try_put(pair) logic, but that I must have corrupted memory by not ensuring thread-safety of the surrounding code?! Indeed, the code works when serial, that is, when I set the concurrency threshold of the multifun_node (which I call "nparqueue") to 1.

WorkList is a linked list of Pairs ordered by their "priority"; effectively a priority queue. The top Pair is checked and new Pairs may be added back to the list. Moreover, the priority of tail Pairs may fall below the constantly improving threshold priority ("fmax"), and such Pairs are regularly chopped from the WorkList to keep it compact and memory-efficient.

I have a version with a global WorkList guarded by a write mutex. But here I'm testing the "lock-free" idea of having multiple WorkLists, one per thread via TLS (enumerable_thread_specific). This assumes that the threads executing the multifun_node stay the same, that is, the thread pool size equals the multifun_node's concurrency limit.
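For contrast, the mutex-guarded global-WorkList fallback mentioned here can be sketched like this (names are illustrative and `int` stands in for `Pair`; this is not the poster's actual code):

```cpp
#include <list>
#include <mutex>

// Illustrative stand-in for the globally shared WorkList variant:
// every access goes through one mutex, which is safe at any
// concurrency level but serializes all queue operations.
struct SharedWorkList {
    std::list<int> items;   // int stands in for Pair
    std::mutex m;

    void add(int v) {
        std::lock_guard<std::mutex> lock(m);
        items.push_back(v);
    }

    // Pops the head into out; returns false when the list is empty.
    bool take(int& out) {
        std::lock_guard<std::mutex> lock(m);
        if (items.empty()) return false;
        out = items.front();
        items.pop_front();
        return true;
    }
};
```

The per-thread enumerable_thread_specific design below avoids this lock, at the cost of assuming a stable mapping from node invocations to worker threads.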

Each thread contributes a few top Pairs from its local WorkList to a global priority queue managed by a priority_queue_node (it could also be just a buffer_node), and takes new Pairs to work on from that node.

multifun_node's body is "process_interval",

typedef std::vector<Pair> Pairs;
typedef tbb::enumerable_thread_specific<Pairs> TLSvector;
typedef tbb::enumerable_thread_specific<PairPtr> TLSPairPtr;

TLSvector results;                         // per thread
TLSPairPtr work_lists(PairPtr(EmptyList)); // per thread
auto process_interval = [&f,&fmax,&results,&work_lists](Pair PairY, multifun_node::output_ports_type &out) {
    ::process_interval(PairY, f, work_lists, results, fmax, out);
};

The actual body is an external function (I can't run the debugger inside lambda functions):

inline void process_interval(const Pair &PairY, HTscalar_FctPtr f, TLSPairPtr &work_lists, TLSvector &results, real &fmax, multifun_node::output_ports_type &out) {
    get<1>(out).try_put(continue_msg()); // decrement the limiter accepting next task into the buffer
    // Bisect first and check the two sub-boxes
    if(fmax < GetFyi(PairY)) return;
    ivector Y = GetInt(PairY);
    ivector Y2 = Y;
    rvector c = mid(Y);
    int const k = MaxDiamComp(Y);
    SetSup(Y[k],c[k]); SetInf(Y2[k],c[k]);
    auto &WorkList = work_lists.local(); // private queue ("parqueue")        
    check_add( Y, f, WorkList, results, fmax);
    check_add(Y2, f, WorkList, results, fmax);
    MultiDelete(WorkList,fmax);
    while(WorkList != EmptyList) {
        Pair pair = Head(WorkList);
        bool res = get<0>(out).try_put(pair);
        if(res) DelHead(WorkList);
        else break;
    }
}
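The final while-loop is an "offer until rejected" handshake: it keeps try_put'ing from the head of the WorkList and stops at the first rejection, leaving the remainder for the next invocation. A standalone sketch of that pattern, where `BoundedQueue` is a hypothetical stand-in for the rejecting limiter_node chain (not a TBB type):

```cpp
#include <cstddef>
#include <deque>

// Hypothetical capacity-limited successor, standing in for the
// limiter_node/priority_queue_node chain; int stands in for Pair.
struct BoundedQueue {
    std::deque<int> items;
    std::size_t capacity;

    bool try_put(int v) {               // rejects once full
        if (items.size() >= capacity) return false;
        items.push_back(v);
        return true;
    }
};

// Mirrors the while-loop above: forward work until the successor
// rejects, keeping the remainder in the local work list.
std::size_t drain(std::deque<int>& work_list, BoundedQueue& successor) {
    std::size_t sent = 0;
    while (!work_list.empty()) {
        if (!successor.try_put(work_list.front())) break;
        work_list.pop_front();
        ++sent;
    }
    return sent;
}
```

With a capacity of 3 and five queued items, drain sends three, and the remaining two stay at the head of the work list for the next call.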
