Unbounded single-producer/single-consumer node-based queue. Internal non-reducible cache of nodes is used. Dequeue operation is always wait-free. Enqueue operation is wait-free in common case (when there is available node in the cache), otherwise enqueue operation calls ::operator new(), so probably not wait-free. No atomic RMW operations nor heavy memory fences are used, i.e. enqueue and dequeue operations issue just several plain loads, several plain stores and one conditional branching. Cache-conscious data layout is used, so producer and consumer can work simultaneously causing no cache-coherence traffic.
Single-producer/single-consumer queue can be used for communication with thread which services hardware device (wait-free property is required), or when there are naturally only one producer and one consumer. Also N single-producer/single-consumer queues can be used to construct multi-producer/single-consumer queue, or N^2 queues can be used to construct fully-connected system of N threads (other partially-connected topologies are also possible).
Hardware platform: x86-32/64
Compiler: Intel C++ Compiler
// load with 'consume' (data-dependent) memory ordering
template<typename T>
T load_consume(T const* addr)
{
// hardware fence is implicit on x86
T v = *const_cast<T const volatile*>(addr);
__memory_barrier(); // compiler fence
return v;
}
// store with 'release' memory ordering
template<typename T>
void store_release(T* addr, T v)
{
// hardware fence is implicit on x86
__memory_barrier(); // compiler fence
*const_cast<T volatile*>(addr) = v;
}
// cache line size on modern x86 processors (in bytes)
size_t const cache_line_size = 64;
// single-producer/single-consumer queue
template<typename T>
class spsc_queue
{
public:
spsc_queue()
{
node* n = new node;
n->next_ = 0;
tail_ = head_ = first_= tail_copy_ = n;
}
~spsc_queue()
{
node* n = first_;
do
{
node* next = n->next_;
delete n;
n = next;
}
while (n);
}
void enqueue(T v)
{
node* n = alloc_node();
n->next_ = 0;
n->value_ = v;
store_release(&head_->next_, n);
head_ = n;
}
// returns 'false' if queue is empty
bool dequeue(T& v)
{
if (load_consume(&tail_->next_))
{
v = tail_->next_->value_;
store_release(&tail_, tail_->next_);
return true;
}
else
{
return false;
}
}
private:
// internal node structure
struct node
{
node* next_;
T value_;
};
// consumer part
// accessed mainly by consumer, infrequently be producer
node* tail_; // tail of the queue
// delimiter between consumer part and producer part,
// so that they situated on different cache lines
char cache_line_pad_ [cache_line_size];
// producer part
// accessed only by producer
node* head_; // head of the queue
node* first_; // last unused node (tail of node cache)
node* tail_copy_; // helper (points somewhere between first_ and tail_)
node* alloc_node()
{
// first tries to allocate node from internal node cache,
// if attempt fails, allocates node via ::operator new()
if (first_ != tail_copy_)
{
node* n = first_;
first_ = first_->next_;
return n;
}
tail_copy_ = load_consume(&tail_);
if (first_ != tail_copy_)
{
node* n = first_;
first_ = first_->next_;
return n;
}
node* n = new node;
return n;
}
spsc_queue(spsc_queue const&);
spsc_queue& operator = (spsc_queue const&);
};
// usage example
int main()
{
spsc_queue<int> q;
q.enqueue(1);
q.enqueue(2);
int v;
bool b = q.dequeue(v);
b = q.dequeue(v);
q.enqueue(3);
q.enqueue(4);
b = q.dequeue(v);
b = q.dequeue(v);
b = q.dequeue(v);
}

Comments
I just went over my last code posting again. I decided this piece of code needs some major work. I will be posting something better latter tonight I hope. http://codepad.org/2Sm5zT2u
I just rewrote the entire piece of code. I changed the names of the functions enqueue and dequeue, to match the names of std::queue. I made all the members of spsc_queue private. I added two helper classes, spsc_queue_consumer, and spsc_queue_producer, so that the consumer object is now a separate object, of a different class, than the producer object.
I added a file with memory fences for Microsoft, gcc and Intel.
I added a class spsc_queue_tester, which has regression test, and one stress test. I added a regression test program also.
http://acumensoftwareinc.com/TechNotes/spsc_queue/index.html
Would anyone care to do a code review, or check my memory fences?
SFENCE/LFENCE does not do what you think they do. Check out Intel IA-32/Intel64 Memory Model doc. In most cases you do not need them.
At this point I would strongly encourage you to stick with C++11/C1x atomics semantics and interface. Some compilers already support them. If not, do you own impl; you will be able to remove it once you compiler supports C++11/C1x atomics.
If you use C++11 compiler, but do not use C++11 atomics, then the compiler can easily broke you code.
You need it to be an atomic store:
00102 // if tail == head, then this is the signal to pop_front that it has data
00103 tail_->next_ = n;
00105 // commit memory write because this could reduce latency.
00106 // This has no effect on program logic though.
00107 memory_fence::sfence();
Memory fences does not work than way. They do not reduce latency, they increase latency. If there would be a way to reduce latency, it would be turned on by default for all memory operations.
I don't have a C++ 11.0 compiler.
So how should I write the memory fences?
Should I call the functions for each compiler?
If I remove line 107, memory_fence::sfence();
Then the CPU might not execute line 106, for a very long time: tail_->next_ = n;
If the program runs forever, and no other lines depend on line 106 getting executed, line 106 might not ever get executed at all.
If line 106 isn't executed, then the consumer won't ever see the new data.
So by executing the memory fence on line 107, doesn't that insure the consumer will get the data as soon as possible?
This is how I wrote the pop function. How would you write it, so that it could run under GCC, Intel, or Microsoft C++, and be forward compatible to c++11.0, and be as fast as possible ?
00093 bool push_back(const T& v)
00094 {
00095 volatile node* n = alloc_node( v);
00096 if (!n) return false;
00098 //makes sure node is fully formed before it is assigned to tail_->next_
00099 memory_fence::sfence();
00102 // if tail == head, then this is the signal to pop_front that it has data
00103 tail_->next_ = n;
00105 // commit memory write because this could reduce latency.
00106 // This has no effect on program logic though.
00107 memory_fence::sfence();
00109 // advance tail
00110 tail_ = (node*)n;
00112 return true;
00113 }
Pages