最快线程间数据交换算法,有效避免锁竞争 -- TwoQueues

处理多线程数据共享问题注意的几个要点:

1、锁竞争:尽量减少锁竞争的时间和次数。

2、内存:尽量是使用已分配内存,减少内存分配和释放的次数。尽量是用连续内存,减少共享占用的内存量。


多线程数据交换简单方案A:

定义一个list,再所有操作list的地方进行加锁和解锁。

简单模拟代码:

class CSimpleQueue
{
public:
	CSimpleQueue()
	{
		InitializeCriticalSection(&m_crit);
	}
	~CSimpleQueue()
	{
		DeleteCriticalSection(&m_crit);
	}

	// 添加数据
	void AddData(DATA pData)
	{
		EnterCriticalSection(&m_crit);
		m_listDATA.push_back(pData);
		LeaveCriticalSection(&m_crit);
	}
	// 获取数据
	bool GetData(DATA data)
	{
		EnterCriticalSection(&m_crit);
		if (m_listDATA.size() > 0)
		{
			data = m_listDATA.front();
			m_listDATA.pop_front();
			LeaveCriticalSection(&m_crit);

			return true;
		}
		else
		{
			LeaveCriticalSection(&m_crit);
			return false;
		}
	}

private:
	list<DATA> m_listDATA;
	CRITICAL_SECTION m_crit;
};

多线程数据交换优化方案B -- TwoQueues:

文章最后有TwoQueues的详细实现源代码。

TwoQueues的实现分析:

锁竞争:TwoQueues使用双内存块交换的方式减少锁竞争,数据写入时会进行加锁和解锁操作,但读取数据时,只是当当前读取队列的数据读取完毕时,进行两个队列交换时才进行加锁和解锁操作。可以说,数据的读取是不进行加锁的。这样,最大限度的降低了锁竞争问题。

内存:TwoQueues采用预先分配内存块的方式,初始化TwoQueues时就已经将存放数据的内存分配好了。之后数据存放于已经分配的内存块上。无需再次分配内存,不会再次进行内存分配。

TwoQueues的进一步优化,TwoQueues在存放数据时,完全可以模仿【数据长度+数据】的方式存放数据。但是这种方式会增加数据存在性检测的效率。TwoQueues则采用了一种链表的方式进行存放数据。

链表结构:

typedef struct tagDataHead
	{
		tagDataHead()
		{
			pHead = NULL;
			nLen = 0;
			pNext = NULL;
		}
		void ReSet()
		{
			pHead = NULL;
			nLen = 0;
			pNext = NULL;
		}
		char* pHead;
		unsigned int nLen;
		tagDataHead* pNext;
	}DATAHEAD;

此链表存放了数据头指针和数据长度。当TwoQueues中存在数据时,可以直接通过链表节点拿数据。因为链表每次创建时也是需要进行申请内存,而内存申请是一个比较耗效率的事情,TwoQueues再此做了一个小小的处理,当链表头不存在时,会进行ReAlloc调用,一性申请MALLOC_SIZE个链表头结构。并且链表从链表上解下时并不会释放内存而是放入一个pFreeDataHead链表上,当需要申请链表头结构时,会先从pFreeDataHead链表上取链表结构。此处理减少了内存分配和释放的次数,提高了多线程数据共享的效率。

以上的各种优化处理,使TwoQueues的效率得到了极大的提升。

之前做过的内存共享的测试。A线程不停的写入数据,B线程不停的读取数据。

debug版本release版本
TwoQueues 80-100万次/秒 170-180万次/秒
CSimpleQueues 20万次/秒 5万次/秒

双队列的使用:

创建对象:

CTwoQueues m_cTwoQueues;

// 初始化双队列大小,因为是双队列,所以内存占用量是两份。

m_cTwoQueues.Init(0x0FFFFFFF);

写入方式:

m_cTwoQueues.PushData(sz, strlen(sz)+1);

读取方式:

const void* pData = NULL;
unsigned int nLen = 0;

if (m_cTwoQueues.PrepareData(pData, nLen))
{
// 处理数据 ...

// 确认(丢弃)此数据
m_cTwoQueues.ConfimData();
}

// 下面是TwoQueues的所有源代码  源代码下载

#pragma once
#include <assert.h>

namespace clwCore
{
#define MALLOC_SIZE	128

