Single-Producer/Single-Consumer Queue

Unbounded single-producer/single-consumer node-based queue. An internal non-reducible cache of nodes is used. The dequeue operation is always wait-free. The enqueue operation is wait-free in the common case (when there is an available node in the cache); otherwise enqueue calls ::operator new(), so it is probably not wait-free. No atomic RMW operations nor heavy memory fences are used: enqueue and dequeue each issue just several plain loads, several plain stores, and one conditional branch. A cache-conscious data layout is used, so the producer and consumer can work simultaneously without causing cache-coherence traffic.

A single-producer/single-consumer queue can be used for communication with a thread that services a hardware device (where the wait-free property is required), or wherever there are naturally only one producer and one consumer. Also, N single-producer/single-consumer queues can be used to construct a multi-producer/single-consumer queue, and N^2 queues can be used to construct a fully connected system of N threads (other, partially connected topologies are also possible), as sketched below.
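As an illustration of the composition idea, here is a minimal sketch of a multi-producer/single-consumer queue built from N of the single-producer/single-consumer queues defined below (the class name mpsc_via_spsc and the fixed producer-to-queue assignment are assumptions for illustration, not part of the original code):

// hypothetical sketch: MPSC queue composed of N spsc_queue instances
// (one sub-queue per producer); the single consumer polls round-robin
template<typename T, size_t N>
class mpsc_via_spsc
{
public:
  mpsc_via_spsc() : pos_(0) {}

  // producer 'id' (0..N-1) must always use the same sub-queue
  void enqueue(size_t id, T v)
  {
      queues_[id].enqueue(v);
  }

  // consumer scans all sub-queues once; returns 'false' if all are empty
  bool dequeue(T& v)
  {
      for (size_t i = 0; i != N; ++i)
      {
          if (queues_[(pos_ + i) % N].dequeue(v))
          {
              pos_ = (pos_ + i + 1) % N;
              return true;
          }
      }
      return false;
  }

private:
  spsc_queue<T> queues_[N];
  size_t pos_; // round-robin scan position, accessed only by the consumer
};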

Hardware platform: x86-32/64

Compiler: Intel C++ Compiler

// load with 'consume' (data-dependent) memory ordering
template<typename T>
T load_consume(T const* addr)
{
  // hardware fence is implicit on x86
  T v = *const_cast<T const volatile*>(addr);
  __memory_barrier(); // compiler fence
  return v;
}

// store with 'release' memory ordering
template<typename T>
void store_release(T* addr, T v)
{
  // hardware fence is implicit on x86
  __memory_barrier(); // compiler fence
  *const_cast<T volatile*>(addr) = v;
}
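For comparison, a portable C++11 rendering of the same two helpers (an editor's sketch; memory_order_acquire is used because compilers typically promote memory_order_consume to acquire, and on x86 both compile down to plain MOVs):

#include <atomic>

// C++11 equivalent of load_consume()
template<typename T>
T load_consume_std(std::atomic<T> const* addr)
{
  // an acquire load is a plain MOV on x86
  return addr->load(std::memory_order_acquire);
}

// C++11 equivalent of store_release()
template<typename T>
void store_release_std(std::atomic<T>* addr, T v)
{
  // a release store is a plain MOV on x86
  addr->store(v, std::memory_order_release);
}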

// cache line size on modern x86 processors (in bytes)
size_t const cache_line_size = 64;

// single-producer/single-consumer queue
template<typename T>
class spsc_queue
{
public:
  spsc_queue()
  {
      node* n = new node;
      n->next_ = 0;
      tail_ = head_ = first_ = tail_copy_ = n;
  }

  ~spsc_queue()
  {
      node* n = first_;
      do
      {
          node* next = n->next_;
          delete n;
          n = next;
      }
      while (n);
  }

  void enqueue(T v)
  {
      node* n = alloc_node();
      n->next_ = 0;
      n->value_ = v;
      store_release(&head_->next_, n);
      head_ = n;
  }

  // returns 'false' if queue is empty
  bool dequeue(T& v)
  {
      if (load_consume(&tail_->next_))
      {
          v = tail_->next_->value_;
          store_release(&tail_, tail_->next_);
          return true;
      }
      else
      {
          return false;
      }
  }

private:
  // internal node structure
  struct node
  {
      node* next_;
      T value_;
  };

  // consumer part
  // accessed mainly by the consumer, infrequently by the producer
  node* tail_; // tail of the queue

  // delimiter between the consumer part and the producer part,
  // so that they are situated on different cache lines
  char cache_line_pad_ [cache_line_size];

  // producer part
  // accessed only by producer
  node* head_; // head of the queue
  node* first_; // last unused node (tail of node cache)
  node* tail_copy_; // helper (points somewhere between first_ and tail_)

  node* alloc_node()
  {
      // first try to allocate a node from the internal node cache;
      // if the attempt fails, allocate a node via ::operator new()

      if (first_ != tail_copy_)
      {
          node* n = first_;
          first_ = first_->next_;
          return n;
      }
      tail_copy_ = load_consume(&tail_);
      if (first_ != tail_copy_)
      {
          node* n = first_;
          first_ = first_->next_;
          return n;
      }
      node* n = new node;
      return n;
  }

  spsc_queue(spsc_queue const&);
  spsc_queue& operator = (spsc_queue const&);
};

// usage example
int main()
{
  spsc_queue<int> q;
  q.enqueue(1);
  q.enqueue(2);
  int v;
  bool b = q.dequeue(v);
  b = q.dequeue(v);
  q.enqueue(3);
  q.enqueue(4);
  b = q.dequeue(v);
  b = q.dequeue(v);
  b = q.dequeue(v);
}
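The example above runs single-threaded; below is a sketch of the intended two-thread usage (std::thread and the -1 end-of-stream sentinel are assumptions for illustration, not part of the original code):

#include <thread>

// hypothetical two-thread usage: one producer, one consumer
void two_thread_example()
{
  spsc_queue<int> q;
  std::thread producer([&]
  {
      for (int i = 0; i != 1000; ++i)
          q.enqueue(i);
      q.enqueue(-1); // assumed sentinel marking end of stream
  });
  std::thread consumer([&]
  {
      int v;
      for (;;)
      {
          if (q.dequeue(v))
          {
              if (v == -1)
                  break;
              // ... process v ...
          }
          // else: spin or yield; see the eventcount discussion
          // in the comments for how to block instead
      }
  });
  producer.join();
  consumer.join();
}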


Comments

I liked the store_release / load_consume templates and their use, very elegant.
But does this actually enQ & deQ?

Yes, ok, so it took me a while ;-)
You enQ onto the head_ and deQ off the tail_.
I was rather expecting the reverse, like people Q'ing for theatre tickets:
the head is in the theatre!

very, very elegant indeed


Thank you.
I am also thinking about posting the eventcount algorithm, which can magically turn this non-blocking queue into a blocking queue. Blocking in the sense that dequeue() will block if there are no elements, instead of returning false. The additional cost will be: MFENCE + a single load + a single conditional branch in enqueue(); nothing in dequeue(). Unfortunately it's impossible to implement blocking without the MFENCE (which can be fairly costly) in enqueue().
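A rough sketch of the blocking idea, using std::mutex/std::condition_variable as a simplified stand-in for a real eventcount (an editor's sketch, not the eventcount algorithm itself; in the real algorithm the enqueue() overhead is just the MFENCE + load + branch mentioned above):

#include <condition_variable>
#include <mutex>

// simplified blocking wrapper (sketch only, not the eventcount algorithm)
template<typename T>
class blocking_spsc_queue
{
public:
  void enqueue(T v)
  {
      q_.enqueue(v);
      // a real eventcount would do: fence, load a waiter flag, branch;
      // here we conservatively signal on every enqueue
      std::lock_guard<std::mutex> lock(mtx_);
      cv_.notify_one();
  }

  // blocks until an element is available
  void dequeue(T& v)
  {
      if (q_.dequeue(v))
          return; // fast path: no extra cost
      std::unique_lock<std::mutex> lock(mtx_);
      while (!q_.dequeue(v))
          cv_.wait(lock);
  }

private:
  spsc_queue<T> q_;
  std::mutex mtx_;
  std::condition_variable cv_;
};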

All about lock-free algorithms, multicore, scalability, parallel computing and related topics:
http://www.1024cores.net


Thanks for this article. I'm curious, would this also work on the MSVC compiler if I replace __memory_barrier() with _memoryBarrier()? I know that on MSVC volatile implies a lot more than it might on the Intel compiler, although I'm not sure about the Intel compiler side.


Yes, on MSVC volatile is enough, so you can just remove __memory_barrier().
However, the substitution for __memory_barrier() (i.e. a full compiler fence) on MSVC is _ReadWriteBarrier().
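That substitution would look like this (a sketch based on the reply above; _ReadWriteBarrier() is declared in <intrin.h>):

#include <intrin.h>

// MSVC variant of the helpers
template<typename T>
T load_consume(T const* addr)
{
  T v = *const_cast<T const volatile*>(addr);
  _ReadWriteBarrier(); // compiler-only fence
  return v;
}

template<typename T>
void store_release(T* addr, T v)
{
  _ReadWriteBarrier(); // compiler-only fence
  *const_cast<T volatile*>(addr) = v;
}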

All about lock-free algorithms, multicore, scalability, parallel computing and related topics:
http://www.1024cores.net


Thanks for the response. It works beautifully in my application, generating mouse events in one thread and passing them to a receiving thread for handling.



Also, just wondering what the multi-producer XCHG implies? There is no implementation anywhere. It is also advertised as cheaper than CAS, but isn't that an implicit LOCK anyway?


For those interested in C# and the .NET Parallel Extensions, they will include a ConcurrentQueue to solve the producer/consumer problem.
I found a good example of its implementation in the book C# 2008 and 2005 Threaded Programming (Gaston Hillar, Packt Publishing), http://www.amazon.co.uk/2008-2005-Threaded-Programming-Beginners/dp/1847197108/ref=pd_rhf_p_t_1
You can also check the Microsoft documentation. However, the examples are not as real-life as the one I mention. I recommended that book in another post.



You may see an XCHG-based multi-producer queue here:
http://groups.google.com/group/lock-free/browse_frm/thread/55df71b87acb8201

Yes, it's somewhat more scalable than CAS-based queues, because (1) it's wait-free, and (2) it doesn't require a load of the shared location before the atomic RMW.
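For a flavor of why a single XCHG suffices, here is a sketch of the producer side of such a queue (an editor's rendering of the well-known intrusive MPSC push pattern in C++11 atomics, not the code from the linked thread):

#include <atomic>

// intrusive node for the multi-producer queue sketch
struct mpsc_node
{
  std::atomic<mpsc_node*> next_;
};

class mpsc_queue_sketch
{
public:
  mpsc_queue_sketch() : head_(&stub_), tail_(&stub_)
  {
      stub_.next_.store(0, std::memory_order_relaxed);
  }

  // wait-free: one XCHG, and no prior load of the shared location
  void push(mpsc_node* n)
  {
      n->next_.store(0, std::memory_order_relaxed);
      mpsc_node* prev = head_.exchange(n, std::memory_order_acq_rel);
      prev->next_.store(n, std::memory_order_release);
  }

private:
  std::atomic<mpsc_node*> head_; // producers swing this with XCHG
  mpsc_node* tail_;              // consumer-only (pop not shown)
  mpsc_node stub_;               // dummy node
};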

All about lock-free algorithms, multicore, scalability, parallel computing and related topics:
http://www.1024cores.net

