Combiner/Aggregator Synchronization Primitive

Some time ago Terry Wilmarth posted the blog about Aggregators in TBB. In this blog I want to explore the design space of the pattern. The post contains lots of code, and I will use mostly the term Combiner below.

So what is a combiner? It is a mutual exclusion primitive similar to a mutex. But when using a combiner you explicitly pass the critical section function, and it allows more flexibility with respect to its execution. Namely a thread can execute the critical section on behalf of another thread. Here is a simple usage example:

   combiner_t* c = combiner_create(&my_critical_section);
   ...
   combiner_execute(c, &arg);
   // at this point my_critical_section(&arg) function has been executed,
   // but not necessary by the current thread.

This is analogous to:
   mutex_t *m = mutex_create();
   …
   mutex_lock(m);
   my_critical_section(&arg);
   mutex_unlock(m);

The reasonable question -- how is it better than mutex? If we combine/aggregate several critical sections from different threads (here is where the name came from) into a single critical section and give it to a single thread to execute, it can have significant impact on cache performance. Note if there is no combing opportunities, the primitive behaves exactly as a mutex -- lock, execute the critical section in the current thread, check that there is no combining opportunities, unlock, return.

Now we are ready to outline applicability of the primitive. It can provide benefits in moderate-to-high contention scenarios; in low contention scenarios, there are few combining opportunities; and in very high contention scenarios other alternatives should be considered (to reduce contention in the first place -- partitioning, replication, batching, privatization, etc). And I would expect it to be useful for large complex data structures (e.g. trees), because for simple data structures like queues there are efficient non-blocking algorithms (note that combiner in it’s standard form is still blocking), and for small data structures cache locality may not pay back.

Let’s create a simple implementation and then see how we can improve it and what aspects we can vary. Let’s start with the interface:
typedef struct combiner combiner_t;
typedef struct combiner_arg combiner_arg_t;
struct combiner_arg {
   combiner_arg_t* next;
};

combiner_t* combiner_create(void (*fn)(combiner_arg_t*));
void combiner_destroy(combiner_t* c);
void combiner_execute(combiner_t* c, combiner_arg_t* arg);

Create and destroy functions are trivial:
struct combiner {
   void (*fn)(combiner_arg_t* arg);
   combiner_arg_t* head;
};

combiner_t* combiner_create(void (*fn)(combiner_arg_t*)) {
   combiner_t* c = (combiner_t*)malloc(sizeof(combiner_t));
   c->head = 0;
   c->fn = fn;
   return c;
}

void combiner_destroy(combiner_t* c) {
   free(c);
}

Combiner_execute() is the interesting part:
#define LOCKED ((combiner_arg_t*)1)
void combiner_execute(combiner_t* c, combiner_arg_t* arg) {
   // c->head can be in 3 states:
   // c->head == 0: no operations in-flight, initial state.
   // c->head == LOCKED: single operation in-flight, no combining opportunities.
   // c->head == pointer to some combiner_arg_t that is subject to combining.
   //            The args are chainded into a lock-free list via 'next' fields.
   // arg->next also can be in 3 states:
   // arg->next == pointer to other arg.
   // arg->next == LOCKED: denotes the last arg in the list.
   // arg->next == 0: the operation is finished.

   // The function proceeds in 3 steps:
   // 1. If c->head == 0, exchange it to LOCKED and become the combiner.
   // Otherwise, enqueue own arg into the c->head lock-free list.
   combiner_arg_t* cmp = __atomic_load_n(&c->head, __ATOMIC_ACQUIRE);
   for (;;) {
       combiner_arg_t* xchg = LOCKED;
       if (cmp) {
           // There is already a combiner, enqueue itself.
           xchg = arg;
           arg->next = cmp;
       }
       if (__atomic_compare_exchange_n(&c->head, &cmp, xchg, 1, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE))
           break;
   }
   // 2. If we are not the combiner, wait for arg->next to become 0
   // (which means the operation is finished).
   if (cmp) {
       while (__atomic_load_n(&arg->next, __ATOMIC_ACQUIRE) != 0) {
       }
   // 3. We are the combiner.
   } else {
       // First, execute own operation.
       c->fn(arg);
       // Then, look for combining opportunities.
       for (;;) {
           cmp = __atomic_load_n(&c->head, __ATOMIC_ACQUIRE);
           for (;;) {
               // If there are some operations in the list,
               // grab the list and replace with LOCKED.
               // Otherwise, exchange to 0.
               combiner_arg_t* xchg = 0;
               if (cmp != LOCKED)
                   xchg = LOCKED;
               if (__atomic_compare_exchange_n(&c->head, &cmp, xchg, 1, __ATOMIC_ACQ_REL,
                                                                                                                            __ATOMIC_ACQUIRE))
                   break;
           }
           // No more operations to combine, return.
           if (cmp == LOCKED)
               break;
           arg = cmp;
           // Execute the list of operations.
           while (arg != LOCKED) {
               combiner_arg_t* next = arg->next;
               c->fn(arg);
               // Mark completion.
               __atomic_store_n(&arg->next, 0, __ATOMIC_RELEASE);
               arg = next;
           }
       }
   }
}

