Worker thread async task offloading?

kfsone's picture
I've been looking at several parallelism/threading solutions for optimizing our server processes to take better advantage of the hardware they're running on. Currently most of our threading is for offloading blocking actions such as database queries.

After reading through the TBB PDFs and buying the book (which seemed to be a copy of the PDFs), I'm left feeling that TBB suffers from the same issue that OpenMP would have for me: it really wants you to focus on iterative work.

I'm trying to figure out how to apply TBB to one particular issue that is archetypal of many of the off-loadable work assignments our system generates: dispatching queries to databases in the background. The reason this seems ill-suited to TBB (and this may purely be my lack of understanding) is that these queries are generated relatively sparsely and can individually take a long time waiting on I/O.

I currently use a worker pattern for offloading asynchronous database queries: a number of pthread threads are created, each of which obtains its own connection to the database.


The query producer derives a query task, allocating memory and populating it with the statement to be executed, and then pushes it onto a std::queue made thread-safe via a mutex. The workers sit behind a pthread_mutex_trylock until something appears on the queue, which they then grab and execute. Upon completion, they either execute the queue object's "Success" function or push it onto a shared "callback" queue, which the master thread periodically drains (thus enabling me to transition what were once monolithic single-threaded server processes gradually towards parallelism).

A primary concern is avoiding connection thrashing: I don't want each SQL query to have a connection tear-up/tear-down overhead; currently each worker thread encapsulates and manages its own connection handle. But I could probably use a concurrent queue of handles and have each query task pull one off the queue, only creating a new one when none is available.

Are there already existing patterns/templates for this kind of resource-targeted work offloading, without leaving specific threads mutex-locked pending work? e.g.
template <typename ResourceType>
class ParallelResourcePool
{
 typedef tbb::concurrent_queue<ResourceType*> Resources ;
 Resources m_resources ;

protected:
 // Derived classes must implement AllocateResource.
 virtual ResourceType* AllocateResource() = 0 ;

 // Derived classes must describe how to release a resource.
 virtual void FreeResource(ResourceType* const resource) = 0 ;

 // Derived destructors must call this: by the time the base destructor
 // runs, the derived FreeResource can no longer be reached.
 void FreeAllResources()
 {
  ResourceType* resource = NULL ;
  while ( m_resources.try_pop(resource) )
  {
   FreeResource(resource) ;
  }
 }

public:
 // Get a resource off the queue or allocate one.
 ResourceType* GetResource()
 {
  ResourceType* resource = NULL ;
  // Get the first free resource from the queue, if any available.
  if ( m_resources.try_pop(resource) )
   return resource ;
  return AllocateResource() ;
 }

 void ReturnResource(ResourceType* const resource)
 {
  m_resources.push(resource) ;
 }

 virtual ~ParallelResourcePool() {}
} ;

...

class MysqlPool : public ParallelResourcePool<MYSQL>
{
 virtual MYSQL* AllocateResource()
 {
  MYSQL* conn = threadsafeMysqlOpen() ;
  return conn ;
 }

 virtual void FreeResource(MYSQL* const conn)
 {
  mysql_close(conn) ;
 }

public:
 virtual ~MysqlPool()
 {
  FreeAllResources() ;
 }
} ;

static MysqlPool s_mysqlConnections ;

// ParallelResourceUser template defined somewhere else;
// it provides a base class that receives the resource pool,
// obtains the Resource for this invocation, and then calls
// the execute function.
class MysqlResourceUser : public ParallelResourceUser<MYSQL>
{
 std::string m_statement ;

public:
 MysqlResourceUser(const std::string& statement)
  : m_statement(statement)
  {}

 virtual bool Execute() const {
  mysql_real_query(Resource(), m_statement.c_str(), m_statement.length()) ;
  ...
 }
} ;

...

int somefunction()
{
 std::string statement = "SELECT t.field1, t.field2 FROM {0} AS t WHERE {1} = {2} LIMIT 1" ;
 sqlSubstitute(statement, "mytable", "myfield", "'hello'") ;
 parallel_work_queue(s_mysqlConnections, new MysqlResourceUser(statement)) ;
 return 0 ;
}
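
For anyone who wants to experiment with the pool idea without TBB installed, here is a minimal sketch using only the C++11 standard library. Everything in it is an assumption made for illustration: `Connection` stands in for a real handle such as `MYSQL*`, and `ConnectionPool` and `Lease` are hypothetical names, not part of TBB or of the pseudocode above. A mutex-protected vector takes the place of tbb::concurrent_queue.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a real connection handle such as MYSQL*.
struct Connection { int id; };

class ConnectionPool {
public:
    // Hands out an idle connection, or creates one if none is idle.
    std::unique_ptr<Connection> acquire() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (!m_idle.empty()) {
            std::unique_ptr<Connection> conn = std::move(m_idle.back());
            m_idle.pop_back();
            return conn;
        }
        return std::unique_ptr<Connection>(new Connection{m_created++});
    }

    // Puts a connection back so that a later query can reuse it.
    void release(std::unique_ptr<Connection> conn) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_idle.push_back(std::move(conn));
    }

    // Number of connections ever created by this pool.
    int created() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_created;
    }

private:
    mutable std::mutex m_mutex;
    std::vector<std::unique_ptr<Connection>> m_idle;
    int m_created = 0;
};

// RAII lease: the connection goes back to the pool when the query task ends,
// even if the task exits early.
class Lease {
public:
    explicit Lease(ConnectionPool& pool) : m_pool(pool), m_conn(pool.acquire()) {}
    ~Lease() { m_pool.release(std::move(m_conn)); }
    Lease(const Lease&) = delete;
    Lease& operator=(const Lease&) = delete;

    Connection& get() { assert(m_conn); return *m_conn; }

private:
    ConnectionPool& m_pool;
    std::unique_ptr<Connection> m_conn;
};
```

A query task would create a `Lease` on entry and use `get()` for the duration of the query. Connections are only created when every existing one is in use, which is the "no thrashing" property asked for above.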
Oliver 'kfs1' Smith, Lead Server Programmer, Cornered Rat Software / Battleground Europe
kfsone's picture

Oops, added back the preface describing what I'm looking for :) Teach me to edit posts in two windows.

Also - the source code posted was written on the fly as an attempt to express, in code, what I'm trying to achieve. It may be the wrong pattern, and should be considered pseudocode. Ultimately, what I'm trying to achieve is the equivalent of
#pragma omp parallel(but don't spinlock on futexes for pitys sake)
{
  #pragma omp master
  {
    /* stuff */
  }
}

void functionCalledFromSomewhereUnderTheAboveCode()
{
  #pragma omp task
  databaseQuery(...) ; // Let another thread pick it up
}

Without the OpenMP overhead that comes from having idle worker threads... Right now my understanding leads me to believe I might be best leaving those as pthread tasks, which is disappointing because I was hoping to use TBB to eliminate platform dependencies.

kfsone's picture

Found the answer by reading through "What's New"s instead of Documentation...


http://software.intel.com/en-us/blogs/2009/08/05/transitioning-from-intel-tbb-21-to-22/

ISO C++ thread class: A thin portable wrapper around OS threads. It's a close approximation of the ISO C++ 200x class thread (Section 30.2 of http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2008/n2691.pdf). Now TBB lets you choose which is best, task-based versus thread-based, for your situation. Threads are typically better than tasks when the "work" is really more waiting than computation, such as for:

  • GUI, I/O or network interface threads.
  • Threads that need to wait on external events.
  • Programs that previously needed to use both native threads and Intel TBB tasks.

That's what I'm looking for. I think that moves TBB closer into line with the kind of resource-centered worker-pool systems I have in mind (MySQL offloading was just one example). So I'll probably try and write my encapsulation generically so that I can contribute it back to TBB.
- Oliver
Raf Schietekat's picture

"That's what I'm looking for."
Is that tbb_thread you're talking about? It may be a thin wrapper, but there's still a big fat thread underneath (expensive to create and operate). If I had a lot of things to wait for, I would try to consolidate them in a tbb_thread created for that purpose (doing select() or poll() or whatever else is relevant). A task that has to wait for something would create a child task with the relevant information, "spawn" it to the specialised thread (outside of TBB so not actually spawning, just carrying information so not actually executing at any point), and when the information is obtained the specialised thread would destroy the task so that its parent will return from wait_for_all() if blocking, or be spawned the TBB way if used as a continuation (which seems somehow preferable when waiting is involved).

Alexey Kukanov (Intel)'s picture
Best Reply

A primary concern is avoiding connection thrashing: I don't want each SQL query to have a connection tear-up/tear-down overhead; currently each worker thread encapsulates and manages its own connection handle.
But I could probably use a concurrent queue of handles and have each query task pull one off the queue, only creating a new one when none is available.

In TBB, you could use the enumerable_thread_specific class template to have a connection handle per thread, pretty conveniently.

We are developing better support for work offloading in the next TBB version (and I could provide you some details if you are interested), but TBB is still unable to compensate for the "loss" of worker threads blocked on I/O calls. So maybe using tbb_thread to build your own thread pool is the better solution for you at the moment.

On the other hand, if you have an idea about the (average) ratio between working time and waiting time for your threads, you could choose to use the TBB scheduler but oversubscribe the machine to the degree defined by that ratio. E.g. if your threads on average spend 3/4 of their time doing some work and 1/4 waiting for a DB response, then in theory you would get an average system load closer to 100% by multiplying the default number of threads by 4/3, though there would be periods of oversubscription (with some ready threads waiting for a time slot) and undersubscription (with some cores sitting idle). However, any thread pool implementation would have some degree of over- or undersubscription, in my opinion.
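
The arithmetic behind that suggestion can be written down as a small helper. This is only a sketch: `oversubscribed_thread_count` is a hypothetical name, and the result is just the number one would hand to whatever creates the thread pool (in the TBB releases of that era, the argument of tbb::task_scheduler_init).

```cpp
#include <cassert>
#include <cmath>

// Number of threads needed so that, on average, `cores` of them are runnable
// when each thread spends `work_fraction` of its time computing and the rest
// of its time blocked. Rounds up, so it errs towards oversubscription.
inline int oversubscribed_thread_count(int cores, double work_fraction) {
    assert(cores > 0);
    assert(work_fraction > 0.0 && work_fraction <= 1.0);
    return static_cast<int>(std::ceil(cores / work_fraction));
}
```

With a work fraction of 3/4 this gives 4 threads for 3 cores and 11 threads for 8 cores, i.e. the "multiply by 4/3" rule rounded up.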

kfsone's picture

If there isn't an existing solution, and there is interest in extending such capabilities within TBB, I'll flesh out the rough outline I penned in the original post into a working baseline.

What I had in mind is that, with TBB, you could avoid idle worker threads by having a single worker responsible for both pools (where work is assigned to resources by the worker rather than the caller, such as offloading reads/writes to a database) and pumps (where work is assigned to specific resources, such as offloading reads/writes to a socket). By combining both types of workers, the typical server-type application will find it easier to integrate TBB generally, and can avoid the overhead of dual-thread modelling.

The primary aim with such an abstraction is usually the async aspect, and not so much the performance of the offloaded tasks: you want to retain as much available CPU as possible for actual workload execution. So you generally don't want lots of idle threads sitting around waiting, but rather a very small number (e.g. 1) of threads monitoring the overall supply of work. And perhaps having such a model might help determine a good way to integrate this into TBB's scheduler in the longer term.

Ever since meeting some of the TBB team at the Austin Game Developer's Conference a few years ago, I've been waiting for some announcement that the next line of Intel CPUs has on-chip support for TBB :) As we get more and more cores, it only makes sense that there be a bridge to the [otherwise] expensive software management of self-distribution. Strong, generic support for this particular subset of parallelization might help push that: content servers would benefit dramatically from it.

Consider a software VPN server: it really needs most of its CPU power for encryption/decryption. The Xeon (server) variants of the i7 CPUs are crying out for (2^n)+1 cores instead of just 2^n, with the extra [master] core running serial/management/IO tasks: 8 cores running "hot" code such as the encrypt/decrypt routines, routing etc., and the 9th core trading packets in/out and managing the other cores.

If my schedule allows, I'll try and get a working base done over the weekend.

Andrey Marochko (Intel)'s picture

Let me add some more background info on the problem at hand. As you correctly surmised in the very beginning, the main focus of TBB is (at least so far) on computationally intensive workloads (not necessarily iterative ones; any CPU-bound calculations fit well). Your case involves code doing blocking operations, and handling it well is indeed a challenge. As Alexey noted above, when your code blocks (doing an I/O operation, or waiting on a kernel-mode synchronization object), the thread is essentially lost to TBB until the OS resumes it.

The workaround with mandatory oversubscription suggested by Alexey is indeed the only way to tackle the issue available at the moment, and some TBB-based applications have used it successfully. Unfortunately it suffers from several drawbacks that limit its efficiency and area of applicability.

First, for many real-world applications (e.g. servers of various kinds) the ratio between the time spent waiting and the time spent actively working may vary significantly over time, with the following negative effects. Let's consider Alexey's example where we initialize the scheduler with 1/3 extra threads. If at some moment the time spent waiting drops from 1/4 to 1/8, you'll get ~20% oversubscription. If on the other hand the blocking time increases from 1/4 to 1/2, you'll get 33% underutilization.
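
Those figures follow from multiplying the threads-per-core factor by the fraction of time a thread is runnable. A small sketch of the calculation, where `average_load` is a hypothetical helper name introduced only for illustration:

```cpp
#include <cassert>

// Average number of runnable threads per core. A value above 1 means
// oversubscription, a value below 1 means some cores sit idle.
inline double average_load(double threads_per_core, double wait_fraction) {
    assert(threads_per_core > 0.0);
    assert(wait_fraction >= 0.0 && wait_fraction <= 1.0);
    return threads_per_core * (1.0 - wait_fraction);
}
```

With 4/3 threads per core, a wait fraction of 1/4 gives a load of exactly 1. Dropping the wait fraction to 1/8 gives 4/3 × 7/8 ≈ 1.17, which is the oversubscription case quoted above, and raising it to 1/2 gives 4/3 × 1/2 ≈ 0.67, which is the 33% underutilization case.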

While some degree of oversubscription can be tolerable, underutilization is definitely much more harmful. Thus finding an appropriate balance can be tricky when the workload behavior varies over a wide range.

Another issue with this approach is a lack of composability (again, at least so far). If you write a reusable component that relies on this kind of mandatory oversubscription, it may fail to get the necessary number of threads if the scheduler has already been initialized with a different number of threads by the time the component starts its work.

A more robust solution for the blocking operations problem was debated for quite a long time inside the TBB team, but the limitations of the different alternatives held off reaching a consensus. Such a solution first of all requires notifications about when the user code blocks and unblocks.

This requirement ideally needs support from the OS, as only the OS knows exactly if and when a thread blocks or unblocks. At the moment the only OS providing such a mechanism is the recently released 64-bit Windows 7 (UMS threads). And this is too little for a cross-platform library like TBB.

The only viable alternative to OS support is providing a markup API that TBB users would use to delimit operations that may block. In response to these calls TBB would dynamically increase the internal thread pool size when some thread blocks, and then relinquish the extra thread when the original one unblocks.
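
To make the idea of such a markup API concrete, here is a minimal sketch of what the user-facing side might look like. None of this exists in TBB: `BlockingMonitor` and `BlockingScope` are hypothetical names, and the monitor only counts blocked threads, where a real scheduler would react by waking or retiring a replacement thread.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical scheduler hook (not part of TBB). It only tracks how many
// threads are currently inside a potentially blocking region.
class BlockingMonitor {
public:
    void on_block() { m_blocked.fetch_add(1); }

    void on_unblock() {
        int previous = m_blocked.fetch_sub(1);
        assert(previous > 0);   // every unblock must match an earlier block
        (void)previous;
    }

    // How many replacement threads the pool could usefully run right now.
    int blocked_threads() const { return m_blocked.load(); }

private:
    std::atomic<int> m_blocked{0};
};

// Marks a region of code that may block. RAII guarantees that the unblock
// notification is sent even if the region exits early or throws.
class BlockingScope {
public:
    explicit BlockingScope(BlockingMonitor& monitor) : m_monitor(monitor) {
        m_monitor.on_block();
    }
    ~BlockingScope() { m_monitor.on_unblock(); }
    BlockingScope(const BlockingScope&) = delete;
    BlockingScope& operator=(const BlockingScope&) = delete;

private:
    BlockingMonitor& m_monitor;
};
```

User code would wrap each call that may block (a database query, a socket read) in a `BlockingScope`. The sketch deliberately leaves out the hard parts of the proposal, such as what the scheduler should do with the replacement thread once the original one resumes.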

However solution based on such an API is not free from limitations. First of all it is additional work for the programmers requiring extra diligence in placing the notifications (may be challenging or vexing for large apps). Another inherent shortcoming of the approach is its unreliability, in the sense that potentially blocking operations not always block. For example when data for IO operation are (pre)cached by OS, or when synchronization operation succeeds immediately. In such cases changing thread pool state may introduce significant overhead. To fight this issue we'd need to introduce some timeouts for new threads activation that will decrease the hardware utilization to some extent.

Another problem is what to do with a replacement thread when the original one unblocks. The issue here is that the new thread may grab a large piece of work, and especially in the case of nested parallelism (a frequent usage model for TBB apps) it will not be able to stop until it finishes it all. This obviously leads to oversubscription that may reach a significant level in some cases.

To mitigate this new problem we could block the original thread in its unblock notification until the number of active threads drops enough. This would cost us additional synchronization, and what is worse can even result in a deadlock if the blocking operation with surrounding notifications is done from under the lock.

Thus as you can see there are a number of pitfalls on the way of introducing blocking operations support in TBB (let alone the cost of extra synchronization that may have to be added into the TBB task scheduler).

Anyway, it would be interesting to hear your opinion on whether the manual notification-based approach (with all its limitations) appears useful to you.

Raf Schietekat's picture

"The workaround with mandatory oversubscription suggested by Alexey is indeed the only way to tackle the issue available at the moment."
Can you explain what you think is wrong with my suggestion?

Andrey Marochko (Intel)'s picture

Ahh, your approach should work of course.

When I said "the only way" I meant the solution using standard TBB workers pool. In your variant the user would need to manually implement generic thread pool infrastructure, which is a nuisance at the very least :) . Besides TBB does not provide complete abstraction for platform specific synchronization mechanisms (like events and waiting functions on Windows, and poll/epoll on Nixes).

As an additional note, instead of passing TBB task to the custom pool thread, it would be better to pass a reference to the waiting task, and directly decrement its refcount upon blocking operation completion. Doing so will save you overhead of a task creation/destruction, and what is much more important will prevent automatic initialization of the TBB task scheduler object in the custom pool thread when task::destroy() method is called. And the scheduler initialization is not the fastest thing in the world :)
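
The shape of that arrangement (one dedicated thread doing the blocking calls, and a counter decremented when each call completes) can be sketched with the C++11 standard library alone. This is an illustration under stated assumptions, not TBB code: `WaiterThread` and `BlockingRequest` are hypothetical names, a plain std::atomic<int> stands in for the waiting task's reference count, and std::thread stands in for tbb_thread.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// One request handed to the dedicated waiting thread.
struct BlockingRequest {
    std::function<void()> operation;  // the call that may block
    std::atomic<int>* pending;        // counter owned by the waiting side
};

class WaiterThread {
public:
    WaiterThread() : m_done(false), m_thread(&WaiterThread::run, this) {}

    ~WaiterThread() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_cv.notify_one();
        m_thread.join();  // the thread drains the queue before it exits
    }

    // Registers one more outstanding operation and hands it over.
    void submit(std::function<void()> operation, std::atomic<int>& pending) {
        pending.fetch_add(1);
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(BlockingRequest{std::move(operation), &pending});
        }
        m_cv.notify_one();
    }

private:
    void run() {
        for (;;) {
            BlockingRequest request;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [this] { return m_done || !m_queue.empty(); });
                if (m_queue.empty()) return;  // only reached once m_done is set
                request = std::move(m_queue.front());
                m_queue.pop();
            }
            request.operation();            // blocks here, not in a pool worker
            request.pending->fetch_sub(1);  // signal completion to the waiter
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<BlockingRequest> m_queue;
    bool m_done;
    std::thread m_thread;  // declared last so the other members exist first
};
```

The submitting side keeps working and checks (or waits on) its counter; when it reaches zero, all of its blocking operations have completed. In a TBB setting the decrement would be applied to the waiting task's reference count as described above.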

kfsone's picture

I didn't get opportunity this weekend to work on my concept classes, hopefully I'll have chance this week/weekend; I think they may help as a discussion point.

It was precisely the reasons you've outlined that led me to this initial conclusion/post :) Obviously the problem doesn't go away just because TBB is not used, it just becomes more opaque: the TBB scheduler makes assumptions about the workload being done and assigns work, but then fails to get the intended parallelism because of native thread scheduling occurring in other parts of the application.

Honestly - until hardware engineers can be coaxed into bridging this gap with us we are going to have to work around the limitations imposed by software spending CPU cycles guessing what hardware to use :)

In most of my experience (I've been a "hobbyist parallelist" since I was 14 in 1985, when I developed my own multi-tasking OS in 680x0 assembler) async workers are very low priority.

For the purpose of this thread, I am primarily considering deferrable tasks that you simply wish not to delay the main thread. I think it would be perfectly acceptable to exclude these from nested parallelism. My aim is to initiate fresh migration to TBB.

The most common parallelism tasks for non-cpu intense work are things like DNS queries, file operations, database operations, network I/O, I/O-to-hardware offloading and reflection (self-monitoring, auditing, logging).

Such systems tend to lack the quantities of cpu-intense work to demand a full allocation of modern-cpu cores. Consider a web-server that will be delivering encrypted and/or compressed pages.

This could be quite well suited to OpenMP 3.0 sections and tasking to parallelize the polling of network I/O and handling the tasking of responses (accept new connection, close connections that raised errors, drain writes to connections ready for more data, process completed reads).

It will generate some eminently parallelizable workloads when it encrypts and compresses a 500 KB response. But it is unlikely to want to do so at the cost of all other activity.

Additionally, in the case of "server" systems, one only has to glance at the list of "server" offerings from vendors like Dell or Newegg to see that "server" most frequently equals "multi-cpu" (all of our servers have two cpus with multiple cores each). If there are 16 CPU cores of which 14 are in use, and TBB needs 2 cores for threads, does it know/need to pick two on the same CPU?

Anyway - to the matter of aiding such applications transition to TBB, to consolidate where they get their threading, I think some markup might be useful, but I think that should be done up-front. It will simplify the considerations you and the developer need to give. The worker pool concept can then be generalized, but the operation of the pools will be singular for a given application.

I can see two scenarios for this usage:

1. Deferral workers: Async work is queued as needed, but is then processed on demand:

int main()
{
  initialize() ;

  // Manual scheduling of worker activities.
  tbb::set_worker_deferral(true) ;
  // Raise an error when a worker has not exited
  // after two calls to schedule_workers.
  tbb::set_worker_max_cycles(2) ;

  while ( running )
  {
    application_work() ;
    // Allow 2.5ms for worker execution; after this
    // time, schedule_workers returns 'false' if any
    // workers have not terminated, or 'true' if
    // there is zero work left outstanding and zero
    // workers still active. It may return sooner.
    tbb::schedule_workers(2500) ;
  }
}

perhaps the user might specify a maximum time for the workers to run.
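
The time-budget behaviour described in the comments above can be tried out with a much simpler stand-in. The sketch below is an assumption-laden simplification: `DeferredQueue` is a hypothetical class, it runs the queued jobs on the calling thread rather than on worker threads, and it is not thread-safe. It only shows the "run until the budget is spent, report whether work remains" contract.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

class DeferredQueue {
public:
    // Queues a job for later; nothing runs until run_for is called.
    void push(std::function<void()> job) { m_jobs.push_back(std::move(job)); }

    // Runs queued jobs until the queue is empty or the budget is spent.
    // Returns true when no work is left outstanding. A job that is already
    // running is never interrupted, so a long job can overrun the budget.
    bool run_for(std::chrono::microseconds budget) {
        const auto deadline = std::chrono::steady_clock::now() + budget;
        while (!m_jobs.empty()) {
            if (std::chrono::steady_clock::now() >= deadline) return false;
            std::function<void()> job = std::move(m_jobs.front());
            m_jobs.pop_front();
            job();
        }
        return true;
    }

    std::size_t outstanding() const { return m_jobs.size(); }

private:
    std::deque<std::function<void()>> m_jobs;
};
```

In the main loop this would be called as `queue.run_for(std::chrono::microseconds(2500))` after `application_work()`, mirroring the `schedule_workers(2500)` call in the proposal.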

Pro: The deferral work does not interfere with other operations, and any non-intensive work done during the "application work" phase will not interfere with other operating system activity etc: I've come across a lot of intensive applications which have to

#if defined(_WIN32)
  Sleep(0);
#else
  sched_yield();
#endif

to let the OS do its thing and prevent the application stalling out the machine. (Particularly MacOS and Windows)

Pro: Deferred work actions benefit from cache hotness by running simultaneously.

Cons: Deferred work may take significantly longer to be executed.

Cons: Worker threads that take too long may block.

Options: Rather than a monolithic operator(), a very simple FSM could be used that allows the worker to return either tbb::Worker::Completed or tbb::Worker::Yielding. Yielding workers would be re-queued, and an additional operator invoked on subsequent calls to schedule_workers to see if they need to resume yet. An example is handing a single, huge amount of write data to a worker, which then performs non-blocking I/O operations, yields when the I/O buffers are full, and checks whether the I/O buffers have emptied sufficiently for another write...
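
A minimal sketch of that Completed/Yielding idea, again with standard C++ only. `WorkerStatus` and `YieldingScheduler` are hypothetical names made up for this illustration; the sketch is single-threaded and leaves out the separate "do I need to resume yet" check mentioned above.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

enum class WorkerStatus { Completed, Yielding };

class YieldingScheduler {
public:
    void push(std::function<WorkerStatus()> worker) {
        m_workers.push_back(std::move(worker));
    }

    // Gives every queued worker exactly one turn. Workers that yield are
    // re-queued for the next pass. Returns how many are still outstanding.
    std::size_t run_one_pass() {
        std::size_t turns = m_workers.size();
        while (turns-- > 0) {
            std::function<WorkerStatus()> worker = std::move(m_workers.front());
            m_workers.pop_front();
            if (worker() == WorkerStatus::Yielding)
                m_workers.push_back(std::move(worker));
        }
        return m_workers.size();
    }

private:
    std::deque<std::function<WorkerStatus()>> m_workers;
};
```

A worker writing a large buffer would write as much as the socket accepts on each turn and return `WorkerStatus::Yielding` until the last chunk has gone out, at which point it returns `WorkerStatus::Completed` and drops out of the queue.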

2. Parallel Workers: Async work is handled on dedicated threads using mutex lock/signal to dispatch work to those threads.

struct MySQLUpdate : public tbb::PoolWorker
{
  MySQLUpdate(const std::string& srcQuery) : query(srcQuery) {}
  std::string query ;

  // Takes a "Split" to indicate this is executed in parallel
  virtual tbb::PoolWorker::ReturnType operator()(tbb::Split&) const
  {
    // mysql_real_query returns nonzero on failure.
    if ( mysql_real_query(GetResource(), query.c_str(), query.length()) != 0 )
      return tbb::PoolWorker::Completed ;
    // Return one of Yielding, Completed or Join.
    return tbb::PoolWorker::Join ;
  }

  // Takes a dummy "Join" to indicate this is to be executed by the
  // master thread when tbb::consume_worker_results() is called.
  virtual void operator()(tbb::Join&) const
  {
    update_next_heartbeat_time() ;
  }
} ;

...

   tbb::set_worker_deferral(false) ;

   // Crude "delegate workers to the Nth cpu". But obviously,
   // you would probably want to check the CPUs had more than
   // one core too :)
   uint32_t cpus = tbb::get_num_cpus() ;
   if ( cpus == 1 )
     log("CAUTION: Only one CPU detected, performance may be degraded\n") ;
   tbb::set_worker_cpu_affinity(cpus) ;

   while ( running )
   {
     application_work() ;
     // 0 to indicate we want all worker results consumed;
     // otherwise a uSecond value for maximum time to
     // spend processing Joined results.
     tbb::consume_worker_results(0) ;
   }

...

   tbb::parallel_work_queue(mysqlConnectionResources
                                   , new MySQLUpdate("update server_heartbeat set last_beat = NOW()")) ;

(Using something like the class/struct prototypes in my first post)

These two models ought to cover a huge swath of cases where people are currently using alternative threading packages. They ought to match most of the models those people use fairly closely, which eases the transition, and they should create a workable base on which to build and integrate future models where workers can become more computationally intensive, by drawing in new TBB users to comment and give feedback on their requirements.

I think part of the reason the debates you mentioned came to no workable conclusion is that developers are looking to you for guidance on how they should parallelize. What they previously knew about parallelization has changed with modern CPU architecture, so they stick with what they already know (the self-same problem in getting single-threaded developers to transition to parallelism in the first place :).

Also, a key element here that you may have overlooked is this: you don't have to worry about the efficiency of these workers or their processing. Elegance and ease of use are a higher priority. It may even help to think of these as "laborers" or "interns" instead of workers; "workers" tends to conjure up the idea of highly efficient worker ants :)

As a primarily network/systems/server developer, I envy my desktop peers for all the CPU extensions they've gotten. All that multiple cores has done for me is add to my personal workload due to the absence of hardware support for work-distribution. 25 years since I wrote my first task scheduler, and here we sit still having to design algorithms to ask the CPUs to please execute their code in their own preferred manner! :)

kfsone's picture

Holy cow: 2653 views of this thread: I take it there is some interest in this topic :)

kfsone's picture

Regarding "Options: Rather than a monolithic operator(), a very simple FSM could be
used that allows the worker to return either tbb::Worker::Completed vs
tbb::Worker::Yielding."

That was pretty much "finger in the air" thinking out loud. Obviously there might be better ways of doing it such as having a worker return NULL for "I'm finished" or a function object if it wants to pick up at a different code point.

- Oliver

Alexey Kukanov (Intel)'s picture

Oliver, thanks a lot for the ideas you wrote down. I am not yet ready to comment on it - need some time to think; but I wanted to express my appreciation to you for this post :) Definitely this is useful input to consider.

jimdempseyatthecove's picture

Oliver,

The following is an outline of a low overhead code that might fill your needs.

Determine a worst case upper limit on the number of connections you want to permit your application to make. Your application will not necessarily make this number of connections. This number of connections may or may not relate to the number of auxiliary threads outside the TBB thread pool you allocate to process queries (which may vary depending on if queries are synchronous or can be asynchronous).

Once the upper limit on number of connections has been determined you allocate (or pre-size declare) two arrays of pointers to query objects (nullify pointers). (allocate a 3rd array when asynchronous requests can be made).

One table will hold pointers to available query objects, the other table contains table of queued queries. (3rd table when asynchronous requests can be made contain pending query pointers).

You maintain a count of the number of allocated query objects.

Initially you may decide to pre-connect 0-n connections. Connection allocation is performed by allocating the next sequential query object your design (query objects numbered 0-n), make connection, then place pointer to object into array of available query objects[n], n being the query object number.

A TBB thread wishing to make a query searches the available query objects table for a non-NULL pointer. When one is found, an XCHG of NULL with the pointer is made; if a non-NULL pointer is obtained, the TBB thread owns the query object. The remaining work for the TBB thread is to fill in the query object and to insert the object pointer into the pending query table at index [n] (n == sequence number of the query object). After inserting the pointer, an atomic increment of a variable maintaining the count of pending queries is made, and if the resulting count is less than the number of auxiliary threads performing queries then an event is signalled to wake up a sleeping thread. If NULL is obtained from the available query objects table then a race with another thread occurred and a new table search is made. Should the table search find no available query objects, then a query object is allocated, filled in, and linked to a separate list of overflow pending queries. The information placed into the query object will contain the action to be performed upon completion of the query (e.g. insertion of results into a queue).

The auxiliary threads processing the queries loop on checking the overflow pending queries. If the list is not NULL, a decision is made whether or not to expand the number of query connections. If expansion is recommended then the thread sets an expansion-pending flag (to inhibit overexpansion), makes additional connection(s) and potentially increases the query thread pool size. As expansion occurs the new available query nodes are added to the table of available query nodes, and finally the expansion-pending flag is released.

When overflow pending queries is NULL (or expansion in progress is detected), the query thread searches the pending query table for non-NULL entry, if found, it performs an XCHG with NULL, if non-NULL pointer returned then the query thread owns the query node and the query is made.

Should the SQL system permit asynchronous queries then the design would contain a 3rd table of query node pointers to hold query made but pending nodes. The 3rd table would receive the query node pointer. Should the query thread find an empty table then it decrements the count of active query threads then suspends itself.

When the query completes (either synchronously or asynchronously) then the results (or failure) is passed back to the TBB worker threads by way of linked list, table or FIFO queue.

The design goal is to have low inter-thread communication overhead, and manageable number of connections and non-blocking operation for TBB threads. Excepting when query threads need to be awoken, the TBB thread overhead becomes search a table + a single XCHG (which is interlocked) + time to fill query node + 1 write of pointer into pending query table.
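
The core of the scheme, claiming a slot with a single interlocked exchange, might look roughly like the sketch below. It is one possible reading of the outline and not Jim's actual code: `Query` and `SlotTable` are hypothetical names, std::atomic's `exchange` plays the role of XCHG, and the overflow list, the wake-up event and the expansion logic are left out.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical query object; a real one would carry the statement,
// the connection handle and the completion action.
struct Query { int connection_id; };

template <std::size_t N>
class SlotTable {
public:
    SlotTable() {
        for (auto& slot : m_slots) slot.store(nullptr);
    }

    // Publishes a pointer in slot `index`; the slot must currently be empty.
    void put(std::size_t index, Query* query) {
        assert(index < N);
        Query* previous = m_slots[index].exchange(query);
        assert(previous == nullptr);
        (void)previous;
    }

    // Scans for a non-null slot and claims it with an atomic exchange.
    // Returns nullptr when the scan finds nothing to claim.
    Query* claim() {
        for (auto& slot : m_slots) {
            if (slot.load() == nullptr) continue;      // cheap read first
            Query* query = slot.exchange(nullptr);     // the "XCHG with NULL"
            if (query != nullptr) return query;        // this thread owns it
            // Another thread won the race for this slot; keep scanning.
        }
        return nullptr;
    }

private:
    std::array<std::atomic<Query*>, N> m_slots;
};
```

One table instance would hold the available query objects and a second one the pending queries. A TBB thread calls `claim()` on the first and `put()` on the second, and the query threads do the reverse.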

A lot of the coding details remain to be filled in, but this should get you started.

Jim Dempsey

www.quickthreadprogramming.com
kfsone's picture

Hi Jim,

That sounds a lot like the rough outlines in the pseudocode examples I posted above. I'm also focusing on an abstraction that doesn't deal specifically with SQL etc., so that I can contribute it to the TBB community.

If you take a look at the very first pseudocode fragment, you'll see the base classes provide a rudimentary, abstracted resource management. But it will be server-centric; that is, it will assume you intend to reserve some number of cores/threads specifically for off-loading assignments, and perhaps periodically joining the work force on the main thread if there is a backlog.

"tbb::concurrent_queue" and "tbb::concurrent_vector" seem like reasonable stores for requests, although for the purposes of creating a "Request For Comments" build, I don't think the actual container type matters too much.

I've just finished an 18 hour crunch-shift, so I doubt I'm going to be working on it this weekend either :)

- Oliver

kfsone's picture

Sorry for neglecting this for so long; I'm finally done with multiple rebuilds of my home workstation trying to find a comfortable Windows/Linux development pair compromise. I've been helping another forum reader with a parallelization project, and am just putting together a video how-to for integrating some simple TBB basics into his application. I immediately ran into issues getting a simple TBB app to compile under Windows, which I figure is probably down to the various order I've compiled things - basically it's trying to load 64 bit DLLs in a 32 bit app. So I'm just reinstalling VS/ICC/etc. I tried looking into TBB 3.0 in my Linux VM, but the version-by-version naming convention you use gets to be a real headache so I decided to stick with the compiler-installed variants for now, and to figure out this windows problem first. Hopefully once we have our current crunch phase over, I'll get around to implementing the bits and pieces of this concept I've already thrown together. I'll post more once I've had chance to do that.

Oliver 'kfs1' Smith, Lead Server Programmer, Cornered Rat Software / Battleground Europe
Alexey Kukanov (Intel)'s picture

>... the version-by-version naming convention you use gets to be a real headache ...

Could you please elaborate on what the problem(s) are?

kfsone's picture
Oh, when I say "headache", I mean "minor inconvenience that I need to resolve" :)

Just that, traditionally, library names themselves are versioned (as in libc.so.5.0.1). With TBB, in addition to this and the architecture-specific naming, you have to ascertain which TBB include and library folders you will be using. It makes, for instance, writing a CMake or Autotools "find TBB" a nuisance. As someone who works with a lot of branches of different 3rd party libraries etc., I appreciate the possible benefit of being able to point a particular branch to a specific instance of a set of headers/libraries. On the other hand, it suggests a need to be wary of significant compatibility changes from version to version, even between minor revision number changes. In particular, it means that I have to ensure developerX has the right version numbers on their machine. And that makes me nervous. I'm happier to work in a setting where $(prefix)/lib/tbb and $(prefix)/include/tbb will have The Right Version than having to ensure $(prefix)/lib has tbb_3.0.1.22. Obviously, we can fix that by setting up our own symlinks, but that means adding a step to our processes for ensuring that $(prefix)/include/tbb is pointing to the right tbb when we install a new one :) Finally, I've observed that many programmers put on their "hard hat" when they see a 3rd party product with versioned install folder names, and it can be an impediment to getting them to embrace a new option when they can't just say "-I/usr/local/include" or "-I/opt/intel/tbb/include -L/opt/intel/tbb/lib". Always found it kind of amusing that the masters of the "if" statement get uncomfortable when they ask a question like "where is the header file" and the answer is along the lines of "well, if you are using version ..." :) - Oliver

Oliver 'kfs1' Smith, Lead Server Programmer, Cornered Rat Software / Battleground Europe
kfsone's picture

So after a long time trying to determine what vector to attack this issue on, someone pointed me in the direction of a little known, cross-platform, multi-purpose API called "ZeroMQ" or 0MQ.
ZeroMQ takes the message-passing component of Erlang and presents it to a host of languages in a Berkeley Sockets API, capable of performing zero-copy messaging. Rather than one socket suits all, they have a small set of socket patterns where each is highly tuned to fulfilling a single role, allowing them to provide in-process (inter-thread) communications without locking. But the same ZMQ API also seamlessly provides inter-process, inter-machine and multicast communications with absolute minimal changes (primarily just changing the string you use to describe the connection).

The socket models are:
client (aka 'REQ'): can send, then recv, then send ... etc, but only in that sequence
server (aka 'REP'): can recv, then send, then recv ... etc, but only in that sequence
downstream: can only send
upstream: can only recv
pair: bi-directional
publish: send only
subscribe: recv only, but able to specify what messages you will receive with setsockopt, just like multicast

ZMQ also differs from plain-old-sockets in that when multiple threads/processes listen to a particular socket, ZMQ handles message diffusion for you: multiple threads can listen to a 'server' socket and ZMQ will deliver each message to one listener with fair scheduling. So you can create a pool of worker threads that simply call recv() and block in a highly OS-friendly manner, whether they are threads or processes. They also provide scalability modules, such as zmq_forwarder, that allow you to have multiple listeners across multiple machines listening to a single forward-facing socket. There's also a poll function to allow for interoperability with regular file/socket IO.

Throughput is pretty phenomenal. For parallelism, it isn't necessarily the cutting-edge performance that you'd get from TBB, but for the subject of this thread, async task offloading, it provides a very efficient, simple, portable and robust platform for creating scalable parallelism without having to turn to Erlang.
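To make the "pool of workers that simply call recv() and block" idea concrete, here is a minimal sketch that uses only the C++ standard library rather than ZeroMQ itself. The `Mailbox` class and `runPool` function are hypothetical names invented for this illustration. Unlike ZeroMQ's in-process transport as described above, this stand-in uses a mutex and condition variable, and it hands each message to whichever worker wakes first rather than scheduling fairly. The shape of the worker loop is the part that carries over.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical in-process stand-in for a socket shared by a pool of workers.
// Each message is delivered to exactly one receiver.
class Mailbox {
public:
    void send(const std::string& msg) {
        { std::lock_guard<std::mutex> guard(m_lock); m_messages.push_back(msg); }
        m_ready.notify_one();               // Wake a single blocked worker.
    }
    void close() {
        { std::lock_guard<std::mutex> guard(m_lock); m_closed = true; }
        m_ready.notify_all();               // Let every worker see the shutdown.
    }
    // Blocks until a message arrives. Returns false once the mailbox has been
    // closed and every queued message has been handed out.
    bool recv(std::string& out) {
        std::unique_lock<std::mutex> guard(m_lock);
        m_ready.wait(guard, [this] { return m_closed || !m_messages.empty(); });
        if (m_messages.empty()) return false;
        out = m_messages.front();
        m_messages.pop_front();
        return true;
    }
private:
    std::mutex m_lock;
    std::condition_variable m_ready;
    std::deque<std::string> m_messages;
    bool m_closed = false;
};

// Starts workerCount workers, sends messageCount messages, and returns how
// many messages each worker handled.
std::vector<std::size_t> runPool(unsigned workerCount, std::size_t messageCount) {
    assert(workerCount > 0);
    Mailbox box;
    std::vector<std::size_t> handled(workerCount, 0);
    std::vector<std::thread> workers;
    for (unsigned w = 0; w < workerCount; ++w) {
        workers.emplace_back([&box, &handled, w] {
            std::string msg;
            while (box.recv(msg)) ++handled[w];   // Each worker owns one slot.
        });
    }
    for (std::size_t i = 0; i < messageCount; ++i) {
        box.send("job " + std::to_string(i));
    }
    box.close();
    for (std::thread& t : workers) t.join();
    return handled;
}
```

Calling `runPool(4, 1000)` returns four counts that add up to 1000; how the messages split between workers depends on thread scheduling and will vary from run to run.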
After dabbling with it for a while, I created the "Async::Worker" concept for it that I discussed creating here. http://www.kfs.org/oliver/async/ And here's an example that TBB users will find relatively familiar:

// ZeroMQ / Async::Worker example.
// Oliver 'kfs1' Smith 
// Uses clock_gettime which, under linux, requires -lrt

// Because the example contains a lot of additional
// code (for benchmarking) I have highlighted the key
// sections of the main() function with lines of ///s.

#include "async-worker.h"       // The Async:: classes.
#include <vector>               // For std::vector.
#include <algorithm>            // For std::min.
#include <stdio.h>              // For printf
#include <stdlib.h>             // For rand
#include <math.h>               // For sin, cos, etc
#include <time.h>               // Timing functions.
#include <stdint.h>             // For uint64_t.


typedef uint64_t Number ;

typedef std::vector< Number > Numbers ;



class CrunchNumbersRange : public Async::RunAndReturn
{

public: CrunchNumbersRange(const Numbers::iterator& start, const Numbers::iterator& end, Number* finalDestination)
            : m_start(start)
            , m_end(end)
            , m_sum(0)
            , m_finalDestination(finalDestination)
            {}

public: virtual void Work() const
        {
            Numbers::iterator it ;
            for ( it = m_start ; it != m_end ; ++it )
            {
                m_sum += *it ;
            }
        }

public: virtual void Result()
    {
        // Add our calculated value to the accumulator.
        *m_finalDestination += m_sum ;
    }

private: Numbers::iterator m_start, m_end ;
private: mutable Number m_sum ;
private: Number* m_finalDestination ;

} ;


    int main(int argc, const char* const argv[])
    {
        static const size_t NumberOfElements = 20000000 ;
        static const size_t GroupSize = 8192 ;
        Numbers numbers ;
        numbers.resize(NumberOfElements) ;
        for ( size_t i = 0 ; i < NumberOfElements ; ++i )
        {
            numbers[i] = (rand() & 65535) + 1 ;
        }

        uint64_t parallelResult = 0 ;

        // Dispatch groups of numbers to workers.
        Numbers::iterator it = numbers.begin() ;
        do
        {
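            // Clamp the group to what is left, so the iterator is never
            // advanced past numbers.end().
            const size_t remaining = static_cast<size_t>(numbers.end() - it) ;
            Numbers::iterator end = it + std::min(GroupSize, remaining) ;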
            Async::Queue(new CrunchNumbersRange(it, end, &parallelResult)) ;
            it = end ;
        }
        while ( it != numbers.end() ) ;

        // Wait for all the results, calling Result() on each
        // returned object to produce a total.
        Async::GetResults() ;

        printf("Done. Calculated sum as %lu.\n", (unsigned long int)parallelResult) ;

        return 0 ;
    }
Oliver 'kfs1' Smith, Lead Server Programmer, Cornered Rat Software / Battleground Europe
kfsone's picture

In essence: The example is a mixture of #pragma omp task and TBB's blocked-range processing concept. The 'Result()' function is executed in the context of the 'master' thread, allowing it to safely perform reductions as in the example. Async::Worker is, though, mostly for "FireAndForget" dispatching of workloads that can be executed by a worker and discarded with no need for an acknowledgement. Again: I don't think it compares performance-wise with TBB for bleeding-edge maximum-work-per-cpu-cycle-spent, but the Erlang-like ease with which you can make the same code distributed makes it really well suited for these kinds of "I'm busy, can someone else write this information to a log file for me?" workloads.
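The split between worker-side Work() and master-side Result() can be sketched with nothing but the C++ standard library. This is not the Async::Worker implementation: `Task`, `SumRange` and `parallelSum` are hypothetical names, and plain mutex-protected queues stand in for the ZeroMQ sockets. The point it illustrates is that the shared total is only ever touched from the master thread, so the reduction itself needs no lock.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <numeric>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for Async::RunAndReturn: Work() runs on a worker
// thread, Result() runs afterwards on the master thread.
struct Task {
    virtual ~Task() {}
    virtual void Work() = 0;
    virtual void Result() = 0;
};

// Sums one slice of the input. The shared total is only touched in Result().
struct SumRange : Task {
    const uint64_t* begin; const uint64_t* end;
    uint64_t partial; uint64_t* total;
    SumRange(const uint64_t* b, const uint64_t* e, uint64_t* t)
        : begin(b), end(e), partial(0), total(t) {}
    void Work() override { partial = std::accumulate(begin, end, uint64_t(0)); }
    void Result() override { *total += partial; }
};

uint64_t parallelSum(const std::vector<uint64_t>& data,
                     std::size_t groupSize, unsigned workerCount)
{
    assert(groupSize > 0 && workerCount > 0);
    std::queue<std::unique_ptr<Task>> pending, finished;
    std::mutex pendingLock, finishedLock;
    uint64_t total = 0;

    // Master: split the input into groups and queue one task per group.
    for (std::size_t i = 0; i < data.size(); i += groupSize) {
        const std::size_t n = std::min(groupSize, data.size() - i);
        pending.push(std::unique_ptr<Task>(
            new SumRange(data.data() + i, data.data() + i + n, &total)));
    }

    // Workers: take a task, run Work(), hand the finished task back.
    std::vector<std::thread> workers;
    for (unsigned w = 0; w < workerCount; ++w) {
        workers.emplace_back([&] {
            for (;;) {
                std::unique_ptr<Task> task;
                {
                    std::lock_guard<std::mutex> guard(pendingLock);
                    if (pending.empty()) return;
                    task = std::move(pending.front());
                    pending.pop();
                }
                task->Work();
                std::lock_guard<std::mutex> guard(finishedLock);
                finished.push(std::move(task));
            }
        });
    }
    for (std::thread& t : workers) t.join();

    // Master: drain the finished queue, calling Result() on each task.
    while (!finished.empty()) {
        finished.front()->Result();
        finished.pop();
    }
    return total;
}
```

For simplicity this sketch joins the workers before draining results. A long-running server would instead drain the finished queue periodically from its main loop, as described earlier in the thread.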

////// Version one: just connect to a local worker.
zmq::socket_t outSocket(zmqContext, ZMQ_REQ) ;
outSocket.connect("inproc://my-name-for-this-socket") ;

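// Send a message directly from my data with zero copies. The four-argument
// constructor wraps the buffer without copying (the two-argument form
// copies); myData must stay valid until ZeroMQ has finished with it.
zmq::message_t msg(myData, sizeof(myData), NULL, NULL) ;
outSocket.send(msg, 0) ;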


////// Version two: connect instead to a forwarder that may forward messages
////// across processes or machines.
zmq::socket_t outSocket(zmqContext, ZMQ_REQ) ;
outSocket.connect("tcp://192.168.0.5:2342") ;

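// Send a message directly from my data with zero copies. The four-argument
// constructor wraps the buffer without copying (the two-argument form
// copies); myData must stay valid until ZeroMQ has finished with it.
zmq::message_t msg(myData, sizeof(myData), NULL, NULL) ;
outSocket.send(msg, 0) ;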
Now -- obviously going to the forwarder means that there is extra overhead for any work that winds up staying on the local machine, but ZMQ is pretty well tuned so it's a really slim overhead, and the advantage of extra distribution pays for it the same way that marshalling data to dispatch to multiple cores within a single machine pays for itself once you have additional CPUs crunching the data for you.

Correction: They provide zmq_queue (for transparent client/server muxing/load balancing/scaling), zmq_forwarder is for the multicast/pub-sub pattern allowing you to offload the filtering/broadcasting of messages (i.e. you can achieve multicast over TCP by connecting to a forwarder) and zmq_streamer for transparent distribution of work between local or remote workers with smart, fair scheduling.

This gives you the message-passing parallel-scalability of Erlang in the language - or languages - of your choice.

Oliver 'kfs1' Smith, Lead Server Programmer, Cornered Rat Software / Battleground Europe
kfsone's picture

Actually, after building ZeroMQ with Intel's C++ compiler, the performance is comparable to that of TBB when passing work to threads via an "inproc://" URL. I really hope some of the TBB devs get a chance to look at ZeroMQ. It is simply a message-passing API. TBB's classes already implement conceptual message passing; they just do it through the more accepted methods of inter-thread communication: mutexes/futexes and lock-based resources. ZeroMQ doesn't provide the rich set of parallelism constructs that TBB does. However, if you combine TBB and ZeroMQ, what you get is the potential for both performance and scalability. Consider the following piece of code:

zmq::context_t zmqContext(1) ; // Allocate 1 IO thread.
// We're sending data downstream to workers. (ZMQ_DOWNSTREAM was renamed
// ZMQ_PUSH in later ZeroMQ releases.)
zmq::socket_t  socket(zmqContext, ZMQ_DOWNSTREAM) ;

// Create a low-overhead, zero-copy, lock-free
// connection to local worker threads.
socket.bind("inproc://thread-workers") ;

// But also accept connections from remote machines
// that want to do work for us - on the same zmq endpoint.
socket.bind("tcp://0.0.0.0:12345") ;

//// BEGIN PSEUDO CODE ////
// Divide our work into chunks.
for ( chunk_iterator it = work.begin() ; it != work.end() ; ++it )
{
  zmq::message_t workMessage(it) ;
  socket.send(workMessage) ;
}
//// END PSEUDO CODE ////

Despite the socket-like API, the dispatch of work to worker threads is actually extremely efficient. Obviously you'd need some really hefty workloads to justify sending work out over the Internet. But if your remote worker connections were over InfiniBand...

Oliver 'kfs1' Smith, Lead Server Programmer, Cornered Rat Software / Battleground Europe
