Custom Havok thread pool for multithreading

Hello,
I am currently using Havok 2011 and Intel Threading Building Blocks (TBB). With TBB, I have a thread pool and a thread manager that allow me to process chunks of data.
With version 5.5.0, it was possible to use:

  • stepBeginSt
  • stepProcessMt
  • stepEndSt

to step the simulation in a custom multithreaded environment.
However, in the latest version of Havok, the multithreaded simulation methods have been replaced with a thread pool / job queue system. I do not want to create a second thread pool and job manager, since I already have everything working with TBB.
I was not able to find anything suitable in the hkWorld class to step the simulation without the Havok job system. Is there a way to make Havok use a custom TBB thread manager / thread pool?
Thanks for your help,
Regards


I went a little further in my attempt to use TBB as the Havok thread pool.
I read in the documentation that it is possible to write a custom implementation of the hkJobThreadPool interface,
then use hkWorld->stepMultithreaded( hkJobQueue, customThreadPool, stepTime );.

However, I am having trouble implementing my custom version of hkJobThreadPool, mainly in understanding
exactly what each method does.

  • processAllJobs(): This method should call hkJobQueue->processAllJobs() on each worker thread.
  • waitForCompletion(): Do we need to loop until the queue is empty?
  • isProcessing(): This is easy: store a boolean whose status is updated by processAllJobs / waitForCompletion.
  • appendTimerData(): I cannot really understand this one. Where is the timer data stored? Is it internal to your original implementation of hkThreadPool?
  • clearTimerData(): This will be clearer once I understand appendTimerData() :)

Has anyone already implemented a custom hkJobThreadPool and knows the basic algorithm of each method?
Any help would be appreciated.

Thanks, Kissy

Hey Kissy,

I'm not very familiar with TBB, but you are on the right track with replacing hkJobThreadPool. If you have not already, I recommend looking at hkCpuJobThreadPool (Source\Common\Base\Thread\Job\ThreadPool\Cpu\hkCpuJobThreadPool.cpp). You should be able to base your implementation on hkCpuJobThreadPool and just replace the relevant parts with TBB-specific implementations. Essentially you would just need to replace the usage of hkThread and hkSemaphore, and any platform-specific code.

I think hkCpuJobThreadPool's implementation is a good example and should be able to answer most of your questions. Let me know if it doesn't!
-Tyler

Hello Tyler,

Thanks for your reply. However, I do not have the hkCpuJobThreadPool.cpp file in the source folder, since I am using the Student version of Havok. I only have hkCpuJobThreadPool.h, which doesn't show me the implementation.

Regards,
Kissy

Hi Kissy,

D'oh, I saw it marked 'public' and forgot the free release only includes binary libs.

Without a good reference I would not really recommend replacing hkCpuJobThreadPool. Replacing the Havok threading setup is by no means necessary in order to take advantage of multithreading or to use Havok in a threaded environment. Additionally, it will probably bring little or no significant benefit. Of course I'll still try to clear up some of your questions, and I wish you luck :)

  • processAllJobs(): This method should call hkJobQueue->processAllJobs() on each worker thread.
    - In short, yes. The approach that hkCpuJobThreadPool takes is to start up N worker threads upon construction and have them wait until processAllJobs tells them to start processing. Each worker thread then calls jobQueue->processAllJobs() on a shared job queue and then waits to be restarted the next frame.

  • waitForCompletion(): Do we need to loop until the queue is empty?
    - You'll want to use a semaphore to wait on the child threads until they finish.

  • isProcessing(): This is easy: store a boolean whose status is updated by processAllJobs / waitForCompletion.
    - Yep.

  • appendTimerData(): I cannot really understand this one. Where is the timer data stored? Is it internal to your original implementation of hkThreadPool?
  • clearTimerData(): This will be clearer once I understand appendTimerData() :)
    - You can ignore the timer data functions if you wish. They're for debugging and profiling each thread. The intended usage is to set up and use hkMonitorStream on each thread, and mark the begin and end times that hkMonitorStream reports. See 'MonitorStatsRecorder' in hkDefaultDemo.cpp for an example of how these functions are used.

    In each worker thread you will need to also make sure you do the steps discussed in the docs under: Common Havok Components > Base Library > The Base System > Runtime Initialization > Per-Thread Initialization.

    Good luck and let me know if you have any questions or issues :)
    -Tyler
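
As a rough illustration of the pattern Tyler describes (workers created up front, woken by processAllJobs, then waited on), here is a minimal sketch in standard C++. It uses neither Havok nor TBB: DemoJobQueue, DemoThreadPool and runDemoFrame are hypothetical names, a condition variable stands in for the semaphore, and the "jobs" are just a counter. It is not hkCpuJobThreadPool's actual implementation, which is not available in this thread.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a shared job queue (NOT hkJobQueue).
class DemoJobQueue {
public:
    explicit DemoJobQueue(int numJobs) : m_remaining(numJobs) {}
    // Claim and "run" jobs until none are left; safe to call from many threads.
    void drain() {
        // fetch_sub returns the previous value; > 0 means a job was claimed.
        while (m_remaining.fetch_sub(1) > 0) { m_done.fetch_add(1); }
    }
    int done() const { return m_done.load(); }
private:
    std::atomic<int> m_remaining;
    std::atomic<int> m_done{0};
};

// Hypothetical pool: N workers sleep until processAllJobs() wakes them.
class DemoThreadPool {
public:
    explicit DemoThreadPool(int numThreads) {
        for (int i = 0; i < numThreads; ++i)
            m_threads.emplace_back([this] { workerMain(); });
    }
    ~DemoThreadPool() {
        { std::lock_guard<std::mutex> lock(m_mutex); m_quit = true; }
        m_wake.notify_all();
        for (std::thread& t : m_threads) t.join();
    }
    // Wake every worker; each one drains the shared queue once.
    void processAllJobs(DemoJobQueue* queue) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue = queue;
        m_pending = static_cast<int>(m_threads.size());
        ++m_generation;
        m_wake.notify_all();
    }
    // Block the caller until every worker has finished the current batch.
    void waitForCompletion() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_idle.wait(lock, [this] { return m_pending == 0; });
    }
    bool isProcessing() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_pending != 0;
    }
private:
    void workerMain() {
        // Per-thread initialization would happen here, once per worker.
        unsigned seen = 0;
        for (;;) {
            DemoJobQueue* queue = nullptr;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_wake.wait(lock, [&] { return m_quit || m_generation != seen; });
                if (m_quit) return;
                seen = m_generation;
                queue = m_queue;
            }
            queue->drain();
            std::lock_guard<std::mutex> lock(m_mutex);
            if (--m_pending == 0) m_idle.notify_all();
        }
    }
    std::mutex m_mutex;
    std::condition_variable m_wake, m_idle;
    DemoJobQueue* m_queue = nullptr;
    int m_pending = 0;
    unsigned m_generation = 0;
    bool m_quit = false;
    std::vector<std::thread> m_threads;
};

// One "frame": start the batch, wait for it, report how many jobs ran.
int runDemoFrame(int numThreads, int numJobs) {
    DemoThreadPool pool(numThreads);
    DemoJobQueue queue(numJobs);
    pool.processAllJobs(&queue);
    pool.waitForCompletion();
    return queue.done();
}
```

The sketch assumes the caller always pairs processAllJobs with waitForCompletion before starting another batch or destroying the pool.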

    Hey Tyler,

    I really appreciate your description of hkCpuJobThreadPool and, despite your "non-recommendation" ;),
    I decided to give my custom JobThreadPool a try.

    I will ignore the timer data functions for the moment and see whether I can make a JobThreadPool that is mostly functional.

    I did read the docs about base and memory initialization in a multithreaded environment, and I think I have that covered :)
    Thanks a lot for your good explanation.

    Kissy

    Quick update on this,

    I realized that stepMultithreaded blocks the calling thread (the main thread) in order to process jobs on it.
    I decided to switch to the second option for multithreading:

    • initMtStep called on the main thread
    • processAllJobs called on all the worker threads
    • Wait for the worker threads to finish their jobs
    • finishMtStep called on the main thread

    It seems to work quite well.
    To call processAllJobs using the TBB thread pool, I am using the tbb::parallel_for function.

    Thanks for your help,
    Kissy
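
The four-step sequence above is a fork-join pattern, and it can be sketched without Havok or TBB. In this minimal example, DemoWorld and stepFrame are hypothetical names, std::thread stands in for the TBB pool, and joining the threads plays the role of the "wait" step; the method names only echo the Havok calls mentioned in the post and do not reproduce their signatures.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical stand-in for a world that steps in three phases (NOT hkpWorld).
struct DemoWorld {
    std::atomic<int> jobsLeft{0};
    std::atomic<int> jobsDone{0};
    int stepsFinished = 0;

    // Phase 1 (main thread): publish the jobs for this step.
    void initStep(int numJobs) {
        jobsDone = 0;
        jobsLeft = numJobs;
    }

    // Phase 2 (any thread): claim and run jobs until the queue is empty.
    void processAllJobs() {
        // fetch_sub returns the previous value; > 0 means a job was claimed.
        while (jobsLeft.fetch_sub(1) > 0) { jobsDone.fetch_add(1); }
    }

    // Phase 3 (main thread): must run only after every worker has returned.
    void finishStep() { ++stepsFinished; }
};

// One frame: init on the caller, fan out, join, finish on the caller.
int stepFrame(DemoWorld& world, int numWorkers, int numJobs) {
    world.initStep(numJobs);

    std::vector<std::thread> workers;
    for (int i = 0; i < numWorkers; ++i)
        workers.emplace_back([&world] { world.processAllJobs(); });

    world.processAllJobs();                    // the caller helps as well
    for (std::thread& t : workers) t.join();   // the "wait" step

    world.finishStep();
    return world.jobsDone.load();
}
```

With tbb::parallel_for the explicit join is not needed, because the call returns only after all of its iterations have completed.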

    Hello,

    I am still trying to get multithreading working with Havok and a custom thread pool, but I am getting a weird error.
    It seems that the worker thread is not able to retrieve the hkMemoryRouter instance (stored in thread-local storage).
    I get an access violation exception at the fourth instruction below (10678349, the TlsGetValue call):

    
    1067833C  je          integrateJob+23Eh (1067851Eh)
    10678342  mov         edx,dword ptr [hkMemoryRouter::s_memoryRouter (10BC02DCh)]
    10678348  push        edx
    10678349  call        dword ptr [__imp__TlsGetValue@4 (10BE151Ch)]
    
    

    with the following stack trace:
    
        ntdll.dll!77be15de()
        ntdll.dll!77bd014e()
        PhysicSystem.dll!integrateJob(hkpMtThreadStructure & tl, hkJobQueue & jobQueue, hkJobQueue::JobQueueEntry & nextJobOut, hkBool & jobWasCancelledOut)  Line 760 + 0xd bytes	C++
        PhysicSystem.dll!hkpMultiThreadedSimulation::processNextJob(hkJobQueue & jobQueue, hkJobQueue::JobQueueEntry & job)  Line 131 + 0x12 bytes	C++
        PhysicSystem.dll!hkJobQueue::processAllJobs(bool addTimers)  Line 555 + 0x24 bytes	C++
        PhysicSystem.dll!HavokPhysicsTask::StepUpdate()  Line 394	C++
    
    

    This code is executed by the worker threads. Before processAllJobs is called on the hkJobQueue, the main thread has
    initialized the memory system and the hkJobQueue, and has called initMtStep.

    Here is my C++ code:

    
    //
    // Called from the Main System thread.
    //
    {
        //
        // Base Havok init
        //
        hkMemoryRouter* memoryRouter = hkMemoryInitUtil::initDefault( hkMallocAllocator::m_defaultMallocAllocator, hkMemorySystem::FrameInfo(512 * 1024) );
        hkBaseSystem::init( memoryRouter, HavokPhysicsSystem::ErrorReport );

        //
        // Init each worker thread.
        //
        g_Managers.pTask->NonStandardPerThreadCallback(
            reinterpret_cast< ITaskManager::JobFunction >( HavokPhysicsSystem::AllocateThreadResources ), this
            );

        //
        // Create the job queue
        //
        hkJobQueueCinfo info;
        info.m_jobQueueHwSetup.m_numCpuThreads = g_Managers.pTask->GetRecommendedJobCount();
        m_jobQueue = new hkJobQueue(info);

        //
        // Create the world
        //
        hkpWorldCinfo worldInfo;
        worldInfo.m_simulationType = hkpWorldCinfo::SIMULATION_TYPE_MULTITHREADED;
        worldInfo.m_broadPhaseBorderBehaviour = hkpWorldCinfo::BROADPHASE_BORDER_REMOVE_ENTITY;
        m_pWorld = new hkpWorld( worldInfo );

        //
        // Start editing the world.
        //
        m_pWorld->markForWrite();
        hkpAgentRegisterUtil::registerAllAgents( m_pWorld->getCollisionDispatcher() );
        m_pWorld->registerWithJobQueue( m_jobQueue );

        //
        // Create the ground box
        //
        {
            hkVector4 groundRadii( 70.0f, 2.0f, 140.0f );
            hkpConvexShape* shape = new hkpBoxShape( groundRadii , 0 );

            hkpRigidBodyCinfo ci;
            ci.m_shape = shape;
            ci.m_motionType = hkpMotion::MOTION_FIXED;
            ci.m_position = hkVector4( 0.0f, -2.0f, 0.0f );
            ci.m_qualityType = HK_COLLIDABLE_QUALITY_FIXED;

            m_pWorld->addEntity( new hkpRigidBody( ci ) )->removeReference();
            shape->removeReference();
        }

        //
        // Now we have finished modifying the world, release our write marker.
        //
        m_pWorld->unmarkForWrite();
    }
    
    

    Here is the part that initializes memory in the worker thread (AllocateThreadResources):
    
    void
    HavokPhysicsSystem::AllocateThreadResources(
        HavokPhysicsSystem* pSystem
        )
    {
        //
        // Do not initialize main thread.
        // Already done in mainInit
        //
        u32 currentThreadId = ::GetCurrentThreadId();
        if ( currentThreadId == s_idMainThread )
        {
            pSystem->s_threadNumberCount.fetch_and_increment();
            return;
        }

        HK_THREAD_LOCAL_SET( hkThreadNumber, pSystem->s_threadNumberCount.fetch_and_increment() );

        //
        // Create thread memory for the thread.
        //
        hkMemoryRouter memoryRouter;
        hkMemorySystem::getInstance().threadInit( memoryRouter, "PhysicSystemWorker" );
        hkResult result = hkBaseSystem::initThread( &memoryRouter );
        ASSERT ( result == HK_SUCCESS );

        WorkerMemoryRouterMap_t::accessor a;
        s_workerMemoryRouterMap.insert( a, ::GetCurrentThreadId() );
        a->second = &memoryRouter;

        ASSERT( hkMemoryRouter::getInstancePtr() != NULL );
    }
    
    

    Here is the part that is executed by the main thread on each frame:
    
    {
        hkpStepResult result = m_pWorld->initMtStep( m_jobQueue , hkReal( DeltaTime ) );
        ASSERT(result == hkpStepResult::HK_STEP_RESULT_SUCCESS);

        //
        // Issue jobs for multi-threaded stepping.
        //
        g_Managers.pTask->ParallelFor(
            this,
            reinterpret_cast(StepUpdateS),
            this,
            0,
            m_cJobs,
            PhysicSystemTaskGrainSize
        );

        //
        // End the world stepping.
        //
        result = m_pWorld->finishMtStep();
        ASSERT(result == hkpStepResult::HK_STEP_RESULT_SUCCESS);
    }
    
    

    Then, on each worker thread, the ParallelForFunction calls this method:
    
    {
        //
        // Step the world.
        //
        m_jobQueue->processAllJobs();
    }
    
    

    Everything works fine if I replace the initMtStep / processAllJobs / finishMtStep sequence with a single stepDeltaTime call.
    Does anyone know what exactly could cause the issue?

    Regards,
    Kissy

    Best Reply

    Hey Kissy,

    I have to admit I looked over this a few times before it popped out at me.

    {
        // ...
        // Create thread memory for the thread.
        //
        hkMemoryRouter memoryRouter;
        hkMemorySystem::getInstance().threadInit( memoryRouter, "PhysicSystemWorker" );
        hkResult result = hkBaseSystem::initThread( &memoryRouter );
        // ...
    } // memoryRouter.~hkMemoryRouter() - your memory router is getting destructed here.
    
    The memory router goes out of scope and you're left with a dangling pointer when the job runs later.
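
The lifetime problem can be reproduced in isolation. The following is a minimal sketch in standard C++ with hypothetical names (DemoRouter, RouterSlot, initThreadBuggy, slotIsDangling): a thread_local slot stands in for the TLS entry that Havok reads back later, and a live-object counter shows that the registered object no longer exists once the init function returns. The stale pointer is deliberately never dereferenced.

```cpp
#include <cassert>

// Hypothetical stand-in for a per-thread router object (NOT hkMemoryRouter).
struct DemoRouter {
    static int liveCount;             // number of DemoRouter objects that currently exist
    DemoRouter()  { ++liveCount; }
    ~DemoRouter() { --liveCount; }
};
int DemoRouter::liveCount = 0;

// Stand-in for the thread-local slot that remembers the current router.
struct RouterSlot {
    DemoRouter* ptr = nullptr;
    bool registered = false;
};
thread_local RouterSlot g_slot;

// Mirrors the buggy pattern: the address of a local object is registered.
void initThreadBuggy() {
    DemoRouter router;                // lives on this function's stack
    g_slot.ptr = &router;             // the slot now points at the local
    g_slot.registered = true;
    assert(DemoRouter::liveCount == 1);
}                                     // router is destroyed here; the slot keeps its address

// True when a router was registered but no router object is alive any more.
bool slotIsDangling() {
    return g_slot.registered && DemoRouter::liveCount == 0;
}
```

After initThreadBuggy() returns, slotIsDangling() reports true: any later code that follows the slot would be touching a destroyed object, which matches the crash seen when the job runs.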
    There are a few ways to fix this; here are a couple of straightforward options:

    1) Construct the hkMemoryRouter on the heap. Unfortunately you cannot new an hkMemoryRouter (since new is overloaded to use the memory router, which obviously doesn't exist yet), so you will need to call malloc directly and use placement new. Make sure you free the memory router later, after you shut down the thread. It will look like this:

    hkMemoryRouter* memoryRouter = new ( malloc( sizeof(hkMemoryRouter) ) ) hkMemoryRouter();
    hkMemorySystem::getInstance().threadInit( *memoryRouter, "PhysicSystemWorker" );
    hkBaseSystem::initThread( memoryRouter );
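
Option 1 also needs a matching teardown, because an object built with malloc plus placement new must be destroyed by hand. Below is a minimal sketch of both halves in standard C++; DemoRouter, createRouter and destroyRouter are hypothetical names, and the Havok-side per-thread shutdown that would precede the free is not shown.

```cpp
#include <cstdlib>
#include <new>

// Hypothetical stand-in for the router type (NOT hkMemoryRouter).
struct DemoRouter {
    static int liveCount;             // number of DemoRouter objects that currently exist
    DemoRouter()  { ++liveCount; }
    ~DemoRouter() { --liveCount; }
};
int DemoRouter::liveCount = 0;

// Allocate raw storage with malloc, then construct the object in place.
// Returns nullptr if the allocation fails.
DemoRouter* createRouter() {
    void* raw = std::malloc(sizeof(DemoRouter));
    if (raw == nullptr) return nullptr;
    return new (raw) DemoRouter();
}

// Reverse order on shutdown: run the destructor explicitly, then free the storage.
// Calling delete here would be wrong, because the storage did not come from new.
void destroyRouter(DemoRouter* router) {
    if (router == nullptr) return;
    router->~DemoRouter();
    std::free(router);
}
```

The pointer returned by createRouter stays valid until destroyRouter is called, so it can safely outlive the function that performs the per-thread initialization.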

    2) Construct (and destruct) the hkMemoryRouter in the scope where you call m_jobQueue->processAllJobs. This would just mean moving the hkMemoryRouter setup and shutdown code to the same function that currently calls processAllJobs().

    There are a few other ways you could go about solving this, but I think these fit your case best. I'd recommend going with #1 unless you have a reason not to call malloc directly. #2 will incur CPU overhead each frame as it sets up and shuts down the memory router for that thread.
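
For comparison, option 2 can be sketched as a scope guard that lives in the same function as the job-processing call. This is a minimal sketch in standard C++ with hypothetical names (DemoRouter, ScopedRouter, runJob, workerStep); the setupCount counter is only there to make the per-call cost visible.

```cpp
// Hypothetical stand-in for the router type (NOT hkMemoryRouter).
struct DemoRouter {};

// Stand-in for the thread-local slot that jobs read the router from.
thread_local DemoRouter* g_currentRouter = nullptr;

// RAII guard: registers a router for the enclosing scope and unregisters it on exit.
class ScopedRouter {
public:
    static int setupCount;            // how many times setup has run (demo only)
    ScopedRouter()  { g_currentRouter = &m_router; ++setupCount; }
    ~ScopedRouter() { g_currentRouter = nullptr; }   // cleared before m_router is destroyed
    ScopedRouter(const ScopedRouter&) = delete;
    ScopedRouter& operator=(const ScopedRouter&) = delete;
private:
    DemoRouter m_router;
};
int ScopedRouter::setupCount = 0;

// Stand-in for a job that needs a valid router while it runs.
bool runJob() { return g_currentRouter != nullptr; }

// Called once per frame on a worker: the router outlives every job in this scope.
bool workerStep() {
    ScopedRouter scope;               // setup happens on every call
    return runJob();
}                                     // teardown happens on every call
```

Two calls to workerStep() perform two setups and two teardowns, which is the per-frame overhead mentioned above.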

    Let me know if this fixes the issue you're seeing,
    - Tyler

    PS: to answer your last question: stepDeltaTime() is single threaded so you would not hit any custom threading issues.

    Hey Tyler,

    I can only bow to you for the hint.
    It was indeed the scope that was causing memoryRouter to be destructed.
    Everything is working perfectly now, with great performance using multithreading ;)

    Thanks again!
    Kissy
