Eventcount (gate) proposal

Dmitry Vyukov:

I've implemented a sketch of an eventcount for TBB. It can be used as a replacement for Gate in the current TBB scheduler, and/or as the blocking/signalling logic in concurrent_queue, and/or exposed as a public API, and/or a portable condition variable can be built on top of it and exposed as a public API. I'd like to know what you think about the implementation and these usages, and whether it's worth it for me to finish the implementation and submit it as an official contribution.

The eventcount is portable, fine-grained, fairly efficient, general-purpose and reusable.
Portable in the sense that it requires only semaphore, mutex and full-memory-fence primitives; no Win32 events and no futexes.
Fine-grained in the sense that it supports notification of an interesting subset of threads, not only the coarse-grained notify_one() and notify_all().
Fairly efficient in the sense that the producer overhead on the fast path is a single load, a single test, a single conditional jump, and possibly a single full memory fence (controlled by the user, depending on the algorithm); the consumer overhead on the fast path is a no-op.
General-purpose in the sense that it includes no management of user state; this simplifies reasoning and implementation and allows reuse.
Reusable in the sense that it can be used with basically any predicate (user algorithm).
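
For orientation, here is a minimal sketch of the intended protocol (predicate() and change_state() are placeholders for the user algorithm; the API names match the implementation posted below):

eventcount ec;

void consumer()
{
    if (predicate())            // fast path: no eventcount calls at all
        return;
    for (;;)
    {
        ec.prepare_wait();
        if (predicate())        // re-check closes the race window
        {
            ec.retire_wait();
            return;
        }
        ec.wait();
        if (predicate())
            return;
    }
}

void producer()
{
    change_state();             // makes predicate() true
    ec.notify_one();            // full fence, then wake one waiter
}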

The implementation takes Arch's requirements into account:
http://software.intel.com/en-us/forums/showpost.php?p=71895

I've attached the implementation and will post it, along with several examples, in the following posts.

Attachment: 80055.jpg (4.25 KB)
Dmitry Vyukov:

First of all, several helper components: semaphore, mutex, full memory fence and a doubly-linked list (just to make the implementation self-contained):

/** Fine-grained Eventcount
 *  Copyright (C) 2008  Dmitriy S. V'jukov
 */

#include <stddef.h> // size_t, offsetof


#if defined(WIN32) && defined(_MSC_VER)

#include <windows.h>
#include <intrin.h> // _mm_mfence
#include <limits.h> // LONG_MAX

class semaphore
{
public:
    semaphore()
    {
        h_ = CreateSemaphore(0, 0, LONG_MAX, 0);
    }

    ~semaphore()
    {
        CloseHandle(h_);
    }

    void wait()
    {
        WaitForSingleObject(h_, INFINITE);
    }

    void post()
    {
        ReleaseSemaphore(h_, 1, 0);
    }

private:
    HANDLE h_;

    semaphore(semaphore const&);
    semaphore& operator = (semaphore const&);
};

class mutex
{
public:
    mutex()
    {
        InitializeCriticalSection(&cs_);
    }

    ~mutex()
    {
        DeleteCriticalSection(&cs_);
    }

    void lock()
    {
        EnterCriticalSection(&cs_);
    }

    void unlock()
    {
        LeaveCriticalSection(&cs_);
    }

private:
    CRITICAL_SECTION    cs_;

    mutex(mutex const&);
    mutex& operator = (mutex const&);
};

void full_memory_fence()
{
    _mm_mfence();
}

#define THREAD_LOCAL __declspec(thread)

#elif defined(POSIX) && defined(GCC)

#include <pthread.h>
#include <semaphore.h>

class semaphore
{
public:
    semaphore()
    {
        sem_init(&sem_, 0, 0);
    }

    ~semaphore()
    {
        sem_destroy(&sem_);
    }

    void wait()
    {
        sem_wait(&sem_);
    }

    void post()
    {
        sem_post(&sem_);
    }

private:
    sem_t               sem_;

    semaphore(semaphore const&);
    semaphore& operator = (semaphore const&);
};

class mutex
{
public:
    mutex()
    {
        pthread_mutex_init(&mutex_, 0);
    }

    ~mutex()
    {
        pthread_mutex_destroy(&mutex_);
    }

    void lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    void unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }

private:
    pthread_mutex_t     mutex_;

    mutex(mutex const&);
    mutex& operator = (mutex const&);
};

void full_memory_fence()
{
    __sync_synchronize();
}

#define THREAD_LOCAL __thread

#endif



class lock
{
public:
    lock(mutex& m)
        : m_(m)
    {
        m.lock();
    }

    ~lock()
    {
        m_.unlock();
    }

private:
    mutex&              m_;

    lock(lock const&);
    lock& operator = (lock const&);
};




/** simple single-threaded double-linked list
 *  nothing interesting
 */
class dlist
{
public:
    struct node
    {
        node*           prev_;
        node*           next_;

        node()
        {
            prev_ = 0;
            next_ = 0;
        }
    };

    dlist()
    {
        reset();
    }

    void push(node* n)
    {
        size_ += 1;
        n->next_ = head_.next_;
        n->prev_ = &head_;
        head_.next_->prev_ = n;
        head_.next_ = n;
    }

    node* pop()
    {
        if (size_ == 0)
            return 0;
        node* n = head_.next_;
        remove(n);
        return n;
    }

    void remove(node* n)
    {
        size_ -= 1;
        n->prev_->next_ = n->next_;
        n->next_->prev_ = n->prev_;
    }

    size_t size() const
    {
        return size_;
    }

    node* begin()
    {
        return head_.next_;
    }

    void flush_to(dlist& target)
    {
        if (size_)
        {
            target.size_ = size_;
            target.head_.next_ = head_.next_;
            target.head_.next_->prev_ = &target.head_;
            target.tail_.prev_ = tail_.prev_;
            target.tail_.prev_->next_ = &target.tail_;
        }
        else
        {
            target.reset();
        }
        reset();
    }

    static bool not_last(node* n)
    {
        return n->next_ != 0;
    }

    static node* get_next(node* n)
    {
        return n->next_;
    }

private:
    size_t volatile     size_;
    node                head_;
    node                tail_;

    void reset()
    {
        size_ = 0;
        head_.next_ = &tail_;
        head_.prev_ = 0;
        tail_.next_ = 0;
        tail_.prev_ = &head_;
    }

    dlist(dlist const&);
    dlist& operator = (dlist const&);
};

Dmitry Vyukov:

Implementation of eventcount:

/** Fine-grained Eventcount
 *  Copyright (C) 2008  Dmitriy S. V'jukov
 */


/** per-thread descriptor for eventcount
 */
struct ec_thread
{
    dlist::node         node_;
    semaphore           sema_;
    unsigned            epoch_;
    bool volatile       in_waitset_;
    bool                spurious_;
    void*               ctx_;

    ec_thread()
    {
        epoch_ = 0;
        in_waitset_ = false;
        spurious_ = false;
        ctx_ = 0;
    }

    ~ec_thread()
    {
        if (spurious_)
            sema_.wait();
    }

    static ec_thread* current()
    {
        static THREAD_LOCAL ec_thread* ec_thread_instance = 0;
        ec_thread* instance = ec_thread_instance;
        if (instance == 0)
        {
            instance = new ec_thread;
            ec_thread_instance = instance;
        }
        return instance;
        // instance must be destroyed in DllMain() callback
        // or in pthread_key_create() callback
    }

private:
    ec_thread(ec_thread const&);
    ec_thread& operator = (ec_thread const&);
};



/** fine-grained eventcount implementation
 */
class eventcount
{
public:
    eventcount()
    {
        epoch_ = 0;
    }

    void prepare_wait(void* ctx = 0)
    {
        ec_thread* th = ec_thread::current();
        // this is good place to pump previous spurious wakeup
        if (th->spurious_)
        {
            th->spurious_ = false;
            th->sema_.wait();
        }
        th->in_waitset_ = true;
        th->ctx_ = ctx;
        {
            lock l (mtx_);
            th->epoch_ = epoch_;
            waitset_.push(&th->node_);
        }
        full_memory_fence();
    }

    void wait()
    {
        ec_thread* th = ec_thread::current();
        // this check is just an optimization
        if (th->epoch_ == epoch_)
            th->sema_.wait();
        else
            retire_wait();
    }

    void retire_wait()
    {
        ec_thread* th = ec_thread::current();
        // spurious wakeup will be pumped in following prepare_wait()
        th->spurious_  = true;
        // try to remove node from waitset
        if (th->in_waitset_)
        {
            lock l (mtx_);
            if (th->in_waitset_)
            {
                // successfully removed from waitset,
                // so there will be no spurious wakeup
                th->in_waitset_ = false;
                th->spurious_ = false;
                waitset_.remove(&th->node_);
            }
        }
    }

    void notify_one()
    {
        full_memory_fence();
        notify_one_relaxed();
    }

    template<typename predicate_t>
    void notify(predicate_t pred)
    {
        full_memory_fence();
        notify_relaxed(pred);
    }

    void notify_all()
    {
        full_memory_fence();
        notify_all_relaxed();
    }

    void notify_one_relaxed()
    {
        if (waitset_.size() == 0)
            return;
        dlist::node* n;
        {
            lock l (mtx_);
            epoch_ += 1;
            n = waitset_.pop();
            if (n)
                to_ec_thread(n)->in_waitset_ = false;
        }
        if (n)
        {
            to_ec_thread(n)->sema_.post();
        }
    }

    template<typename predicate_t>
    void notify_relaxed(predicate_t pred)
    {
        if (waitset_.size() == 0)
            return;
        dlist temp;
        {
            lock l (mtx_);
            epoch_ += 1;
            size_t size = waitset_.size();
            size_t idx = 0;
            dlist::node* n = waitset_.begin();
            while (dlist::not_last(n))
            {
                dlist::node* next = dlist::get_next(n);
                ec_thread* th = to_ec_thread(n);
                if (pred(th->ctx_, size, idx))
                {
                    waitset_.remove(n);
                    temp.push(n);
                    th->in_waitset_ = false;
                }
                n = next;
                idx += 1;
            }
        }
        dlist::node* n = temp.begin();
        while (dlist::not_last(n))
        {
            dlist::node* next = dlist::get_next(n);
            to_ec_thread(n)->sema_.post();
            n = next;
        }
    }

    void notify_all_relaxed()
    {
        if (waitset_.size() == 0)
            return;
        dlist temp;
        {
            lock l (mtx_);
            epoch_ += 1;
            waitset_.flush_to(temp);
            dlist::node* n = temp.begin();
            while (dlist::not_last(n))
            {
                to_ec_thread(n)->in_waitset_ = false;
                n = dlist::get_next(n);
            }
        }
        dlist::node* n = temp.begin();
        while (dlist::not_last(n))
        {
            dlist::node* next = dlist::get_next(n);
            to_ec_thread(n)->sema_.post();
            n = next;
        }
    }

private:
    mutex               mtx_;
    dlist               waitset_;
    volatile unsigned   epoch_;

    ec_thread* to_ec_thread(dlist::node* n)
    {
        return (ec_thread*)((char*)n - offsetof(ec_thread, node_));
    }

    eventcount(eventcount const&);
    eventcount& operator = (eventcount const&);
};

Dmitry Vyukov:

Here is how the eventcount can be used in the TBB scheduler:

/** Fine-grained Eventcount
 *  Copyright (C) 2008  Dmitriy S. V'jukov
 */


struct scheduler
{
    struct tbb_thread {};

    eventcount          ec_;
    tbb_thread*         threads_;
    bool volatile       is_permanently_open_;

    void wait_while_pool_is_empty(tbb_thread* th)
    {
        if (is_permanently_open_)
            return;
        ec_.prepare_wait(th);
        if (pool_is_empty())
            ec_.wait();
        else
            ec_.retire_wait();
    }

    void notify_about_new_task_available()
    {
        ec_.notify_one_relaxed();
    }

    void notify_about_new_task_available_with_preference(tbb_thread* preference)
    {
        struct local
        {
            tbb_thread*     preference_;
            bool            fired_;

            bool operator () (void* ctx, size_t count, size_t idx)
            {
                tbb_thread* th = (tbb_thread*)ctx;
                if (th == preference_)
                {
                    fired_ = true;
                    return true;
                }
                else if (idx == count - 1 && fired_ == false)
                {
                    return true;
                }
                else
                {
                    return false;
                }
            }
        }
        pred = {preference};
        ec_.notify_relaxed(pred);
    }

    void notify_about_list_of_tasks_available(size_t total_count, size_t preference_count, tbb_thread** preferences)
    {
        struct local
        {
            size_t          remain_to_signal_;
            size_t          preference_count_;
            tbb_thread**    preferences_;

            bool operator () (void* ctx, size_t count, size_t idx)
            {
                tbb_thread* th = (tbb_thread*)ctx;
                size_t remain_in_waitset = count - idx;
                if (remain_in_waitset <= remain_to_signal_)
                {
                    return true;
                }
                else
                {
                    for (size_t i = 0; i != preference_count_; ++i)
                    {
                        if (preferences_[i] == th)
                        {
                            remain_to_signal_ -= 1;
                            return true;
                        }
                    }
                }
                return false;
            }
        }
        pred = {total_count, preference_count, preferences};
        ec_.notify_relaxed(pred);
    }

    bool pool_is_empty()
    {
        return true;
    }
};

Dmitry Vyukov:

Here is how the eventcount can be used in concurrent_queue to eliminate the thundering-herd problem:

struct queue
{
    int                 producer_idx_;
    int                 consumer_idx_;

    void**              buffer_;

    eventcount          ec_;

    void enqueue(void* data)
    {
        int idx = ++producer_idx_; // atomic
        buffer_[idx] = data;

        struct local
        {
            int         idx_;
            bool operator () (void* ctx, size_t count, size_t idx)
            {
                return idx_ == *(int*)ctx;
            }
        }
        pred = {idx};
        ec_.notify(pred); // not relaxed!!!
    }

    void* dequeue()
    {
        int idx = ++consumer_idx_; // atomic
        void* data = buffer_[idx];
        if (data)
            return data;
        for (;;)
        {
            ec_.prepare_wait(&idx);
            data = buffer_[idx];
            if (data)
            {
                ec_.retire_wait();
                return data;
            }
            ec_.wait();
            data = buffer_[idx];
            if (data)
            {
                return data;
            }
        }
    }
};

Dmitry Vyukov:

Here is how the eventcount can be used in a general setup to notify about arbitrary state changes (in conjunction with lock-based, lock-free or even wait-free containers; the eventcount itself is wait-free on the fast path):

eventcount ec;

// non-blocking consume: poll a plurality of non-blocking containers
// in priority order
void* consume_impl()
{
    void* data;
    if ((data = high_prio_queue.dequeue()))
        return data;
    if ((data = normal_prio_queue.dequeue()))
        return data;
    if ((data = work_stealing_dequeue.pop()))
        return data;
    if ((data = low_prio_queue.dequeue()))
        return data;
    return global_root_task_queue.dequeue();
}

// blocking consume
void* consume()
{
    void* data = 0;
    if (data = consume_impl())
        return data;
    for (;;)
    {
        ec.prepare_wait();
        if (data = consume_impl())
        {
            ec.retire_wait();
            return data;
        }
        ec.wait();
        if (data = consume_impl())
            return data;
    }
}

void produce(void* data)
{
    some_queue_or_deque.enqueue(data);
    ec.notify_all();
}

Dmitry Vyukov:

And finally, here is how one can build a portable condition variable, with a wait-free fast path for signalers, on top of the eventcount:

class condition_variable
{
    eventcount ec_;

public:
    void wait(mutex& mtx)
    {
        ec_.prepare_wait();
        mtx.unlock();
        ec_.wait();
        mtx.lock();
    }

    void signal()
    {
        ec_.notify_one();
    }

    void broadcast()
    {
        ec_.notify_all();
    }
}; 

Dmitry Vyukov:

A considerable part of the credit must go to Chris Thomasson:

http://software.intel.com/en-us/profile/6002

and Joseph Seigh:

http://software.intel.com/en-us/profile/390813

Arch D. Robison (Intel):

Nice work and well presented! Yes, please contribute it as a TBB submission when you are happy with it.

For clients like the TBB scheduler, we would probably make the ec_thread object a parameter to the wait-related methods instead of using ec_thread::current(), and thus avoid using another TLS slot. For clients like concurrent_queue, your interface seems most sensible.

If the relaxed notification methods are going to be public, they need documentation explaining the fencing rules. The rule is "don't let a notification become visible before the action it is advertising", right?

The waiting logic is a two-phase commit protocol. I suspect two-phase commit will show up in the future, so it would be good to establish a regular naming scheme for two-phase commit now. I like "prepare_wait". I'd prefer the name "cancel_wait" over "retire_wait", because "retire" tends to imply success around here (as in "retiring instructions"). The name for the operation that completes a successful wait should be longer than "wait", to make it clear that it is the completion of a two-phase commit. If people have suggestions, I suggest starting a separate forum topic on "naming for two-phase commit protocols".

Thanks for taking the time for such a detailed posting.

- Arch

Dmitry Vyukov:

Quoting - Arch Robison (Intel)

Nice work and well presented! Yes, please contribute it as a TBB submission when you are happy with it.

Ok. The most notable thing is that I have to run it at least once before contributing it :)

Quoting - Arch Robison (Intel)
For clients like the TBB scheduler, we would probably make the ec_thread object a parameter to the wait-related methods instead of using ec_thread::current(), and thus avoid using another TLS slot. For clients like concurrent_queue, your interface seems most sensible.

Hmmm... What do you mean here? __declspec(thread) doesn't use an OS TLS slot. The MSVC runtime uses a single OS TLS slot for ALL __declspec(thread) variables; they are distinguished by offsets within that slot.

Dmitry Vyukov:

Quoting - Arch Robison (Intel)

If the relaxed notification methods are going to be public, they need documentation explaining the fencing rules. The rule is "don't let a notification become visible before the action it is advertising", right?

From a technical point of view it must be something like "don't let the load of the eventcount state become visible before the action it is advertising", because the point is that there must be a store-load fence between the modification of state and the call to notify_relaxed().

For example, if the modification of state is made with an atomic RMW with a full fence (which must contain a trailing store-load fence), then signaling can be done with the relaxed version.
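
A minimal sketch of both cases ('state' and 'ec' are hypothetical names, using C++0x std::atomic):

#include <atomic>

std::atomic<int> state;
eventcount ec;

void producer_rmw()
{
    // a sequentially consistent RMW already contains the trailing
    // store-load fence, so the relaxed notification is sufficient
    state.fetch_add(1, std::memory_order_seq_cst);
    ec.notify_one_relaxed();
}

void producer_plain_store()
{
    // a plain store does not, so use the non-relaxed version,
    // which issues the full memory fence itself
    state.store(1, std::memory_order_release);
    ec.notify_one();
}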

Hmm... btw, another situation is when the user doesn't need a 100% guarantee wrt notification and can tolerate a small possibility of a race between producer and consumer. A good example is a scheduler like TBB's task scheduler. I also used such relaxed notifications in a distributed memory allocator, where I was able to tolerate a small amount of memory delayed in the depths of the allocator.

Dmitry Vyukov:

Quoting - Arch Robison (Intel)

The waiting logic is a two-phase commit protocol. I suspect two-phase commit will show up in the future, so it would be good to establish a regular naming scheme for two-phase commit now. I like "prepare_wait". I'd prefer the name "cancel_wait" over "retire_wait", because "retire" tends to imply success around here (as in "retiring instructions"). The name for the operation that completes a successful wait should be longer than "wait", to make it clear that it is the completion of a two-phase commit. If people have suggestions, I suggest starting a separate forum topic on "naming for two-phase commit protocols".

Agreed about cancel_wait().

About wait()... maybe... but I don't see better names... commit_wait() is definitely stupid.

And note that it's better used with the following wrapper, so the names prepare_wait() and cancel_wait() are not visible to the end user. Probably the wait-related methods of eventcount should be made private and the blocking class declared as a friend. Anyway, overlapping prepare_wait-wait/cancel_wait sequences are currently prohibited.

class eventcount
{
public:
    ...

    class blocking
    {
    public:
        blocking(eventcount& ec)
            : ec_(ec)
            , wait_(false)
        {
            ec_.prepare_wait();
        }

        void wait()
        {
            assert(false == wait_);
            wait_ = true;
            ec_.wait();
        }

        ~blocking()
        {
            if (false == wait_)
                ec_.cancel_wait();
        }

    private:
        eventcount& ec_;
        bool wait_;

        blocking(blocking const&);
        blocking& operator = (blocking const&);
    };

    ...
};

void* consume()
{
    void* data = 0;
    if (data = q.dequeue())
        return data;
    for (;;)
    {
        eventcount::blocking block (ec);
        if (data = q.dequeue())
            return data;
        block.wait();
        if (data = q.dequeue())
            return data;
    }
}

Arch D. Robison (Intel):

In my remark about TLS slots, I was thinking in general across OSes. It may not be an issue for Windows. However, __declspec(thread) is broken on pre-Vista OSes: it does not work correctly for explicitly loaded dynamic libraries (see here). We found out the hard way from a customer writing plug-ins with TBB 1.0, and switched to using TlsAlloc.

"commit_wait" is not totally crazy. The thread is committing to wait until woken up. It's like committing to hibernation in science fiction. It's a serious commitment :-)

Another way to wrap the two-phase commit protocol, which is particularly attractive with C++0x lambda expressions, would be to have wait take a functor argument that returns true if wait should commit, and false if it should cancel.
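
For example, a sketch of such a wrapper (a hypothetical free function over the existing three methods):

template<typename pred_t>
void wait_if(eventcount& ec, pred_t should_commit)
{
    ec.prepare_wait();
    if (should_commit())
        ec.wait();          // commit the wait
    else
        ec.retire_wait();   // cancel the wait
}

// with a C++0x lambda at the call site:
//     wait_if(ec, [&]{ return queue_is_empty(); });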

Dmitry Vyukov:

Quoting - Arch Robison (Intel)

In my remark about TLS slots, I was thinking in general across OSes. It may not be an issue for Windows. However, __declspec(thread) is broken on pre-Vista OSes: it does not work correctly for explicitly loaded dynamic libraries (see here). We found out the hard way from a customer writing plug-ins with TBB 1.0, and switched to using TlsAlloc.

Oh, Ok, I see. So, as long as the user of the eventcount provides a thread descriptor manually, the eventcount must not allocate TLS slots. But once the user calls prepare_wait() w/o a thread descriptor, the eventcount will allocate a TLS slot.

There is another problem. On Windows the eventcount must hook into DllMain to get thread detach notifications.

The other possible solution is to implement the trick described here:

http://www.codeguru.com/Cpp/misc/misc/threadsprocesses/article.php/c6945...

It allows one to get thread attach/detach notifications w/o DllMain, even from statically linked libraries. However, it's quite risky. It was working on my machine with statically/dynamically linked run-time, under debug/release, on several versions of MSVC; but I heard from some people that it doesn't work on their machines... however, maybe they were testing on MinGW or Cygwin, I don't know.
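
For reference, a sketch of the mechanism that article describes (MSVC-specific, and exactly as fragile as noted: the CRT invokes PIMAGE_TLS_CALLBACK entries placed into the .CRT$XL* sections on thread attach/detach):

#include <windows.h>

static void NTAPI ec_tls_callback(PVOID, DWORD reason, PVOID)
{
    if (reason == DLL_THREAD_DETACH)
    {
        // destroy this thread's ec_thread instance here
    }
}

// register the callback; some toolchains also require forcing the
// linker to keep the TLS directory (/INCLUDE:__tls_used on x86,
// /INCLUDE:_tls_used on x64)
#ifdef _WIN64
#pragma const_seg(".CRT$XLB")
extern "C" const PIMAGE_TLS_CALLBACK ec_tls_entry = ec_tls_callback;
#pragma const_seg()
#else
#pragma data_seg(".CRT$XLB")
extern "C" PIMAGE_TLS_CALLBACK ec_tls_entry = ec_tls_callback;
#pragma data_seg()
#endif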

Dmitry Vyukov:

Quoting - Arch Robison (Intel)

"commit_wait" is not totally crazy. The thread is committing to wait until woken up. It's like committing to hibernation in science fiction. It's a serious commitment :-)

Ok, it only sounded stupid from the point of view of a non-native speaker. I will rename it to commit_wait() in the eventcount. However, what to do in eventcount::blocking? It doesn't have prepare_wait() and cancel_wait(), only wait()...

Quoting - Arch Robison (Intel)

Another way to wrap the two-phase commit protocol, which is particularly attractive with C++0x lambda expressions, would be to have wait take a functor argument that returns true if wait should commit, and false if it should cancel.

Yes, it's possible to make an interface like boost/std::condition_variable has, with:

template<typename pred_t> void wait(pred_t pred);

However, I am not sure whether I want so much sugar in low-level synchronization primitives. Note that it is also possible to add a similar signature for atomic<>::compare_exchange(): it would accept a functor that receives the old value and returns the new one; such a compare_exchange() would not return bool or the old value.
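
A sketch of that compare_exchange() variant (a hypothetical free function, using C++0x std::atomic):

#include <atomic>

template<typename T, typename func_t>
void update(std::atomic<T>& a, func_t f)
{
    T old_val = a.load(std::memory_order_relaxed);
    // compare_exchange_weak() reloads old_val on failure,
    // so the functor is simply re-applied until the CAS succeeds
    while (!a.compare_exchange_weak(old_val, f(old_val)))
    {
    }
}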

Arch D. Robison (Intel):

On the syntactic sugar issue, I'd be fine with you keeping your original proposed interface with prepare/commit/cancel methods, and have something completely separate for the sugared lambda form. I like the way C++0x did their mutexes and provided both a low-level unstructured interface and a high-level interface. At the low level, there are raw lock/unlock calls, and for exception safety and structure there is the lock_guard form. Maybe call your blocking class wait_guard?

Dmitry Vyukov:

Quoting - Arch Robison (Intel)

On the syntactic sugar issue, I'd be fine with you keeping your original proposed interface with prepare/commit/cancel methods, and have something completely separate for the sugared lambda form.

Ok. It's always possible to add those methods later if the need arises.

Quoting - Arch Robison (Intel)

At the low level, there are raw lock/unlock calls, and for exception safety and structure there is the lock_guard form. Maybe call your blocking class wait_guard?

Ok. I will rename it.

Now I must get some time for validation of the thing...

Arch D. Robison (Intel):

Quoting - Dmitriy Vyukov

Now I must get some time for validation of the thing...

Any progress on validating it? I keep running into situations where it would be useful to have your official contribution.

- Arch

Dmitry Vyukov:

Quoting - Arch Robison (Intel)

Quoting - Dmitriy Vyukov

Now I must get some time for validation of the thing...

Any progress on validating it? I keep running into situations where it would be useful to have your official contribution.

- Arch

Sorry for the delay. Now I've almost recovered from the New Year holidays :)

Ok, let's do it this way. I've made minor changes (renamed the methods, added the wait_guard class, added an explicit ec_thread parameter) and submitted it as an official contribution (and also attached a copy to this post).

I will still try to validate it, and if I find bugs I will resubmit or just report them here.

Attachment: eventcount.zip (2.33 KB)
Arch D. Robison (Intel):

Quoting - Dmitriy Vyukov
Ok, let's do it this way. I've made minor changes (renamed the methods, added the wait_guard class, added an explicit ec_thread parameter) and submitted it as an official contribution (and also attached a copy to this post).

Thanks!

- Arch

Dmitry Vyukov:

I've made some unit tests with Relacy Race Detector (an mpmc-queue with fine-grained signaling, and a test which uses a condition variable implemented on top of the eventcount). The only error revealed so far is:

    void commit_wait(ec_thread* th = 0)
    {
        if (th == 0)
            th = ec_thread::current();
        // this check is just an optimization
        if (th->epoch_ == epoch_)
            th->sema_.wait();
        else
            cancel_wait(th); // <--- add the 'th' parameter here; it is missing in the original version
    }

Also, some variables must be replaced with atomics, with relaxed operations on them (in order to shut down 'data race' errors and to provide better documentation). Here is the list of such variables:
dlist::size_
ec_thread::in_waitset_
eventcount::epoch_
Note that all operations on them must be relaxed, i.e. the increment of the epoch must look like:
unsigned ep = epoch_.load(std::memory_order_relaxed);
epoch_.store(ep + 1, std::memory_order_relaxed);

Dmitry Vyukov:

I've found an error regarding the management of the user ctx passed into prepare_wait(). If the consumer thread takes the optimized path in commit_wait() (i.e. does not block on the semaphore, but just notices that its in_waitset_ variable == false), then producer and consumer do not correctly synchronize on the in_waitset_ variable. This can lead to races on the user's ctx.
In order to fix it, the modification of in_waitset_ in the notify_* functions must be made with memory_order_release semantics, and the first unprotected load of in_waitset_ in cancel_wait() must be made with memory_order_acquire semantics.
Another way to fix it is to remove the unprotected load of in_waitset_ from cancel_wait(); then in_waitset_ will be synchronized solely by means of the mutex. This variant is nice because it does not introduce additional fences into the notify_* functions (not relevant for x86, though).
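
A sketch of the first variant, assuming in_waitset_ is re-declared as std::atomic<bool> (per the list in the previous post); only the changed lines of the posted implementation are shown:

// in notify_one_relaxed() (and likewise in the other notify_* functions),
// under the mutex:
to_ec_thread(n)->in_waitset_.store(false, std::memory_order_release);

// in cancel_wait(), the first, unprotected check:
if (th->in_waitset_.load(std::memory_order_acquire))
{
    lock l (mtx_);
    if (th->in_waitset_.load(std::memory_order_relaxed))
    ...
}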

Dmitry Vyukov:

Note that it's possible to pass information not only from consumer to producer, but from producer to consumer too: the producer may simply modify the consumer's ctx before notification. This way the consumer can say what it is waiting for, and the producer can say what it is notifying about. For example, the consumer says "I am waiting for a state change of items 7, 19 and 47" and the producer says "I am notifying you because of a state change of item 47".
Consider the TBB task scheduler. Assume we have 128 processors/worker threads. After notification a worker thread starts feverishly checking all other worker threads' deques in order to find the item. Assume that a check of a remote deque incurs 2 cache misses, say 600 cycles. In the worst case the thread will find the item only after checking all 127 remote deques, i.e. about 76,000 cycles, or some 25 microseconds.
With a fine-grained eventcount the producer can explicitly pass the index of the deque where the item resides, so after notification the thread will check that deque first.
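
A sketch of what that could look like for the scheduler (here the waiter passes a small hint structure as its ctx instead of the tbb_thread pointer; check_deque_first() and my_deque_idx_ are hypothetical):

struct wait_ctx
{
    int hint_deque_idx; // the producer writes the index of its deque here
};

// consumer side:
void wait_for_work(scheduler& s)
{
    wait_ctx ctx = {-1};
    s.ec_.prepare_wait(&ctx);
    if (!s.pool_is_empty())
    {
        s.ec_.retire_wait();
        return;
    }
    s.ec_.wait();
    if (ctx.hint_deque_idx >= 0)
        check_deque_first(ctx.hint_deque_idx); // start the search at the hinted deque
}

// producer side: the predicate runs under the eventcount mutex,
// so it can safely fill in the hint before the thread is woken
struct hint_pred
{
    int my_deque_idx_;

    bool operator () (void* ctx, size_t /*count*/, size_t /*idx*/)
    {
        ((wait_ctx*)ctx)->hint_deque_idx = my_deque_idx_;
        return true; // wake this thread
    }
};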

Dmitry Vyukov:

Another improvement that I am thinking of is to allow the producer to decide whether it wants to increase the eventcount's epoch or not:

    template<typename predicate_t>
    void notify_relaxed(predicate_t pred, bool precise)
    {
        ...
        if (precise == false)
        {
            unsigned ep = epoch_.load(std::memory_order_relaxed);
            epoch_.store(ep + 1, std::memory_order_relaxed);
        }
        ...
    }

This is relevant only to the notify(pred) version; notify_one() and notify_all() must increase the epoch unconditionally.

What will this give us? The 'precise' flag determines whether the selective notification is only an optimization, and it's Ok if some other thread actually consumes the item (the TBB scheduler case), or the selective notification is precise and other threads are unable to consume the item (the queue example above). Informally, 'precise=false' allows spurious wake-ups: good for the TBB scheduler because it can increase reactivity, bad for the queue because spurious wake-ups just add overhead. 'precise=true' does not allow spurious wake-ups: a thread wakes up iff it is selected by the predicate.

Along with correct synchronization on the in_waitset_ variable (described above), precise notifications allow us to rewrite the queue example in the following way:

    void enqueue(void* data)
    {
        int idx = ++producer_idx_; // atomic
        buffer_[idx] = data;

        struct local
        {
            int         idx_;
            bool operator () (void* ctx, size_t count, size_t idx)
            {
                return idx_ == *(int*)ctx;
            }
        }
        pred = {idx};
        ec_.notify(pred, true); // <--- precise predicate/notification
    }

    void* dequeue()
    {
        int idx = ++consumer_idx_; // atomic
        void* data = buffer_[idx];
        if (data)
            return data;
        //for (;;) // <--------- no more loops
        //{
            ec_.prepare_wait(&idx);
            data = buffer_[idx];
            if (data)
            {
                ec_.retire_wait();
                return data;
            }
            ec_.wait();
            data = buffer_[idx];
            assert(data); // <--------- item MUST BE there
            return data;
            //if (data)
            //{
            //    return data;
            //}
        //}
    }

So, spurious wake-ups are totally eliminated, and so are the loops; as opposed to condition variables, which force one to use loops.

What do you think?

Arch D. Robison (Intel):

It's an intriguing variation. Are there measurable performance differences for the queue example?

The only hazard I see is portability of the interface. Precise wakeups might be difficult to implement in alternative implementations. For example, perhaps an eventcount might be implemented quite differently on a thousand-thread system. But then whatever is using the eventcount might be implemented quite differently too.

Dmitry Vyukov:

Quoting - Arch Robison (Intel)

It's an intriguing variation. Are there measurable performance differences for the queue example?

The only hazard I see is portability of the interface. Precise wakeups might be difficult to implement in alternative implementations. For example, perhaps an eventcount might be implemented quite differently on a thousand-thread system. But then whatever is using the eventcount might be implemented quite differently too.

No, I don't have any performance data. I believe it's possible to construct a synthetic test where the performance difference will be significant. But as for real usages... difficult to say... The window between prepare_wait() and commit_wait() is quite small, so probably we may remove the epochs completely and rely only on in_waitset_, so that commit_wait() will look like:

void commit_wait(ec_thread* th)
{
    if (th->in_waitset_)
        th->sema_.wait();
    else
        cancel_wait(th); // pass 'th', per the fix above
}

This, in turn, may hurt the scheduler usage (increased latency). But, once again, what would the performance difference be?

Another possibility is to remove the check in commit_wait() entirely:

void commit_wait(ec_thread* th)
{
    th->sema_.wait();  
}  

Anyway, the thread has just re-checked the predicate, so what is the sense in checking epoch_ or in_waitset_ a few moments later?..

As for portability, I think the eventcount may simply NOT document the absence of spurious failures, and just say that they are reduced.

