Single-Producer/Single-Consumer Queue

Unbounded single-producer/single-consumer node-based queue. An internal non-reducible cache of nodes is used. The dequeue operation is always wait-free. The enqueue operation is wait-free in the common case (when there is an available node in the cache); otherwise it calls ::operator new(), so it is probably not wait-free. No atomic RMW operations nor heavy memory fences are used; enqueue and dequeue issue just several plain loads, several plain stores, and one conditional branch. A cache-conscious data layout is used, so the producer and consumer can work simultaneously without causing cache-coherence traffic.

A single-producer/single-consumer queue can be used for communication with a thread that services a hardware device (where the wait-free property is required), or when there are naturally only one producer and one consumer. Also, N single-producer/single-consumer queues can be used to construct a multi-producer/single-consumer queue, or N^2 queues can be used to construct a fully connected system of N threads (other, partially connected topologies are also possible).
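The multi-producer construction mentioned above can be sketched as follows. This is an illustration only: a trivial bounded SPSC ring buffer stands in for the article's queue, and the names (spsc_ring, mpsc_from_spsc) are hypothetical.

```cpp
#include <atomic>

// Illustration only (not the article's queue): a trivial bounded
// SPSC ring buffer with acquire/release ordering.
template<typename T, unsigned N>
struct spsc_ring
{
    T buf_[N];
    std::atomic<unsigned> head_{0}; // written only by the producer
    std::atomic<unsigned> tail_{0}; // written only by the consumer

    bool push(T v)
    {
        unsigned h = head_.load(std::memory_order_relaxed);
        if (h - tail_.load(std::memory_order_acquire) == N)
            return false; // full
        buf_[h % N] = v;
        head_.store(h + 1, std::memory_order_release);
        return true;
    }

    bool pop(T& v)
    {
        unsigned t = tail_.load(std::memory_order_relaxed);
        if (head_.load(std::memory_order_acquire) == t)
            return false; // empty
        v = buf_[t % N];
        tail_.store(t + 1, std::memory_order_release);
        return true;
    }
};

// MPSC built from P SPSC lanes: producer i pushes only into lane_[i];
// the single consumer drains the lanes round-robin, so every lane
// still has exactly one producer and one consumer.
template<typename T, unsigned P>
struct mpsc_from_spsc
{
    spsc_ring<T, 1024> lane_[P];
    unsigned cur_ = 0; // where the consumer resumes scanning

    bool push(unsigned producer, T v) { return lane_[producer].push(v); }

    bool pop(T& v)
    {
        for (unsigned i = 0; i != P; ++i)
        {
            unsigned k = (cur_ + i) % P;
            if (lane_[k].pop(v)) { cur_ = (k + 1) % P; return true; }
        }
        return false;
    }
};
```

The round-robin scan keeps the consumer fair across producers; a fully connected system of N threads would give each ordered pair of threads its own lane in the same way.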

Hardware platform: x86-32/64

Compiler: Intel C++ Compiler


// load with 'consume' (data-dependent) memory ordering
template<typename T>
T load_consume(T const* addr)
{
  // hardware fence is implicit on x86
  T v = *const_cast<T const volatile*>(addr);
  __memory_barrier(); // compiler fence
  return v;
}

// store with 'release' memory ordering
template<typename T>
void store_release(T* addr, T v)
{
  // hardware fence is implicit on x86
  __memory_barrier(); // compiler fence
  *const_cast<T volatile*>(addr) = v;
}
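On a compiler with C++11 support, the same two primitives can be expressed with std::atomic instead of volatile plus a compiler fence. A minimal sketch (memory_order_consume is mapped to acquire here, which is slightly stronger but always correct and portable):

```cpp
#include <atomic>

// load with 'consume' ordering, expressed via C++11 atomics
// (consume is promoted to acquire, which is slightly stronger)
template<typename T>
T load_consume(std::atomic<T> const& addr)
{
    return addr.load(std::memory_order_acquire);
}

// store with 'release' memory ordering
template<typename T>
void store_release(std::atomic<T>& addr, T v)
{
    addr.store(v, std::memory_order_release);
}
```

On x86 both compile to plain loads and stores, just as the volatile versions do, but the compiler now knows the ordering intent and cannot break it.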

// cache line size on modern x86 processors (in bytes)
size_t const cache_line_size = 64;

// single-producer/single-consumer queue
template<typename T>
class spsc_queue
{
public:
  spsc_queue()
  {
      node* n = new node;
      n->next_ = 0;
tail_ = head_ = first_ = tail_copy_ = n;
  }

  ~spsc_queue()
  {
      node* n = first_;
      do
      {
          node* next = n->next_;
          delete n;
          n = next;
      }
      while (n);
  }

  void enqueue(T v)
  {
      node* n = alloc_node();
      n->next_ = 0;
      n->value_ = v;
      store_release(&head_->next_, n);
      head_ = n;
  }

  // returns 'false' if queue is empty
  bool dequeue(T& v)
  {
      if (load_consume(&tail_->next_))
      {
          v = tail_->next_->value_;
          store_release(&tail_, tail_->next_);
          return true;
      }
      else
      {
          return false;
      }
  }

private:
  // internal node structure
  struct node
  {
      node* next_;
      T value_;
  };

  // consumer part
  // accessed mainly by consumer, infrequently by producer
  node* tail_; // tail of the queue

  // delimiter between consumer part and producer part,
  // so that they are situated on different cache lines
  char cache_line_pad_ [cache_line_size];

  // producer part
  // accessed only by producer
  node* head_; // head of the queue
  node* first_; // last unused node (tail of node cache)
  node* tail_copy_; // helper (points somewhere between first_ and tail_)

  node* alloc_node()
  {
      // first tries to allocate node from internal node cache,
      // if attempt fails, allocates node via ::operator new()

      if (first_ != tail_copy_)
      {
          node* n = first_;
          first_ = first_->next_;
          return n;
      }
      tail_copy_ = load_consume(&tail_);
      if (first_ != tail_copy_)
      {
          node* n = first_;
          first_ = first_->next_;
          return n;
      }
      node* n = new node;
      return n;
  }

  spsc_queue(spsc_queue const&);
  spsc_queue& operator = (spsc_queue const&);
};

// usage example
int main()
{
  spsc_queue<int> q;
  q.enqueue(1);
  q.enqueue(2);
  int v;
  bool b = q.dequeue(v); // b == true, v == 1
  b = q.dequeue(v);      // b == true, v == 2
  q.enqueue(3);
  q.enqueue(4);
  b = q.dequeue(v);      // b == true, v == 3
  b = q.dequeue(v);      // b == true, v == 4
  b = q.dequeue(v);      // b == false, queue is empty
}
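For compilers that implement C++11 atomics, here is a sketch of how the same algorithm looks with std::atomic replacing the volatile accesses and compiler fences. This is an illustration, not the author's code; 'consume' is mapped to acquire, which is slightly stronger but portable.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// C++11 sketch of the same algorithm: plain loads/stores on shared
// locations become std::atomic loads/stores with release/acquire
// ordering; on x86 these compile to the same plain instructions.
template<typename T>
class spsc_queue11
{
public:
    spsc_queue11()
    {
        node* n = new node;
        n->next_.store(0, std::memory_order_relaxed);
        head_ = first_ = tail_copy_ = n;
        tail_.store(n, std::memory_order_relaxed);
    }

    ~spsc_queue11()
    {
        node* n = first_;
        while (n)
        {
            node* next = n->next_.load(std::memory_order_relaxed);
            delete n;
            n = next;
        }
    }

    void enqueue(T v)
    {
        node* n = alloc_node();
        n->next_.store(0, std::memory_order_relaxed);
        n->value_ = v;
        head_->next_.store(n, std::memory_order_release); // publish
        head_ = n;
    }

    bool dequeue(T& v)
    {
        node* t = tail_.load(std::memory_order_relaxed); // own write
        node* next = t->next_.load(std::memory_order_acquire);
        if (!next)
            return false;
        v = next->value_;
        tail_.store(next, std::memory_order_release); // recycle t
        return true;
    }

private:
    struct node
    {
        std::atomic<node*> next_;
        T value_;
    };

    std::atomic<node*> tail_;   // consumer part
    char pad_[64];              // keep parts on different cache lines
    node* head_;                // producer part
    node* first_;               // head of the node cache
    node* tail_copy_;

    node* alloc_node()
    {
        if (first_ != tail_copy_)
        {
            node* n = first_;
            first_ = first_->next_.load(std::memory_order_relaxed);
            return n;
        }
        tail_copy_ = tail_.load(std::memory_order_acquire);
        if (first_ != tail_copy_)
        {
            node* n = first_;
            first_ = first_->next_.load(std::memory_order_relaxed);
            return n;
        }
        return new node;
    }

    spsc_queue11(spsc_queue11 const&);
    spsc_queue11& operator=(spsc_queue11 const&);
};
```

A two-thread smoke test (producer enqueues 1..1000, consumer sums them) exercises the release/acquire pairing without any mutexes.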


27 comments

anonymous:

This is how I wrote the push function. How would you write it, so that it could run under GCC, Intel, or Microsoft C++, be forward-compatible to C++11, and be as fast as possible?

00093 bool push_back(const T& v)
00094 {
00095 volatile node* n = alloc_node( v);
00096 if (!n) return false;
00098 //makes sure node is fully formed before it is assigned to tail_->next_
00099 memory_fence::sfence();
00102 // if tail == head, then this is the signal to pop_front that it has data
00103 tail_->next_ = n;
00105 // commit memory write because this could reduce latency.
00106 // This has no effect on program logic though.
00107 memory_fence::sfence();
00109 // advance tail
00110 tail_ = (node*)n;
00112 return true;
00113 }

anonymous:

If I remove line 107, memory_fence::sfence();

Then the CPU might not execute line 106, for a very long time: tail_->next_ = n;

If the program runs forever, and no other lines depend on line 106 getting executed, line 106 might not ever get executed at all.

If line 106 isn't executed, then the consumer won't ever see the new data.

So by executing the memory fence on line 107, doesn't that ensure the consumer will get the data as soon as possible?

anonymous:

I don't have a C++11 compiler.

So how should I write the memory fences?
Should I call the functions for each compiler?

Dmitry Vyukov:

If you use a C++11 compiler but do not use C++11 atomics, then the compiler can easily break your code.

You need it to be an atomic store:
00102 // if tail == head, then this is the signal to pop_front that it has data
00103 tail_->next_ = n;

00105 // commit memory write because this could reduce latency.
00106 // This has no effect on program logic though.
00107 memory_fence::sfence();
Memory fences do not work that way. They do not reduce latency; they increase it. If there were a way to reduce latency, it would be enabled by default for all memory operations.

Dmitry Vyukov:

SFENCE/LFENCE do not do what you think they do. Check out the Intel IA-32/Intel 64 Memory Model doc. In most cases you do not need them.
At this point I would strongly encourage you to stick with the C++11/C1x atomics semantics and interface. Some compilers already support them. If not, do your own impl; you will be able to remove it once your compiler supports C++11/C1x atomics.
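Such a stop-gap implementation with a C++11-style interface might look like this on x86 (a sketch only; the COMPILER_BARRIER macro and function names are hypothetical). Only a compiler barrier is needed, because on x86 loads already have acquire semantics and stores have release semantics in hardware:

```cpp
// x86-only, pre-C++11 sketch of release store / acquire load.
// Only compiler reordering must be suppressed; the hardware
// ordering on x86 is already strong enough.
#if defined(_MSC_VER)
  #include <intrin.h>
  #define COMPILER_BARRIER() _ReadWriteBarrier()
#else // GCC, Clang, ICC
  #define COMPILER_BARRIER() __asm__ __volatile__("" ::: "memory")
#endif

template<typename T>
T load_acquire(T const* addr)
{
    T v = *const_cast<T const volatile*>(addr);
    COMPILER_BARRIER(); // later accesses stay after the load
    return v;
}

template<typename T>
void store_release(T* addr, T v)
{
    COMPILER_BARRIER(); // earlier accesses stay before the store
    *const_cast<T volatile*>(addr) = v;
}
```

Once the compiler supports <atomic>, these can be replaced one-for-one by std::atomic loads and stores with memory_order_acquire and memory_order_release.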

anonymous:

I just rewrote the entire piece of code. I changed the names of the functions enqueue and dequeue to match the names of std::queue. I made all the members of spsc_queue private. I added two helper classes, spsc_queue_consumer and spsc_queue_producer, so that the consumer object is now a separate object, of a different class, than the producer object.

I added a file with memory fences for Microsoft, gcc and Intel.

I added a class spsc_queue_tester, which has regression test, and one stress test. I added a regression test program also.

http://acumensoftwareinc.com/TechNotes/spsc_queue/index.html

Would anyone care to do a code review, or check my memory fences?

anonymous:

I just went over my last code posting again. I decided this piece of code needs some major work. I will be posting something better later tonight, I hope. http://codepad.org/2Sm5zT2u

anonymous:

I added the compiler fences. I think it is right now.
http://codepad.org/2Sm5zT2u

I still think there should be a write commit after the inter-thread variables are assigned, so that other threads can see them.

In this example on Wikipedia, there is a memory fence in the example for Processor #2, so that the assignment x=42; is seen by Processor #1

anonymous:

I need to put compiler fences as well as the hardware fences in that piece of code. Oops

anonymous:

I have ports to 3 different compilers. I haven't tried it on Intel yet, because I don't have ICC installed on any of my machines at the moment. http://codepad.org/GrkDLI7v

Dmitry, shouldn't there be two memory fences in the load_consume and store_release functions? An lfence before the statement, to make sure the right-hand side of the assignment refers to valid memory, and then an sfence after the statement, to make sure the change is committed to memory and other processors can see the change?

template<typename T>
T load_consume(T const* addr)
{
memory_fence::lfence(); // refresh memory on rhs of assignment
T v = *const_cast<T const volatile*>(addr);
memory_fence::sfence(); // commit memory on lhs of assignment
return v;
}

