c++ - Adding blocking functions to a lock-free queue


I have a lock-free multi-producer, single-consumer queue based on a circular buffer. So far, it only has non-blocking push_back() and pop_front() calls. I want to add blocking versions of those calls, but I want to minimize the impact this has on the performance of code that uses the non-blocking versions - namely, it should not turn them into "lock-by-default" calls.

E.g. the simplest version of a blocking push_back() would look like this:

void push_back_blocking(const t& pkg) {
    if (!push_back(pkg)) {
        unique_lock<mutex> ul(mux);
        while (!push_back(pkg)) {
            cv_notfull.wait(ul);
        }
    }
}
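For illustration, here is a minimal, self-contained sketch of that wrapper pattern together with a mirrored blocking pop_front(). A plain atomic fill counter stands in for the actual ring buffer, and all member names (mux, cv_notfull, cv_notempty) are assumptions, not the real code:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Stand-in "queue": an atomic fill counter with a fixed capacity. The
// non-blocking calls fail when full/empty, like in the question; the
// blocking wrappers add the mutex + condition_variable layer on top.
struct demo_queue {
    static constexpr int capacity = 4;
    std::atomic<int> fill{0};
    std::mutex mux;
    std::condition_variable cv_notfull, cv_notempty;

    bool push_back() { // non-blocking: fails when "full"
        int f = fill.load();
        do {
            if (f == capacity) return false;
        } while (!fill.compare_exchange_weak(f, f + 1));
        notify(cv_notempty);
        return true;
    }

    bool pop_front() { // non-blocking: fails when "empty"
        int f = fill.load();
        do {
            if (f == 0) return false;
        } while (!fill.compare_exchange_weak(f, f - 1));
        notify(cv_notfull);
        return true;
    }

    void push_back_blocking() { // the pattern shown in the question
        if (!push_back()) {
            std::unique_lock<std::mutex> ul(mux);
            while (!push_back()) cv_notfull.wait(ul);
        }
    }

    void pop_front_blocking() { // mirrored version for the consumer side
        if (!pop_front()) {
            std::unique_lock<std::mutex> ul(mux);
            while (!pop_front()) cv_notempty.wait(ul);
        }
    }

    void notify(std::condition_variable& cv) {
        std::lock_guard<std::mutex> lg(mux); // the lock the question wants to avoid
        cv.notify_all();
    }
};

// Runs one producer against one consumer; returns the final fill level.
int run_blocking_demo(int n) {
    demo_queue q;
    std::thread producer([&] { for (int i = 0; i < n; ++i) q.push_back_blocking(); });
    std::thread consumer([&] { for (int i = 0; i < n; ++i) q.pop_front_blocking(); });
    producer.join();
    consumer.join();
    return q.fill.load();
}
```

Note that even this sketch has to take the mutex on every notification, which is exactly the overhead discussed next.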

But unfortunately, this would also require me to put the following block at the end of the "non-blocking" pop_front():

{
    std::lock_guard<mutex> lg(mux);
    cv_notfull.notify_all();
}

While the notify alone has hardly any performance impact (if no thread is waiting), the lock does.

So my question is:
How can I (using standard C++14 if possible) add blocking push_back and pop_front member functions to the queue without severely impeding the performance of the non-blocking counterparts (read: minimize system calls)? At least as long as no thread is blocked - but ideally even then.


For reference, my current version looks similar to this (I left out debug checks, data alignment and explicit memory orderings):

template<class t, size_t n>
class mpsc_queue {
    using index_type = unsigned long;

    struct idx {
        index_type idx;
        index_type version_cnt;
    };

    enum class slotstate {
        empty,
        filled
    };

    struct slot {
        slot() = default;
        std::atomic<slotstate> state{slotstate::empty};
        t data{};
    };

    struct buffer_t {
        std::array<slot, n> data{}; // slots are value-initialized to empty

        slot& operator[](idx idx) {
            return this->operator[](idx.idx);
        }
        slot& operator[](index_type idx) {
            return data[idx];
        }
    };

    buffer_t buffer;
    std::atomic<idx> head{};
    std::atomic<index_type> tail{0};

    index_type next(index_type old) { return (old + 1) % n; }

    idx next(idx old) {
        old.idx = next(old.idx);
        old.version_cnt++;
        return old;
    }

public:
    bool push_back(const t& val) {
        auto thead = head.load();
        idx wrtidx;
        do {
            wrtidx = next(thead);
            if (wrtidx.idx == tail) {
                return false;
            }
        } while (!head.compare_exchange_strong(thead, wrtidx));

        buffer[wrtidx].data = val;
        buffer[wrtidx].state = slotstate::filled;
        return true;
    }

    bool pop_front(t& val) {
        auto ridx = next(tail);
        if (buffer[ridx].state != slotstate::filled) {
            return false;
        }
        val = buffer[ridx].data;
        buffer[ridx].state = slotstate::empty;
        tail = ridx;
        return true;
    }
};

related questions:

I asked a similar question specifically about optimizing the usage of condition_variable::notify here, but that question got closed as a supposed duplicate of this question.
I disagree, because that question was about why a mutex is needed for condition variables in general (or rather for their pthread equivalent) - focusing on condition_variable::wait - and not about whether/how it can be avoided for the notify part. But apparently I didn't make that sufficiently clear (or people simply disagreed with my opinion).

In any case, the answers in the linked question did not help me, and it was somewhat of an XY problem anyway, so I decided to ask this question about the actual problem I have and thereby allow a wider range of possible solutions (maybe there is a way to avoid condition variables altogether).

This question is also similar, but:

  1. It is about C on Linux, and the answers use platform-specific constructs (pthreads and futexes).
  2. The author there asked for efficient blocking calls and didn't need non-blocking ones at all. I, on the other hand, don't care much about the efficiency of the blocking calls, but want to keep the non-blocking ones as fast as possible.

If there is a potential waiter on the condition variable, you have to lock the mutex for the notify_all call.

The point is that the condition check (!push_back(pkg)) is performed before waiting on the condition variable (C++11 provides no other way to wait). Only the mutex can guarantee consistency between these two actions.
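That coupling of "check under the mutex, then wait" is exactly what the predicate overload of wait encapsulates. A small sketch (the names mux, cv_notfull and notfull are stand-ins, not the queue's real members):

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex mux;
std::condition_variable cv_notfull;
bool notfull = false; // stand-in for the "push_back() would succeed" condition

// Waiter: the predicate is evaluated with mux held, so there is no window
// between "check failed" and "started waiting" in which a notification
// could be lost.
void wait_until_notfull() {
    std::unique_lock<std::mutex> ul(mux);
    cv_notfull.wait(ul, [] { return notfull; });
}

// Notifier: changing the condition and notifying under the same mutex means
// the waiter either sees the condition already true, or is already inside
// wait() when notify_all() runs.
void mark_notfull_and_notify() {
    std::lock_guard<std::mutex> lg(mux);
    notfull = true;
    cv_notfull.notify_all();
}

// Exercises the pair once; returns true when the waiter got through.
bool run_wait_demo() {
    notfull = false;
    std::thread waiter(wait_until_notfull);
    mark_notfull_and_notify();
    waiter.join();
    return notfull;
}
```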

But it is possible to omit the locking (and the notification) in the case when no potential waiter is involved. Just use an additional flag:

class mpsc_queue {
    ... // original definitions
    std::atomic<bool> has_waiters;

public:
    void push_back_blocking(const t& pkg) {
        if (!push_back(pkg)) {
            unique_lock<mutex> ul(mux);
            has_waiters.store(true, std::memory_order_relaxed); // #1
            while (!push_back(pkg)) { // #2 is inside the push_back() method
                cv_notfull.wait(ul);
                // Another waiter may have cleared the flag while we waited.
                // Set it again; same as #1.
                has_waiters.store(true, std::memory_order_relaxed);
            }
            has_waiters.store(false, std::memory_order_relaxed);
        }
    }

    // Same as the original method; shown only to expose the #2 mark.
    bool push_back(const t& val) {
        auto thead = head.load();
        idx wrtidx;
        do {
            wrtidx = next(thead);
            if (wrtidx.idx == tail) { // #2
                return false;
            }
        } while (!head.compare_exchange_strong(thead, wrtidx));

        buffer[wrtidx].data = val;
        buffer[wrtidx].state = slotstate::filled;
        return true;
    }

    bool pop_front(t& val) {
        // Main work, same as the original pop_front(); shown to expose the #3 mark.
        auto ridx = next(tail);
        if (buffer[ridx].state != slotstate::filled) {
            return false;
        }
        val = buffer[ridx].data;
        buffer[ridx].state = slotstate::empty;
        tail = ridx; // #3

        // Notification part
        if (has_waiters.load(std::memory_order_relaxed)) // #4
        {
            // There are potential waiters. We need to lock.
            std::lock_guard<mutex> lg(mux);
            cv_notfull.notify_all();
        }

        return true;
    }
};
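The fast path of this scheme can be distilled into a self-contained sketch. A one-slot "queue" (a single atomic bool) stands in for the ring buffer, and the seq_cst defaults take the role that the implicit seq_cst tail accesses play in the real queue; locks_taken merely illustrates how rarely the consumer touches the mutex:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

std::atomic<bool> item{false};        // stand-in slot: true = full
std::atomic<bool> has_waiters{false};
std::atomic<int>  locks_taken{0};     // how often the consumer had to lock
std::mutex mux;
std::condition_variable cv_notfull;

bool push_back() { // fails when the slot is full (the #2 check)
    bool expected = false;
    return item.compare_exchange_strong(expected, true);
}

void push_back_blocking() {
    if (!push_back()) {
        std::unique_lock<std::mutex> ul(mux);
        has_waiters.store(true);      // #1, seq_cst by default
        while (!push_back()) {        // #2
            cv_notfull.wait(ul);
            has_waiters.store(true);  // re-set after waking, as in the answer
        }
        has_waiters.store(false);
    }
}

bool pop_front() {
    bool expected = true;
    if (!item.compare_exchange_strong(expected, false)) // #3
        return false;
    if (has_waiters.load()) {         // #4: lock only if someone may be waiting
        ++locks_taken;
        std::lock_guard<std::mutex> lg(mux);
        cv_notfull.notify_all();
    }
    return true;
}

// One blocking producer against one polling consumer; returns items moved.
int run_flag_demo(int n) {
    std::thread prod([&] { for (int i = 0; i < n; ++i) push_back_blocking(); });
    int popped = 0;
    while (popped < n)
        if (pop_front()) ++popped;
    prod.join();
    return popped;
}
```

The seq_cst total order over #1..#4 rules out the bad interleaving "producer saw the slot full, but consumer saw the flag unset", so a skipped notification can never strand a waiter.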

The key relations here are:

  1. Setting the flag at #1 and reading tail for the condition check at #2.
  2. Storing tail at #3 and checking the flag at #4.

Both of these relations should exhibit some sort of universal order: #1 should be observed before #2 by the other thread, and the same for #3 and #4.

In that case one can guarantee that, if the flag check at #4 finds the flag not set, a possible later condition check at #2 will see the effect of the condition change at #3. So it is safe not to lock (and notify), because no waiter is possible.

In the current implementation, the universal order between #1 and #2 is provided by loading tail with the implicit memory_order_seq_cst, and the same order between #3 and #4 is provided by storing tail with the implicit memory_order_seq_cst.

In this "do not lock if there are no waiters" approach, the universal order is the tricky part. Both relations are a read-after-write order, which cannot be achieved with any combination of memory_order_acquire and memory_order_release; memory_order_seq_cst has to be used.
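That requirement is the classic store-buffering pattern: each thread writes one variable and then reads the other. A small litmus-style check of the guarantee (the two threads mirror the #1/#2 and #3/#4 pairs of the answer):

```cpp
#include <atomic>
#include <thread>

// Store-buffering litmus test. With memory_order_seq_cst (the default),
// the outcome r1 == 0 && r2 == 0 is forbidden, because all seq_cst
// operations fall into one single total order; with only acquire/release,
// both loads could still read 0.
bool seq_cst_forbids_both_zero(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::atomic<int> x{0}, y{0};
        int r1 = -1, r2 = -1;
        std::thread a([&] { x.store(1); r1 = y.load(); }); // like #1 then #2
        std::thread b([&] { y.store(1); r2 = x.load(); }); // like #3 then #4
        a.join();
        b.join();
        if (r1 == 0 && r2 == 0) return false; // impossible under seq_cst
    }
    return true;
}
```

A failing run with weaker orderings is only probabilistic and hardware-dependent, but the seq_cst guarantee itself is absolute.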