This simple implementation can be fine for computational applications, but it can be not suitable for other types applications because a single combiner executes potentially unbounded number of operations. Indeed I’ve observed that a single thread executes millions of operations when swamped with requests from 31 concurrently executing threads.

Bounding

What we want is to bound number of operations executed by a single thread. If we bound it by, say, 32, it’s still provides good amortization of overheads. To do it we need to introduce the additional combiner parameter -- limit:
struct combiner {
   void (*fn)(combiner_arg_t* arg);
   combiner_arg_t* head;
   int limit;
};

combiner_t* combiner_create(void (*fn)(combiner_arg_t*), int limit) {
   combiner_t* c = (combiner_t*)malloc(sizeof(combiner_t));
   c->head = 0;
   c->fn = fn;
   c->limit = limit;
   return c;
}

And modify combining algorithm to stop combining after the limit reached:
#define LOCKED ((combiner_arg_t*)1)
#define HANDOFF 2  // see comments below
void combiner_execute(combiner_t* c, combiner_arg_t* arg) {
   // This part is the same.
   combiner_arg_t* cmp = __atomic_load_n(&c->head, __ATOMIC_ACQUIRE);
   for (;;) {
       combiner_arg_t* xchg = LOCKED;
       if (cmp) {
           xchg = arg;
           arg->next = cmp;
       }
       if (__atomic_compare_exchange_n(&c->head, &cmp, xchg, 1,
                   __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE))
           break;
   }
   int count = 0;
   if (cmp) {
       for (;;) {
           combiner_arg_t* next = __atomic_load_n(&arg->next, __ATOMIC_ACQUIRE);
           if (next == 0)
               break;
           // If we notice that our next pointer is marked with HANDOFF bit,
           // we have become the combiner.
           if ((unsigned long)next & HANDOFF) {
               // Reset the HANDOFF bit to get the correct pointer.
               arg->next = (combiner_arg_t*)((unsigned long)arg->next & ~HANDOFF);
               // Combine the rest of the list.
               goto combine;
           }
       }
   } else {
       c->fn(arg);
       count++;
       for (;;) {
           // This part is the same.
           cmp = __atomic_load_n(&c->head, __ATOMIC_ACQUIRE);
           for    (;;) {
               combiner_arg_t* xchg = 0;
               if (cmp != LOCKED)
                   xchg = LOCKED;
               if (__atomic_compare_exchange_n(&c->head, &cmp, xchg, 1,
                       __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE))
                   break;
           }
           if (cmp == LOCKED)
               break;
           arg = cmp;
       combine:
           while (arg != LOCKED) {
               // If we’ve reached the limit,
               // mark the current node with HANDOFF bit and return.
               // Owner of the node will execute the rest.
               if (count == c->limit) {
                   __atomic_store_n(&arg->next,
                       (combiner_arg_t*)((unsigned long)arg->next | HANDOFF), __ATOMIC_RELEASE);
                   goto done;
               }
               combiner_arg_t* next = arg->next;
               c->fn(arg);
               count++;
               __atomic_store_n(&arg->next, 0, __ATOMIC_RELEASE);
               arg = next;
           }
       }
   done:
   }
}

