enqueue from outside the threadpool

enqueue from outside the threadpool

Hey,

I'm trying to get an example running where TBB tasks submit work to a separate "service" thread which then executes it and schedules the continuation of the task for execution by TBB. The submitting task should wait until completion of the continuation, but the worker thread should continue to work on different tasks.

My current approach is to pass the continuation task to the service thread. Upon completion of the work, the service thread will schedule the task for execution using task::enqueue. At some point a worker will pick up this task and when finished it wakes up the submitting task.

Initially I tested this with TBB 4.2, where I get situations where every worker is trying to pointlessly steal tasks when the shared queue is filled with tasks to execute and everything is completely blocked. TBB 4.3 and 4.4 give a very different result: every enqueue operation actually starts a new workerpool (next to the original one) which executes the task and then terminates. Looking at the source code confirms what I'm seeing. It seems that enqueue gets called on the scheduler that created the task which in turn looks in the TLS for a local scheduler. Since there is no local scheduler, one gets created.

I would like to have a way to enqueue on the original workerpool's shared queue and have those workers execute the tasks. Or is there a different way to achieve this behaviour? Maybe I'm overly complicating this.

The current implementation is as follows. It includes some manual ref_count adjustments for the case when the continuation also depends on other tasks, but it can be safely ignored.

tbb::concurrent_bounded_queue< std::pair<int, task *> > q;

void async_worker() {
  std::pair<int, task *> x;
  while( true ) {
     q.pop(x);
      if( x.second == 0 )
        break;

      if( tk.decrement_ref_count() == 0 )
        task::enqueue(tk);
  }
}

class ContinuationTask : public tbb::task {
  int i;
public:
    ContinuationTask(int i) : tbb::task(), i(i) {}
    task *execute() override {
      /* stuff */
      return nullptr;
    }
};

class InitiatorTask : public tbb::task {
  int i;
public:
    InitiatorTask(int i) : tbb::task(), i(i) {}
    task *execute() override {
      task *tk = new(allocate_continuation()) ContinuationTask(i);
      /* stuff */
      tk->set_ref_count(1);
      q.push( std::make_pair(i,tk) );
      return nullptr;
    }
};

void do_async(int i) {
  task *tk = new(task::allocate_root()) InitiatorTask(i);
  task::spawn_root_and_wait(*tk);
}

int main() {
  std::thread t(async_worker);
  parallel_for(0, 100, do_async);
  q.push( std::make_pair(0,nullptr) );
  t.join();
  return 0;
}

Thanks in advance

Tom

2 posts / 0 new
Last post
For more complete information about compiler optimizations, see our Optimization Notice.

I would recommend you to look at the flow graph instead of parallel_for and tasks. In recent versions, there is a special async_node class for communicating with an external thread. You may read more at the documentation page: https://www.threadingbuildingblocks.org/docs/help/reference/appendices/c....

So instead of a TBB task submitting a portion of work to the thread and waiting for it to complete, you have three nodes in the graph: a functional node that prepares data for async processing, an async_node that submits these data to a separate thread, and a second functional node that does post-processing/continuation work.

Does it make sense to you?

Leave a Comment

Please sign in to add a comment. Not a member? Join today