A stab at a TBB actor implementation

After watching one of the GDC/TBB seminars and picking the presenter's brain afterwards, we came up with a simple Erlang-style Actor based on TBB.

And the current state is more or less like this:

template <typename MSG_TYPE>
class Actor
{
	typedef Actor<MSG_TYPE>	Actor_t;

	class MessageHandlerTask : public tbb::task
	{
		Actor_t& pActor;
	public:
		//MessageHandlerTask methods
		MessageHandlerTask(Actor_t& actor)
			: pActor(actor)
		{}

		tbb::task * execute()
		{
			pActor.processAllMessages();
			return NULL;
		}
	};

public:
	void inbox(const MSG_TYPE& message,bool cleanup=false)
	{
		messageQueue.push(std::make_pair(message,cleanup));
		if(isProcessingNow.compare_and_swap(false, true) == true)
		{
			// allocate and spawn the MessageHandlerTask
			pMessageHandlerTask = new(pRootTask->allocate_additional_child_of(*pRootTask)) MessageHandlerTask(*this);
			pRootTask->spawn(*pMessageHandlerTask);
		}
	}

	Actor(Actor* parent=NULL)
		:pParent(parent)
	{
		if(pParent)
		{
			//borrow parent's root
			pRootTask = pParent->pRootTask;
		}
		else
		{
			//I'm a root so allocate a new root
			pRootTask = new(tbb::task::allocate_root()) tbb::empty_task();
			pRootTask->set_ref_count(2);
		}
	}

	virtual ~Actor()
	{
		if(!pParent) //I'm a root
		{
			if(pRootTask != NULL)
			{
				tbb::spin_mutex::scoped_lock lock(spinMutex);//named lock object, so the mutex is actually held
				if(pRootTask != NULL)
				{
					pRootTask->wait_for_all();
					pRootTask->destroy(*pRootTask);
					pRootTask = NULL;
				}
			}
		}
	}

//protected:
	//in reality this method should only be called by the actor itself to create children
	Actor* CreateChildActor()
	{
		return new Actor(this);
	}

protected:
	virtual void messageHandler(MSG_TYPE& message)
	{
		//User's Actor implementation
	}

	virtual void cleanup(MSG_TYPE& message)
	{
		//User's Actor implementation
	}

private:
	void processAllMessages()
	{
		std::pair<MSG_TYPE,bool>	pData;
		while(messageQueue.pop_if_present(pData))
		{
			messageHandler(pData.first);
			if(pData.second==true)
			{
				cleanup(pData.first);
			}
		}
		isProcessingNow = false;
	}

	Actor_t*				pParent;
	tbb::task*				pRootTask;
	MessageHandlerTask*			pMessageHandlerTask;
	tbb::atomic<bool>			isProcessingNow;
	tbb::spin_mutex				spinMutex;
	tbb::concurrent_queue<std::pair<MSG_TYPE,bool> > messageQueue;
};


Any suggestions related to the implementation?

Hmm, isn't TBB all about creating new tasks, not new threads (see pre-existing thread "Support for actor programming model")?

Quoting - Raf Schietekat
Hmm, isn't TBB all about creating new tasks, not new threads (see "Support for actor programming model")?

Hmm, that's why this actor uses tasks to execute its message handling, leaving threading to the scheduler, in a fairly TBB-like style, or at least that's how it looks to me. There is no thread creation in the above code.

Quoting - robert.jay.gould
Hmm, that's why this actor uses tasks to execute its message handling, leaving threading to the scheduler, in a fairly TBB-like style, or at least that's how it looks to me. There is no thread creation in the above code.

Sorry, that was just a silly joke, and you saw the version from before I slightly clarified what I meant.

Here's a real question: are there any blocking calls, why (not), and (if so) wouldn't this cause problems like starvation and/or stack explosion? That's just what I remember from before, I haven't looked at the code yet (got to run now).

That may deadlock. Check handling of the isProcessingNow variable in processAllMessages() and in inbox(). Think of your queueing mechanism as Peterson's mutual exclusion algorithm with variables being isProcessingNow and messageQueue being empty or not.
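One way such a handoff can go wrong is a lost wakeup: the consumer sees an empty queue, a producer then pushes and finds the flag still set, and the consumer clears the flag without looking again. The sketch below replays that single interleaving on one thread. `Mailbox`, `inRaceWindow` and `replay` are hypothetical names, the `std::deque` is not thread-safe, and `std::atomic` stands in for `tbb::atomic`; it illustrates the protocol and is not the poster's code.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>

// Hypothetical stand-in for the actor's mailbox; NOT the poster's class.
// Single-threaded replay: a hook plays the part of the second thread.
class Mailbox {
public:
    explicit Mailbox(bool recheckAfterRelease)
        : recheck_(recheckAfterRelease), busy_(false), processed_(0) {}

    // Runs once, in the window between "queue looked empty" and
    // "busy flag cleared", to simulate a producer arriving at that moment.
    std::function<void()> inRaceWindow;

    void post(int message) {
        queue_.push_back(message);   // publish the message first...
        tryDrain();                  // ...then try to become the consumer
    }

    int processed() const { return processed_; }
    bool stranded() const { return !queue_.empty(); }

private:
    void tryDrain() {
        for (;;) {
            bool expected = false;
            // Only the caller that flips busy_ from false to true may consume.
            if (!busy_.compare_exchange_strong(expected, true)) return;

            while (!queue_.empty()) {    // stand-in for messageHandler()
                queue_.pop_front();
                ++processed_;
            }
            if (inRaceWindow) {          // simulated concurrent post()
                std::function<void()> hook;
                hook.swap(inRaceWindow);
                hook();
            }
            busy_.store(false);

            // The fix: look at the queue again AFTER clearing the flag.
            if (!recheck_ || queue_.empty()) return;
        }
    }

    const bool recheck_;
    std::atomic<bool> busy_;
    std::deque<int> queue_;
    int processed_;
};

// Replays the interleaving; returns how many of the two messages were handled.
int replay(bool recheckAfterRelease) {
    Mailbox box(recheckAfterRelease);
    box.inRaceWindow = [&box] { box.post(2); };   // arrives mid-shutdown
    box.post(1);
    assert(box.stranded() == (box.processed() != 2));
    return box.processed();
}
```

Calling `replay(false)` leaves the second message sitting in the queue with nobody scheduled to handle it, while `replay(true)` handles both. A real version would need a concurrent queue such as `tbb::concurrent_queue` underneath.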

Quoting - Raf Schietekat

Sorry, that was just a silly joke, and you saw the version from before I slightly clarified what I meant.

Here's a real question: are there any blocking calls, why (not), and (if so) wouldn't this cause problems like starvation and/or stack explosion? That's just what I remember from before, I haven't looked at the code yet (got to run now).

Well the purpose of this Actor is for simple processing and AI-like uses, where IO shouldn't occur, because, well that's not what this Actor was made for :)

Anyway, I was thinking about providing an async message system so an Actor can wrap an IO action and get a message back when it finishes (to allow for some IO when utterly necessary), but this design ignores IO in general.
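The async idea above could look roughly like the following: the blocking call runs on a separate thread and its result is mailed back as an ordinary message. This is only a sketch using the C++ standard library rather than TBB; `Inbox`, `postWhenDone` and `demoAsyncIo` are hypothetical names, and `Inbox` merely stands in for the actor's `inbox()`.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <string>

// Hypothetical thread-safe inbox standing in for Actor::inbox().
class Inbox {
public:
    void post(const std::string& message) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            messages_.push(message);
        }
        ready_.notify_one();
    }

    // Blocks until a message arrives. A real actor would be scheduled
    // instead of waiting; blocking just keeps the demo short.
    std::string waitPop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !messages_.empty(); });
        std::string message = messages_.front();
        messages_.pop();
        return message;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::string> messages_;
};

// Runs blockingOp off the caller's thread and mails its result to replyTo.
// replyTo must stay alive until the returned future has completed.
std::future<void> postWhenDone(Inbox& replyTo,
                               std::function<std::string()> blockingOp) {
    return std::async(std::launch::async, [&replyTo, blockingOp] {
        replyTo.post(blockingOp());
    });
}

// Simulates one IO request and returns the reply message.
std::string demoAsyncIo() {
    Inbox inbox;
    std::future<void> pending = postWhenDone(inbox, [] {
        return std::string("simulated file contents");   // pretend IO
    });
    std::string reply = inbox.waitPop();
    pending.get();                 // make sure the helper thread is done
    assert(!reply.empty());
    return reply;
}
```

The point of the design is that the message handler itself never blocks: it only ever sees the completion message.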

Quoting - Dmitriy Vyukov
That may deadlock. Check handling of the isProcessingNow variable in processAllMessages() and in inbox(). Think of your queueing mechanism as Peterson's mutual exclusion algorithm with variables being isProcessingNow and messageQueue being empty or not.

Thanks for the advice.
I got rid of the deadlock and some other issues with that approach.

Quoting - robert.jay.gould

Any suggestions related to the implementation?

I agree with Dmitriy that it can deadlock.

Another note is that since there is a single consumer of the queue at any given time, a queue implementation customized for such a case might work better than the generic tbb::concurrent_queue. Only measurements will tell for sure, of course.

The third note is that, to be efficient and scalable, the usage model should assume there are a lot of actors. I am sure you targeted exactly that (as opposed to a few actors with a lot of processing per message).
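To make the single-consumer suggestion concrete, here is a minimal sketch of a multi-producer/single-consumer linked queue built on `std::atomic`. `MpscQueue` is a hypothetical name, the element type is assumed to be default-constructible and copyable, and nothing here has been measured against `tbb::concurrent_queue`.

```cpp
#include <atomic>
#include <cassert>

// Multi-producer / single-consumer linked queue (illustrative sketch).
// push() may be called from any thread; pop() from one thread at a time.
template <typename T>
class MpscQueue {
    struct Node {
        std::atomic<Node*> next;
        T value;
        Node() : next(nullptr), value() {}
        explicit Node(const T& v) : next(nullptr), value(v) {}
    };

public:
    // The queue always holds one stub node, so it is never truly empty.
    MpscQueue() : head_(new Node()), tail_(head_.load()) {}

    ~MpscQueue() {
        T ignored;
        while (pop(ignored)) {}
        delete tail_;                       // the remaining stub node
    }

    MpscQueue(const MpscQueue&) = delete;
    MpscQueue& operator=(const MpscQueue&) = delete;

    void push(const T& value) {
        Node* node = new Node(value);
        // Swing the shared head to the new node, then link the old head to it.
        Node* previous = head_.exchange(node, std::memory_order_acq_rel);
        assert(previous != nullptr);        // the stub guarantees this
        previous->next.store(node, std::memory_order_release);
    }

    bool pop(T& out) {
        Node* stub = tail_;
        Node* next = stub->next.load(std::memory_order_acquire);
        if (next == nullptr) return false;  // empty, or a push is mid-link
        out = next->value;
        tail_ = next;                       // next becomes the new stub
        delete stub;
        return true;
    }

private:
    std::atomic<Node*> head_;   // producers append here
    Node* tail_;                // touched by the consumer only
};
```

Producers pay one atomic exchange per push and the consumer takes no lock at all, which is where any gain over a general multi-consumer queue would come from. Note that `pop()` can briefly report "empty" while a push is between its two steps, so the consumer still has to re-check before going idle.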

Robert and all,

This is very cool to see my know-nothing-about-Actors sample turned into something real. I would be very interested in hearing from any of you how you are using a TBB-backed Actor implementation.

Also, I'm not seeing the possibility for deadlock on isProcessingNow... sounds like you've cleaned that up in an edit already.

Quoting - Brad Werth (Intel)
Robert and all,

This is very cool to see my know-nothing-about-Actors sample turned into something real. I would be very interested in hearing from any of you how you are using a TBB-backed Actor implementation.

Also, I'm not seeing the possibility for deadlock on isProcessingNow... sounds like you've cleaned that up in an edit already.

Hi Brad,

The deadlock is really tricky, but dealing with the internals of TBB's arena implementation was even more so, mostly because TBB doesn't like one thread touching tasks that belong to other threads.

Anyway, I improved the Actor and will post its latest iteration now; comments are more than welcome.

Ok I fixed several issues and sprinkled in some task continuation. Now the actor seems to work fine, and doesn't deadlock (as far as I've been able to see). So here it is:

//prototype
template <typename MSG_TYPE>
class Actor
{
public:
	typedef Actor<MSG_TYPE>	Actor_t;
	typedef MSG_TYPE Msg_t;
private:
	class MessageHandlerTask : public tbb::task
	{
		Actor_t& pActor;
	public:
		//MessageHandlerTask methods
		MessageHandlerTask(Actor_t& actor)
			: pActor(actor)
		{}

		tbb::task * execute()
		{
			pActor._processAllMessages();
			return NULL;
		}
	};

public:
	Actor(long id, Actor_t* parent=NULL)
		: pParent(parent)
		, myId(id)
	{
		isProcessingNow = false;
		childrenCounter = 0;
		if(pParent)
		{
			//borrow parent's root
			pRootTask = pParent->pRootTask;
		}
		else
		{
			//I'm the root actor so allocate a root
			pRootTask = new(tbb::task::allocate_root()) tbb::empty_task();
			pRootTask->set_ref_count(++childrenCounter);
		}
	}

	virtual ~Actor()
	{
		waitUntilDone();
		if(!messageQueue.empty())
		{
			printf("Actor %i was destroyed with %i messages in its queue left\n",myId,messageQueue.size());
		}
	}

	template <typename CHILDCLASS>
	Actor* createChild(long id)
	{
		static_cast<CHILDCLASS*>(this);//Static assert that the Child's class is derived from same Root class
		pRootTask->set_ref_count(++childrenCounter);
		//TODO recycling optimization
		return new CHILDCLASS(id,this);
	}

	const long getId()
	{
		return myId;
	}

	//Mailbox for external interface
	void inbox(const MSG_TYPE& message, bool cleanup=false)
	{
		_inbox(this,message,cleanup);
	}

	//TODO Not sure if this should really be a public interface or not, as it is root-only
	void waitUntilDone()
	{
		if(!pParent)
		{
			if(pRootTask != NULL)
			{
				tbb::spin_mutex::scoped_lock lock(spinMutex);//named lock object, so the mutex is actually held
				if(pRootTask != NULL)
				{
					while(childrenCounter!=1)
					{
						int c = pRootTask->ref_count();
						pRootTask->wait_for_all();
					}
					printf("all actions finished\n");
					pRootTask->destroy(*pRootTask);
					pRootTask = NULL;
				}
			}
		}
	}

protected:
	//Behavior overrides
	virtual bool messageHandler(MSG_TYPE& message)
	{
		//User's Actor implementation
		return false;
	}

	virtual void cleanup(MSG_TYPE& message)
	{
		//User's Actor implementation
	}

	//Access for an actor to mail another actor
	void sendMessage(Actor_t& target,const MSG_TYPE& message, bool cleanup=false)
	{
		target._inbox(this,message,cleanup);
	}

private:
	void _inbox(Actor_t* sender, const MSG_TYPE& message, bool cleanup=false)
	{
		messageQueue.push(std::make_pair(message,cleanup));
		if(sender==this)
		{
			_processMailBox(pRootTask);
		}
		else
		{
			_processMailBox(sender->pMessageHandlerTask);
		}
	}

	void _processAllMessages()
	{
		std::pair<MSG_TYPE,bool>	pData;
		bool ok = true;
		do
		{
			isProcessingNow = true;
			if(messageQueue.pop_if_present(pData))
			{
				ok = messageHandler(pData.first);
				if(pData.second==true)
				{
					cleanup(pData.first);
				}
			}
		}while(ok && !messageQueue.empty());
		isProcessingNow = false;
		if(!ok)
		{
			pParent->_destroyChild(const_cast<Actor_t*>(this));
		}
	}

	void _processMailBox(tbb::task* root)
	{
		if((messageQueue.empty()==false) && !(isProcessingNow.compare_and_swap(true, false)))
		{
			// allocate and spawn the MessageHandlerTask
			tbb::empty_task& continuation = *new( root->allocate_continuation() ) tbb::empty_task;
			pMessageHandlerTask = new(continuation.allocate_child()) MessageHandlerTask(*this);
			continuation.set_ref_count(1);
			root->spawn(*pMessageHandlerTask);
		}
	}

	void _destroyChild(Actor* child)
	{
		pRootTask->set_ref_count(--childrenCounter);
		//TODO recycling optimization
		delete child;
	}

	const long							myId;//Actor Id
	Actor*								pParent;
	tbb::task*							pRootTask;
	MessageHandlerTask*					pMessageHandlerTask;
	tbb::concurrent_queue<std::pair<MSG_TYPE,bool> >	messageQueue;
	tbb::atomic<bool>					isProcessingNow;
	
	//These could be refactored out to a RootActorClass
	tbb::atomic<int>					childrenCounter;
	tbb::spin_mutex						spinMutex;
};

Now I made a test scenario, which consists of each actor receiving a number and passing that number minus 1 to the actor to its "right". I'm calling this a spiral countdown. Anyway, the objective is to have actors collaborating with heavy dependence on each other.
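To get a feel for how much work the spiral countdown generates, here is a sequential model of it: no tasks, no threads, just a FIFO of pending messages. `SpiralStats` and `simulateSpiral` are hypothetical names, and the model assumes every message is delivered, so it deliberately ignores actors being destroyed once they receive 0.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

struct SpiralStats {
    std::size_t messagesHandled;   // total messageHandler() calls
    std::vector<int> finishes;     // zero messages received, per actor
};

// Sequential model of the workload only: actors are never destroyed and
// every message is assumed to reach its target.
SpiralStats simulateSpiral(std::size_t actorCount) {
    assert(actorCount > 0);        // the modulo below needs at least one actor
    SpiralStats stats;
    stats.messagesHandled = 0;
    stats.finishes.assign(actorCount, 0);

    // Pending (target actor, countdown value) pairs, handled in FIFO order.
    std::deque<std::pair<std::size_t, std::size_t> > pending;
    for (std::size_t id = 0; id < actorCount; ++id) {
        pending.push_back(std::make_pair(id, actorCount));   // inbox(ActorsN)
    }

    while (!pending.empty()) {
        const std::size_t id = pending.front().first;
        const std::size_t value = pending.front().second;
        pending.pop_front();
        ++stats.messagesHandled;

        if (value != 0) {
            // Forward value-1 to the neighbour on the "right".
            pending.push_back(
                std::make_pair((id + 1) % actorCount, value - 1));
        } else {
            ++stats.finishes[id];  // where the real actor returns false
        }
    }
    return stats;
}
```

Each of the N starting messages travels once around the ring, N+1 hops in total, and its 0 lands back on the actor that received it first. For N = 1000 that is 1000 × 1001 = 1,001,000 handled messages, with each actor finishing exactly once.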

typedef Actor<int>	TestActor;//assuming an int countdown value as the message type

//Note: Because Actors are doing a spiral countdown, the std::vector's alignment seems to be better than concurrent_vector
typedef std::vector<TestActor*> Children;

static const size_t ActorsN = 1000;
static Children children (ActorsN);

Children& GetChildren()
{
	return children;
}

class ActorTypeA: public TestActor
{
public:
	ActorTypeA(long id,TestActor* root=NULL)
		: TestActor(id,root)
	{}
protected:
	virtual bool messageHandler(TestActor::Msg_t& message)
	{
		if(message!=0)
		{
			size_t nextId = (getId()+1)%ActorsN;
			TestActor* next = GetChildren()[nextId];
			sendMessage(*next,message-1);
			return true;
		}
		else
		{
			printf("Actor %i finished!\n",getId());
			return false;//kill me
		}
	}
};


int main(int argc, char* argv[])
{
	tbb::task_scheduler_init init;//(1);
	{
		tbb::tick_count t0;
		Children &children = GetChildren();
		{
			TestActor	root(-1);
			{
				Children::iterator iter = children.begin();
				Children::iterator end = children.end();
				size_t counter = 0;
				for(;iter!=end;++iter,++counter)
				{
					*iter = root.createChild<ActorTypeA>(counter);
				}
				for(iter = children.begin();iter!=end;++iter,++counter)
				{
					//worst case (?) style spiral countdown
					(*iter)->inbox(ActorsN);
				}
			}
			t0 = tbb::tick_count::now();
			root.waitUntilDone();
			tbb::tick_count t1 = tbb::tick_count::now();
			printf ("time = %.1f msec\n", (t1-t0).seconds()*1000);
		}
	}
	return 0;
}

One interesting thing I noticed was that when the number of actors N is between 10 and 100, it scales linearly, but when N > 1000 it actually scales superlinearly, which I guess is due to the TBB scheduler working its magic.

Any ideas?

Also, as per Alexey's suggestion, I implemented another version of the actor with a custom queue (a C++ version of Dmitriy's agent queue, with a tbb::concurrent_allocator). That boosts performance by about 25% when N > 1000.