Intel TBB library implements a different approach to bounding (see “tbb/aggregator.h”). Each combiner thread grabs the batch of operations only once, executes what’s there and exits.
struct combiner {
   void (*fn)(combiner_arg_t* arg);
   combiner_arg_t* head;
   int busy;  // New field, see below.
};

void combiner_execute(combiner_t* c, combiner_arg_t* arg) {
   // Enqueue the arg into the list (the same).
   combiner_arg_t* cmp = __atomic_load_n(&c->head, __ATOMIC_ACQUIRE);
   for (;;) {
       arg->next = cmp;
       if (__atomic_compare_exchange_n(&c->head, &cmp, arg, 1,
                   __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE))
           break;
   }
   if (cmp) {
       // If we’ve enqueued into non-empty list,
       // we are not the combiner.  Wait for completion (the same).
       while (__atomic_load_n(&arg->next, __ATOMIC_ACQUIRE)) {
       }
   } else {
       // The combiner algorithm is somewhat different.
       // Wait for the previous combiner to finish.
       while (__atomic_load_n(&c->busy, __ATOMIC_ACQUIRE)) {
       }
       // Mark as busy, so that the next combiner does not step on us.
       __atomic_store_n(&c->busy, 1, __ATOMIC_RELAXED);
       // Grab the batch of operations only once.
       arg = __atomic_exchange_n(&c->head, 0, __ATOMIC_ACQ_REL);
       // Execute the batch of operations.
       while (arg) {
           combiner_arg_t* next = arg->next;
           c->fn(arg);
           __atomic_store_n(&arg->next, 0, __ATOMIC_RELEASE);
           arg = next;
       }
       // Allow the next combiner to proceed.
       __atomic_store_n(&c->busy, 0, __ATOMIC_RELEASE);
   }
}

While TBB algorithm prevents execution of unbounded number of operations by a single thread, it still can be arbitrary high (bounded only by number of threads), and at the same time it misses lots of combining opportunities. First, combiner can not aggregate more than one operation from a single thread (they will necessary go into different batches). And second, if a thread submits an operation while another thread is combining, the operation won’t be joined to the batch (while it possibly could). We will see effects of this weak combining in the evaluation section.

Async operations

Combiners provide another interesting opportunity -- the operations can be executed asynchronously. That is, if a thread is not interested in the result of the operation (e.g. it just wants to insert/remove a node from a container), it can submit the operation into the queue and just return immediately. This makes the algorithm fully non-blocking, threads do not wait for each other.
Async operations require the minimal modification to the algorithm:
void combiner_execute(combiner_t* c, combiner_arg_t* arg, int async) {
   // Enqueue the operation or become the combiner.
   // The same as above, omitted.
   // ...
   if (cmp) {
       // If the caller does not need the result, just return.
       if (async)
           return;
       while (__atomic_load_n(&arg->next, __ATOMIC_ACQUIRE) != 0) {
       }
   } else {
       // Combiner algorithm is the same
       // ...
   }
}

However, the issue is who owns the arg parameter, how to allocate it and how to free it? In the previous algorithms it is natural to allocate arg on the stack of the caller thread. But it is impossible with async operations, because the arg may be used after the caller thread have destroyed the stack frame.
Several solutions are possible. The simplest option is to allocate the arg with malloc() and free() it after execution. If, for example, we want to insert a node into a container, then we can use the node itself as the arg. However, I want to show a general and efficient solution -- each thread has N local arg objects and reuses them when the previous async operations finish:
#define N 4
static __thread my_combiner_arg_t arg_cache[N];

