c++ - Adding blocking functions to a lock-free queue
I have a lock-free multi-producer, single-consumer queue based on a circular buffer. So far it only has non-blocking push_back() and pop_front() calls. Now I want to add blocking versions of those calls, but I want to minimize the impact this has on the performance of code that uses the non-blocking versions - namely, it should not turn them into "lock-by-default" calls.

E.g. the simplest version of a blocking push_back() would look like this:
    void push_back_blocking(const T& pkg) {
        if (!push_back(pkg)) {
            std::unique_lock<std::mutex> ul(mux);
            while (!push_back(pkg)) {
                cv_notfull.wait(ul);
            }
        }
    }
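Here, mux and cv_notfull are assumed to be members of the queue (the question does not show their declarations), along the lines of:

    std::mutex mux;                      // guards only the sleep/wake handshake
    std::condition_variable cv_notfull;  // signaled when a slot becomes free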
But unfortunately this would require me to put the following block at the end of the "non-blocking" pop_front():
    {
        std::lock_guard<std::mutex> lg(mux);
        cv_notfull.notify_all();
    }
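For completeness, a blocking pop_front() would look analogous - a hypothetical sketch, where cv_notempty is a second condition variable (not shown in the question) that push_back() would then have to notify in the same way:

    void pop_front_blocking(T& pkg) {
        if (!pop_front(pkg)) {
            std::unique_lock<std::mutex> ul(mux);
            while (!pop_front(pkg)) {
                cv_notempty.wait(ul); // hypothetical counterpart to cv_notfull
            }
        }
    }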
While the notify alone has hardly any performance impact (if no thread is waiting), the lock does.
So my question is:

How can I (using standard C++14 if possible) add blocking push_back and pop_front member functions to the queue without severely impeding the performance of their non-blocking counterparts (read: minimize system calls)? At least as long as no thread is actually blocked - but ideally even then.
For reference, my current version looks similar to this (I left out debug checks, data alignment and explicit memory orderings):
    #include <array>
    #include <atomic>
    #include <cstddef>

    template<class T, std::size_t N>
    class mpsc_queue {
        using index_type = unsigned long;

        struct Idx {
            index_type idx;
            index_type version_cnt;
        };

        enum class slotstate { empty, filled };

        struct slot {
            std::atomic<slotstate> state{slotstate::empty};
            T data{};
        };

        struct buffer_t {
            std::array<slot, N> data{}; // value-initialized: all slots start empty
            slot& operator[](Idx idx) { return (*this)[idx.idx]; }
            slot& operator[](index_type idx) { return data[idx]; }
        };

        buffer_t buffer;
        std::atomic<Idx> head{};
        std::atomic<index_type> tail{0};

        index_type next(index_type old) { return (old + 1) % N; }
        Idx next(Idx old) {
            old.idx = next(old.idx);
            old.version_cnt++;
            return old;
        }

    public:
        bool push_back(const T& val) {
            auto thead = head.load();
            Idx wrtidx;
            do {
                wrtidx = next(thead);
                if (wrtidx.idx == tail) {
                    return false; // queue is full
                }
            } while (!head.compare_exchange_strong(thead, wrtidx));
            buffer[wrtidx].data = val;
            buffer[wrtidx].state = slotstate::filled;
            return true;
        }

        bool pop_front(T& val) {
            auto ridx = next(tail);
            if (buffer[ridx].state != slotstate::filled) {
                return false; // queue is empty
            }
            val = buffer[ridx].data;
            buffer[ridx].state = slotstate::empty;
            tail = ridx;
            return true;
        }
    };
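For illustration, a minimal sketch of how the non-blocking interface is currently used - producers have to spin when the queue is full, which is exactly the busy-wait the blocking calls are supposed to replace (thread counts and capacity are made up):

    #include <thread>
    #include <vector>

    int main() {
        mpsc_queue<int, 1024> q;

        std::vector<std::thread> producers;
        for (int p = 0; p < 4; ++p)
            producers.emplace_back([&q, p] {
                for (int i = 0; i < 1000; ++i)
                    while (!q.push_back(p * 1000 + i)) { /* spin: queue full */ }
            });

        // single consumer drains the queue
        for (int received = 0; received < 4 * 1000; ) {
            int v;
            if (q.pop_front(v))
                ++received;
        }
        for (auto& t : producers) t.join();
    }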
Related questions:

I asked a similar question specifically about optimizing the usage of condition_variable::notify here, but that question got closed as a supposed duplicate of this question. I disagree, because that question was about why a mutex is needed for condition variables in general (or rather for its pthread equivalent) - focusing on condition_variable::wait - and not about if/how the locking can be avoided for the notify part. But apparently I didn't make that sufficiently clear (or people simply disagreed with my opinion).

In any case, the answers in the linked question did not help me, and it was somewhat of an XY problem anyway, so I decided to ask this question about the actual problem I have and thereby allow a wider range of possible solutions (maybe there is a way to avoid condition variables altogether).

This question is also similar, but:

- It is about C on Linux and the answers use platform-specific constructs (pthreads and futexes).
- The author there asked for efficient blocking calls, with no non-blocking ones at all. I, on the other hand, don't care much about the efficiency of the blocking ones, but I want to keep the non-blocking ones as fast as possible.
If there is a potential waiter on the condition variable, you have to lock the mutex for the notify_all call. The point is that the condition check (the !push_back(pkg)) is performed before the wait on the condition variable (C++11 provides no other way), and the mutex is what guarantees consistency between these two actions: without it, the consumer could free a slot and notify between the producer's failed push_back and its wait, and that wakeup would be lost.

But it is possible to omit the locking (and the notification) in the case when no potential waiter is involved. Use an additional flag:
    class mpsc_queue {
        ... // original definitions
        std::atomic<bool> has_waiters{false};

    public:
        void push_back_blocking(const T& pkg) {
            if (!push_back(pkg)) {
                std::unique_lock<std::mutex> ul(mux);
                has_waiters.store(true, std::memory_order_relaxed); // #1
                while (!push_back(pkg)) { // #2 is inside the push_back() method
                    cv_notfull.wait(ul);
                    // Another waiter may have cleared the flag while we waited.
                    // Set it again; same as #1.
                    has_waiters.store(true, std::memory_order_relaxed);
                }
                has_waiters.store(false, std::memory_order_relaxed);
            }
        }

        // This method is the same as the original one; only the #2 mark is exposed.
        bool push_back(const T& val) {
            auto thead = head.load();
            Idx wrtidx;
            do {
                wrtidx = next(thead);
                if (wrtidx.idx == tail) { // #2
                    return false;
                }
            } while (!head.compare_exchange_strong(thead, wrtidx));
            buffer[wrtidx].data = val;
            buffer[wrtidx].state = slotstate::filled;
            return true;
        }

        bool pop_front(T& val) {
            // Main work; same as the original pop_front, only the #3 mark is exposed.
            auto ridx = next(tail);
            if (buffer[ridx].state != slotstate::filled) {
                return false;
            }
            val = buffer[ridx].data;
            buffer[ridx].state = slotstate::empty;
            tail = ridx; // #3

            // Notification part.
            if (has_waiters.load(std::memory_order_relaxed)) // #4
            {
                // There are potential waiters. Need to lock.
                std::lock_guard<std::mutex> lg(mux);
                cv_notfull.notify_all();
            }
            return true;
        }
    };
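Usage would then look like this - a sketch, assuming the flag and the blocking call are merged into the templated queue from the question (together with its mux and cv_notfull members): producers sleep instead of spinning, and the consumer only takes the mutex when someone actually waits:

    #include <thread>
    #include <vector>

    int main() {
        mpsc_queue<int, 16> q; // small capacity so that producers really block

        std::vector<std::thread> producers;
        for (int p = 0; p < 4; ++p)
            producers.emplace_back([&q, p] {
                for (int i = 0; i < 100; ++i)
                    q.push_back_blocking(p * 100 + i); // sleeps while the queue is full
            });

        for (int received = 0; received < 4 * 100; ) {
            int v;
            if (q.pop_front(v)) // locks only if has_waiters was set
                ++received;
        }
        for (auto& t : producers) t.join();
    }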
The key relations here are:

- the setting of the flag at #1 and the reading of tail for the condition check at #2,
- the storing of tail at #3 and the checking of the flag at #4.

Both of these relations should expose some sort of universal order: #1 should be observed before #2 by the other thread, and the same for #3 and #4.
In that case one can guarantee that, if the flag check at #4 finds the flag not set, then a possible further condition check at #2 will find the effect of the condition change at #3. So it is safe not to lock (and notify), because no waiter is possible.
In the current implementation, the universal order between #1 and #2 is provided by the loading of tail with implicit memory_order_seq_cst. The same order between #3 and #4 is provided by the storing of tail with implicit memory_order_seq_cst.
In the given approach - "do not lock if there are no waiters" - this universal order is the tricky part. In both relations it is a read-after-write order, which cannot be achieved by any combination of memory_order_acquire and memory_order_release; memory_order_seq_cst should be used.
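Spelled out as a sketch (the code above relies on the implicit memory_order_seq_cst of the plain tail accesses instead), both relations are store-then-load pairs, which is why they need to participate in the single total order of seq_cst operations:

    // Producer: #1 then #2 - store the flag, then load tail inside push_back().
    has_waiters.store(true, std::memory_order_seq_cst);     // #1
    // ... then, in push_back():
    if (wrtidx.idx == tail.load(std::memory_order_seq_cst)) // #2: "queue full" check
        return false;

    // Consumer: #3 then #4 - store tail, then load the flag.
    tail.store(ridx, std::memory_order_seq_cst);            // #3
    if (has_waiters.load(std::memory_order_seq_cst)) {      // #4
        std::lock_guard<std::mutex> lg(mux);
        cv_notfull.notify_all();
    }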