	typedef struct tagDataHead
	{
		tagDataHead()
		{
			pHead = NULL;
			nLen = 0;
			pNext = NULL;
		}
		void ReSet()
		{
			pHead = NULL;
			nLen = 0;
			pNext = NULL;
		}
		char* pHead;
		unsigned int nLen;
		tagDataHead* pNext;
	}DATAHEAD;

	typedef struct tagDataMem
	{
		tagDataMem(unsigned int nSize)
		{
			if (0 == nSize)
			{
				assert(false);
				return ;
			}

			nMaxSize = nSize;
			pDataMem = (char*)malloc(nSize*sizeof(char));
			nDataPos = 0;

			if (NULL == pDataMem)
			{
				// CTwoQueues申请malloc内存失败。
				assert(false);
			}

			pListDataHead = NULL;
			pCurDataHead = NULL;
			pFreeDataHead = NULL;
		}
		~tagDataMem()
		{
			// 释放节点
			ReSet();
			while(NULL != pFreeDataHead)
			{
				DATAHEAD* pTempDataHead = pFreeDataHead;
				pFreeDataHead = pFreeDataHead->pNext;

				delete pTempDataHead;
				pTempDataHead = NULL;
			}

			free(pDataMem);
			pDataMem = NULL;
			nDataPos = 0;
		}
		bool ReAlloc()
		{
			for (unsigned short i=0; i<MALLOC_SIZE; ++i)
			{
				DATAHEAD* pTempDataHead = new DATAHEAD;
				//pTempDataHead->ReSet();	//构造时已经初始化

				if (NULL == pTempDataHead)
				{
					// 申请DATAHEAD内存失败。
					assert(false);
					return false;
				}

				pTempDataHead->pNext = pFreeDataHead;
				pFreeDataHead = pTempDataHead;
			}

			return true;
		}
		DATAHEAD* GetDataHead()
		{
			if (NULL != pFreeDataHead)
			{
				DATAHEAD* pTempDataHead = pFreeDataHead;
				pFreeDataHead = pFreeDataHead->pNext;

				return pTempDataHead;
			}

			if (ReAlloc())
			{
				if (NULL != pFreeDataHead)
				{
					DATAHEAD* pTempDataHead = pFreeDataHead;
					pFreeDataHead = pFreeDataHead->pNext;

					return pTempDataHead;
				}
			}

			// ASSERT("GetDataHead返回NULL。");
			assert(false);
			return NULL;
		}
		unsigned int GetFreeLen()	//空闲内存长度
		{
			return nMaxSize-nDataPos;
		}
		bool PushData(void* pData, unsigned int nLen)
		{
			if (nDataPos+nLen >= nMaxSize)
			{
				return false;
			}

			DATAHEAD* pTempDataHead = GetDataHead();

			if (NULL == pTempDataHead)
			{
				return false;
			}

			// 构造数据头结构
			pTempDataHead->pHead = (pDataMem+nDataPos);
			pTempDataHead->nLen = nLen;
			pTempDataHead->pNext = NULL;

			// 拷贝数据
			memcpy(pDataMem+nDataPos, pData, nLen);
			nDataPos += nLen;

			if (NULL == pListDataHead)
			{
				pListDataHead = pTempDataHead;
				pCurDataHead = pTempDataHead;
				return true;
			}
			else
			{
				pCurDataHead->pNext = pTempDataHead;
				pCurDataHead = pCurDataHead->pNext;
				return true;
			}
		}

		bool IsEmpty()	//判断是否有数据
		{
			return (NULL==pListDataHead)?true:false;
		}

		bool PrepareData(const void*& pData, unsigned int& nLen)	//准备一条数据
		{
			if (NULL != pListDataHead)
			{
				pData = pListDataHead->pHead;
				nLen = pListDataHead->nLen;
				return true;
			}
			else
			{
				return false;
			}
		}
		void ConfimData()	//删除一条数据
		{
			if (NULL == pListDataHead)
			{
				return ;
			}

			DATAHEAD* pTempDataHead = pListDataHead;
			pListDataHead = pListDataHead->pNext;

			pTempDataHead->ReSet();
			pTempDataHead->pNext = pFreeDataHead;
			pFreeDataHead = pTempDataHead;
		}
		void ReSet()	//重置内存存储对象
		{
			while(NULL != pListDataHead)
			{
				DATAHEAD* pTempDataHead = pListDataHead;
				pListDataHead = pListDataHead->pNext;

				pTempDataHead->ReSet();
				pTempDataHead->pNext = pFreeDataHead;
				pFreeDataHead = pTempDataHead;
			}

			nDataPos = 0;
			pCurDataHead = NULL;
		}

		char* pDataMem;			//数据内存区域
		unsigned int nDataPos;	//数据存储位置
		unsigned int nMaxSize;	//最大存储区域大小

		DATAHEAD* pListDataHead;
		DATAHEAD* pCurDataHead;
		DATAHEAD* pFreeDataHead;	//空闲头结构队列
	}DATAMEM;

