I currently use a worker pattern for offloading asynchronous database queries: a number of pthread threads are created, each of which obtains its own connection to the database.
The query-producer derives a query task, allocating memory and populating it with the statement to be executed, and then pushing it onto a std::queue made thread-safe via a mutex. The workers sit behind a phtread_mutex_trylock, until something appears on the queue, which they then grab and execute. Upon completion, they either execute the queue object's "Success" function or they pop it onto a shared "callback" queue, which the master thread periodically drains (thus enabling me to transition what were once monolithic single-threaded server processes gradually towards parallelism). A primary concern is avoiding connection thrashing: I don't want each SQL query to have a connection tear-up/tear-down overhead; currently each worker thread encapsulates and manages its own connection handle. But I could probably use a concurrent queue of handles and have each query task pull one off the queue, only creating a new one when none is available. Are there already existing patterns/templates for this kind of resource-targetted work offloading, without leaving specific threads mutex-locked pending work? e.g.
template
class ParallelResourcePool
{
typedef tbb::concurrent_queue<_ResourceType*> Resources;
Resources m_resources ;
// Derived classes must implement AllocateResource
virtual _ResourceType* AllocateResource() = 0 ;
// Get a resource off the queue or allocate one.
_ResourceType* GetResource()
{
_ResourceType* resource = NULL ;
// Get the first free resource from the queue, if any available.
if ( m_resources.try_pop(resource) == true )
return resource ;
return AllocateResource() ;
}
void ReturnResource(_ResourceType* const resource)
{
m_resources.push(resource) ;
}
// Derived class must describe how to release a resource.
void FreeResource(_ResourceType* const resource) = 0 ;
virtual ~ParallelResourcePool()
{
Resource* resource = NULL ;
while ( m_resources.tryPop(resource) )
{
FreeResource(resource) ;
}
}
} ;
...
class MysqlPool : public ParallelResourcePool
{
virtual MYSQL* AllocateResource()
{
MYSQL* conn = threadsafeMysqlOpen() ;
return conn ;
}
} ;
static MysqlPool s_mysqlConnections ;
// ParallelResourceUser template defined somewhere else; // it provides a base class that receives the resource pool, // obtains the Resource for this invocation, and then calls // the execute function.
class MysqlResourceUser : public ParallelResourceUser
{
MysqlResourceUser(std::string& statement)
: m_statement(statement)
{}
virtual bool Execute() const {
mysql_real_query(Resource(), m_statement.c_str(), m_statement.length()) ;
...
}
} ;
...
int somefunction()
{
std::string statement = "SELECT t.field2, t.field2 FROM {0} AS t WHERE {1} = {2} LIMIT 1" ;
sqlSubstitute(statement, "mytable", "myfield", "'hello'") ;
parallel_work_queue(s_mysqlConnections, new MysqlResourceUser(statement)) ;
}