my_combiner_arg_t* get_thread_arg(void) {
   for (;;) {
       // Look for an unused arg.
       for (int i = 0; i < N; i++) {
           if (__atomic_load_n(&arg_cache[i].base.next, __ATOMIC_ACQUIRE) == 0)
               return &arg_cache[i];
       }
   }
}

// This function must be executed when thread exits to wait for any outstanding operations.
void wait_for_pending(void) {
   for (int i = 0; i < N; i++) {
       while (__atomic_load_n(&arg_cache[i].base.next, __ATOMIC_ACQUIRE)) {
       }
   }
}

void* thread(void* p) {
   for (...) {
       my_combiner_arg_t* arg = get_thread_arg();
       // ...
       combiner_execute(c, arg, 1);
   }
   wait_for_pending();
   return 0;
}

As we will see in the evaluation section, asynchronous operations provide good performance improvements.
Note that if a thread must observe effects of previous operations (e.g. insert a node into a container, then search for the node), the batches of operations must be reversed to get FIFO order. Otherwise with async operations, the search can be executed before the insertion.

Flat combining

There are two recent interesting research papers on combining, one of them is “Flat Combining and the Synchronization-Parallelism Tradeoff“. The idea is as follows. In the algorithms above threads enqueue operations into a central queue with a CAS operation, this inevitably incurs additional costs per operation. In flat combining each thread has own persistent descriptor that is enqueued into the list (array), then in order to submit an operation the thread uses an atomic store (instead of CAS). As the result, the combiner has to poll all thread descriptors to find the “armed“ ones:
__thread int thread_id = -1;
void combiner_execute(combiner_t* c, combiner_arg_t* arg) {
   // Allocate thread id.
   int tid = thread_id;
   if (tid < 0)
       tid = thread_id = __atomic_fetch_add(&c->seq, 1, __ATOMIC_RELAXED);
   combiner_thr_t* thr = &c->thr[tid];
   // Submit own operation.
   __atomic_store_n(&thr->req, arg, __ATOMIC_RELEASE);
   for (;;) {
       // Try to become the combiner.
       if (__atomic_exchange_n(&c->lock, 1, __ATOMIC_ACQUIRE) == 0) {
           // We are the combiner.
           // Poll thread descriptors to find the armed ones.
           for (int try = 0; try < 3; try++) {
               int cnt = __atomic_load_n(&c->seq, __ATOMIC_RELAXED);
               for (int i = 0; i < cnt; i++) {
                   thr = &c->thr[i];
                   arg = __atomic_load_n(&thr->req, __ATOMIC_ACQUIRE);
                   if (arg) {
                       c->fn(arg);
                       // Mark completion of the operation.
                       __atomic_store_n(&thr->req, 0, __ATOMIC_RELEASE);
                   }
               }
           }
           // Unbecome the combiner.
           __atomic_store_n(&c->lock, 0, __ATOMIC_RELEASE);
           return;
       } else {
           // We are not the combiner.
           // Wait for the operation to be executed or the lock is unlocked.
           while (__atomic_load_n(&thr->req, __ATOMIC_RELAXED)
               && __atomic_load_n(&c->lock, __ATOMIC_RELAXED)) {
           }
           if (__atomic_load_n(&thr->req, __ATOMIC_ACQUIRE) == 0)
               return;
       }
   }
}

The advantage of the algorithm is that threads do not need to execute CAS to submit operations. The downside is that the combiner needs to poll descriptors (potentially useless work). The algorithm seems to be designed for SPARC machines where write sharing is cheap but CAS operations are expensive. As authors say, the algorithm degrade quickly as contention decreases (the combiner senselessly polls descriptors just to find no pending operations). So this algorithm is ideal for synthetic benchmark with no thread-local work.

Dedicated combining thread

