Hey,
I'm trying to get an example running where TBB tasks submit work to a separate "service" thread which then executes it and schedules the continuation of the task for execution by TBB. The submitting task should wait until completion of the continuation, but the worker thread should continue to work on different tasks.
My current approach is to pass the continuation task to the service thread. Upon completion of the work, the service thread will schedule the task for execution using task::enqueue. At some point a worker will pick up this task and when finished it wakes up the submitting task.
Initially I tested this with TBB 4.2, where I get situations where every worker is trying to pointlessly steal tasks when the shared queue is filled with tasks to execute and everything is completely blocked. TBB 4.3 and 4.4 give a very different result: every enqueue operation actually starts a new workerpool (next to the original one) which executes the task and then terminates. Looking at the source code confirms what I'm seeing. It seems that enqueue gets called on the scheduler that created the task which in turn looks in the TLS for a local scheduler. Since there is no local scheduler, one gets created.
I would like to have a way to enqueue on the original workerpool's shared queue and have those workers execute the tasks. Or is there a different way to achieve this behaviour? Maybe I'm overly complicating this.
The current implementation is as follows. It includes some manual ref_count adjustments for the case when the continuation also depends on other tasks, but it can be safely ignored.
tbb::concurrent_bounded_queue< std::pair<int, task *> > q; void async_worker() { std::pair<int, task *> x; while( true ) { q.pop(x); if( x.second == 0 ) break; if( tk.decrement_ref_count() == 0 ) task::enqueue(tk); } } class ContinuationTask : public tbb::task { int i; public: ContinuationTask(int i) : tbb::task(), i(i) {} task *execute() override { /* stuff */ return nullptr; } }; class InitiatorTask : public tbb::task { int i; public: InitiatorTask(int i) : tbb::task(), i(i) {} task *execute() override { task *tk = new(allocate_continuation()) ContinuationTask(i); /* stuff */ tk->set_ref_count(1); q.push( std::make_pair(i,tk) ); return nullptr; } }; void do_async(int i) { task *tk = new(task::allocate_root()) InitiatorTask(i); task::spawn_root_and_wait(*tk); } int main() { std::thread t(async_worker); parallel_for(0, 100, do_async); q.push( std::make_pair(0,nullptr) ); t.join(); return 0; }
Thanks in advance
Tom