	class CTwoQueues
	{
	public:
		CTwoQueues(void)
		{
			InitializeCriticalSection(&m_crit);
			m_pDataMemPush = NULL;
			m_pDataMemPop = NULL;
		}
		~CTwoQueues(void)
		{
			if (NULL != m_pDataMemPush)
			{
				delete m_pDataMemPush;
				m_pDataMemPush = NULL;
			}

			if (NULL != m_pDataMemPop)
			{
				delete m_pDataMemPop;
				m_pDataMemPop = NULL;
			}
			DeleteCriticalSection(&m_crit);
		}

	public:
		void Init(unsigned int nSize)
		{
			if (0 == nSize)
			{
				// 初始化CTwoQueues对象失败。
				assert(false);
				return ;
			}

			m_pDataMemPush = new DATAMEM(nSize);
			m_pDataMemPop = new DATAMEM(nSize);
		}

		bool PushData(void* pData, unsigned int nLen)
		{
			//unsigned int nFreeLen = m_pDataMemPush->GetFreeLen();

			bool bResult = false;

			EnterCriticalSection(&m_crit);
			bResult = m_pDataMemPush->PushData(pData, nLen);
			LeaveCriticalSection(&m_crit);

			return bResult;
		}
		bool PrepareData(const void*& pData, unsigned int& nLen)
		{
			bool bCanRead = true;
			if (m_pDataMemPop->IsEmpty())
			{
				// 队列没有数据了
				EnterCriticalSection(&m_crit);
				if (m_pDataMemPush->IsEmpty())
				{
					// Push队列为空
					LeaveCriticalSection(&m_crit);
					bCanRead = false;
				}
				else
				{
					m_pDataMemPop->ReSet();	//充值读取队列
					DATAMEM* pTempDataMem = m_pDataMemPush;
					m_pDataMemPush = m_pDataMemPop;
					m_pDataMemPop = pTempDataMem;
					LeaveCriticalSection(&m_crit);
					bCanRead = true;
				}
			}

			if (bCanRead)
			{
				return m_pDataMemPop->PrepareData(pData, nLen);
			}
			else
			{
				return false;
			}
		}

		void ConfimData()
		{
			m_pDataMemPop->ConfimData();
		}

	private:
		DATAMEM* m_pDataMemPush;
		DATAMEM* m_pDataMemPop;
		CRITICAL_SECTION m_crit;

	};
}



如需更全面地了解编译器优化,请参阅优化注意事项

评论

云剑 夏.的头像

小技巧成就大成功啊 又让我学习了一下东西 谢谢楼主啊 啊

http://www.cdfap.com/news/html/?374.html

www.cdfap.com_www.zhenaizhaoren.com_www.tbmll.com
科磊 李.的头像

这篇文章还应该算是启发级别的吧,下面我用生产者-消费者模型说一下我的看法:
首先,只用两个内存块来交换数据,对于生产者生产过快没有做太多的意外处理:如果生产者把push的块用完了,但是消费者还在使用pop的块,那么生产者就会阻塞,解决的方法就是组成一个push块的链表,或者进行速度控制。或许这个问题可以避免,通过mem块的sz来解决。
第二,只有push时加锁,而pop时不加锁,这个条件可以使我们认为push块有多个使用者(一个消费者,多个生产者),而pop块只有一个使用者(即一个消费者),所以TwoQueues只能用于多个生产者,一个消费者的情形。
但是代码无疑是精彩的,希望作者能写出更完美的多线程数据交换算法。

han z.的头像

这几天,突然和公司一做3D引擎的哥们聊天,谈线程间数据交换速度的问题。如果能减少cpu高速缓存的数据多次寻址和调入内存数据,就能达到更高的效果。我考虑了下,对于TwoQueues的链表是可以不存在的,之后有时间进一步优化后,再和大家分享。

Soli的头像

如 科磊 李 所说,只适用于“一个消费者,多个生产者”的情况。
这种情况根本不需要双队列,在一个写锁的基础上,再加一个原子变量即可。

寂寞是国,我是王。