Another interesting research paper is “Remote Core Locking: Migrating Critical-Section Execution to Improve the Performance of Multithreaded Applications“. The idea is similar to flat combining (each thread has own descriptor), but all operations are executed by a dedicated combiner thread. The combiner_execute() function becomes almost trivial:
__thread int thread_id = -1;
void combiner_execute(combiner_t* c, combiner_arg_t* arg) {
   // Allocate thread id.
   int tid = thread_id;
   if (tid < 0)
       tid = thread_id = __atomic_fetch_add(&c->seq, 1, __ATOMIC_RELAXED);
   combiner_thr_t* thr = &c->thr[tid];
   // Store own operation.
   __atomic_store_n(&thr->req, arg, __ATOMIC_RELEASE);
   // Wait for execution.
   while (__atomic_load_n(&thr->req, __ATOMIC_ACQUIRE)) {
   }
}

But there is also the dedicated thread that executes the operations:
void* combiner_thread(void* p) {
   combiner_t* c = (combiner_t*)p;
   // Check for shutdown.
   while (__atomic_load_n(&c->done, __ATOMIC_RELAXED) == 0) {
       int cnt = __atomic_load_n(&c->seq, __ATOMIC_RELAXED);
       // Poll thread descriptors.
       for (int i = 0; i < cnt; i++) {
           combiner_thr_t* thr = &c->thr[i];
           combiner_arg_t* arg = __atomic_load_n(&thr->req, __ATOMIC_ACQUIRE);
           if (arg) {
               c->fn(arg);
               // Mark completion of the operation.
               __atomic_store_n(&thr->req, 0, __ATOMIC_RELEASE);
           }
       }
   }
   return 0;
}

Threads also use an atomic store to submit operations (no CAS), and another advantage is that the dedicated combiner thread always has the protected data structure in the cache (other threads do not even touch it). The downsides are that single-threaded latency increases (worker threads always need to communicate with the dedicated thread) and the dedicated thread always burns CPU. This algorithm seems to be good as band-aid for legacy applications that suddenly find themselves executing on highly parallel machines.

Implementation notes

Some implementation notes before moving to evaluation.
As always it is important to put cache line padding in proper places to prevent false sharing. In particular, original flat combining algorithm does not use paddings between thread descriptors, but on Intel processors I’ve observed significant speedup if the padding is added. In general I’ve observed up to 2x speed difference with and without the paddings.

Since some of the combiner algorithms use linked lists of arguments, it’s beneficial to use software prefetching. That is, while we are executing the current operation, prefetch the next. The actual implementation of the execution loop I used in benchmarks is as follows:
           while (arg != LOCKED) {
               combiner_arg_t* next = arg->next;
               if (next != LOCKED)
                   __builtin_prefetch(next);
               c->fn(arg);
               __atomic_store_n(&arg->next, 0, __ATOMIC_RELEASE);
               arg = next;
           }

I’ve observed up to 15% speedup with and without prefetching.

I used simple active spin loop to wait for completion. It is fine for benchmarks. However, in real implementation it may be beneficial to use something more complex (e.g. at least sched_yield() after some number of iterations).

Full source code for all algorithms and the benchmark driver is attached to the post.

Evaluation

In the benchmark all threads execute N operations, number of threads is varied as 1, 2, 4, 8, 16, 32. The protected operation is traversal of a linked list of length 30. For benchmarking I used a machine with 2 Intel Xeon E5-2690 CPUs running at 2.90GHz (16 HT cores total).

The first experiment is with per-thread local work consisting of 100 division instructions:

The second experiment is with per-thread local work consisting of 1000 division instructions (fewer combining opportunities):

As expected, remote core locking has bad single-threaded performance (2-6 times slower), but behaves acceptably under contention. Flat combining degrades when local work is increasing (more senseless polling). TBB combining algorithm suffers from missing combining opportunities, e.g. with 4 threads TBB algorithm combines a mean of 1.75 operations, while the proposed "Bounded" algorithm - 3.40 operations, this explains the difference in performance. Async version performs best, which is not surprising since it better interleaves critical sections and local work. Note that async operations can be applied to other algorithms as well (e.g. flat combining).

AdjuntoTamaño
Descargar combiner.zip11.62 KB
Para obtener más información sobre las optimizaciones del compilador, consulte el aviso sobre la optimización.