xct_mcs_impl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/xct/xct_mcs_impl.hpp"

#include <glog/logging.h>

#include <atomic>

#include "foedus/assert_nd.hpp"
#include "foedus/assorted/atomic_fences.hpp"
#include "foedus/assorted/raw_atomics.hpp"
#include "foedus/assorted/spin_until_impl.hpp"
#include "foedus/thread/thread_pimpl.hpp"  // just for explicit instantiation at the end
#include "foedus/xct/xct_id.hpp"
#include "foedus/xct/xct_mcs_adapter_impl.hpp"

namespace foedus {
namespace xct {

inline void assert_mcs_aligned(const void* address) {
  ASSERT_ND(address);
  ASSERT_ND(reinterpret_cast<uintptr_t>(address) % 8 == 0);
}

// will be removed soon. should directly call assorted::spin_until
template <typename COND>
void spin_until(COND spin_until_cond) {
  assorted::spin_until(spin_until_cond);
}

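////////////////////////////////////////////////////////////////////////////
///
///  WW (exclusive) MCS lock implementations.
///  Protocol sketch: a requester XCHGes the lock tail with its own
///  (thread_id, block_index) word. If the old tail was empty, it got the
///  lock; otherwise it registers itself as the old tail's successor and
///  spins on its thread-local me_waiting flag until the predecessor's
///  release hands the lock over.
///
////////////////////////////////////////////////////////////////////////////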
template <typename ADAPTOR>
McsBlockIndex McsWwImpl<ADAPTOR>::acquire_unconditional(McsWwLock* mcs_lock) {
  // Basically _all_ writes in this function must come with some memory barrier. Be careful!
  // Also, the performance of this method really matters, especially that of the common path.
  // Check objdump -d. Everything in the common path should be inlined.
  // Also, check minimal sufficient mfences (note, xchg implies lock prefix. not a compiler's bug!).
  std::atomic<bool>* me_waiting = adaptor_.me_waiting();
  ASSERT_ND(!me_waiting->load());
  assert_mcs_aligned(mcs_lock);
  // so far we allow only 2^16 MCS blocks per transaction. we might increase this later.
  ASSERT_ND(adaptor_.get_cur_block() < 0xFFFFU);
  McsBlockIndex block_index = adaptor_.issue_new_block();
  ASSERT_ND(block_index > 0);
  ASSERT_ND(block_index <= 0xFFFFU);
  McsWwBlock* my_block = adaptor_.get_ww_my_block(block_index);
  my_block->clear_successor_release();
  me_waiting->store(true, std::memory_order_release);
  const thread::ThreadId id = adaptor_.get_my_id();
  McsWwBlockData desired(id, block_index);  // purely local copy. okay to be always relaxed.
  McsWwBlockData group_tail = desired;      // purely local copy. okay to be always relaxed.
  auto* address = &(mcs_lock->tail_);       // be careful on this one!
  assert_mcs_aligned(address);

  McsWwBlockData pred;  // purely local copy. okay to be always relaxed.
  ASSERT_ND(!pred.is_valid_relaxed());
  while (true) {
    // if it's obviously locked by a guest, we should wait until it's released.
    // so far this is busy-wait; we can do something to prevent priority inversion later.
    if (UNLIKELY(address->is_guest_relaxed())) {
      spin_until([address]{ return !address->is_guest_acquire(); });
    }

    // the atomic op should imply a full barrier, but make sure we announce the
    // initialized new block.
    ASSERT_ND(!group_tail.is_guest_relaxed());
    ASSERT_ND(group_tail.is_valid_relaxed());
    ASSERT_ND(address->get_word_atomic() != group_tail.word_);
    pred.word_ = assorted::raw_atomic_exchange<uint64_t>(&address->word_, group_tail.word_);
    ASSERT_ND(pred != group_tail);
    ASSERT_ND(pred != desired);

    if (!pred.is_valid_relaxed()) {
      // this means it was not locked.
      ASSERT_ND(mcs_lock->is_locked());
      DVLOG(2) << "Okay, got a lock uncontended. me=" << id;
      me_waiting->store(false, std::memory_order_release);
      ASSERT_ND(address->is_valid_atomic());
      return block_index;
    } else if (UNLIKELY(pred.is_guest_relaxed())) {
      // ouch, I don't want to keep the guest ID! return it back.
      // This also determines the group_tail of this queue.
      group_tail.word_ = assorted::raw_atomic_exchange<uint64_t>(&address->word_, kMcsGuestId);
      ASSERT_ND(group_tail.is_valid_relaxed() && !group_tail.is_guest_relaxed());
      continue;
    } else {
      break;
    }
  }

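  // We have a non-guest predecessor: we are now queued behind it. Link into
  // its successor field, then spin on our me_waiting flag; the predecessor's
  // release() clears that flag to pass the lock to us.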
  ASSERT_ND(pred.is_valid_relaxed() && !pred.is_guest_relaxed());
  ASSERT_ND(address->is_valid_atomic());
  ASSERT_ND(!address->is_guest_atomic());
  ASSERT_ND(mcs_lock->is_locked());
  thread::ThreadId predecessor_id = pred.get_thread_id_relaxed();
  ASSERT_ND(predecessor_id != id);
  McsBlockIndex predecessor_block = pred.get_block_relaxed();
  DVLOG(0) << "mm, contended, we have to wait.. me=" << id << " pred=" << predecessor_id;

  ASSERT_ND(me_waiting->load());
  McsWwBlock* pred_block = adaptor_.get_ww_other_block(predecessor_id, predecessor_block);
  ASSERT_ND(!pred_block->has_successor_atomic());

  pred_block->set_successor_release(id, block_index);

  ASSERT_ND(address->is_valid_atomic());
  ASSERT_ND(!address->is_guest_atomic());
  spin_until([me_waiting]{ return !me_waiting->load(std::memory_order_acquire); });
  DVLOG(1) << "Okay, now I hold the lock. me=" << id << ", ex-pred=" << predecessor_id;
  ASSERT_ND(!me_waiting->load());
  ASSERT_ND(mcs_lock->is_locked());
  ASSERT_ND(address->is_valid_atomic());
  ASSERT_ND(!address->is_guest_atomic());
  return block_index;
}

template <typename ADAPTOR>
McsBlockIndex McsWwImpl<ADAPTOR>::acquire_try(McsWwLock* mcs_lock) {
  assert_mcs_aligned(mcs_lock);
  // In this function, we don't even need to modify me_waiting because
  // we are not waiting, whether we fail or succeed.
  ASSERT_ND(!adaptor_.me_waiting()->load());
  ASSERT_ND(adaptor_.get_cur_block() < 0xFFFFU);
  McsBlockIndex block_index = adaptor_.issue_new_block();
  ASSERT_ND(block_index > 0);
  ASSERT_ND(block_index <= 0xFFFFU);
  McsWwBlock* my_block = adaptor_.get_ww_my_block(block_index);
  my_block->clear_successor_release();
  const thread::ThreadId id = adaptor_.get_my_id();
  McsWwBlockData desired(id, block_index);  // purely local copy. okay to be always relaxed.
  auto* address = &(mcs_lock->tail_);       // be careful on this one!
  assert_mcs_aligned(address);

  McsWwBlockData pred;  // purely local copy. okay to be always relaxed.
  ASSERT_ND(!pred.is_valid_relaxed());
  // the atomic op should imply a full barrier, but make sure we announce the
  // initialized new block.
  ASSERT_ND(address->get_word_atomic() != desired.word_);
  bool swapped = assorted::raw_atomic_compare_exchange_weak<uint64_t>(
    &address->word_,
    &pred.word_,
    desired.word_);

  if (swapped) {
    // this means it was not locked.
    ASSERT_ND(mcs_lock->is_locked());
    DVLOG(2) << "Okay, got a lock uncontended. me=" << id;
    ASSERT_ND(address->is_valid_atomic());
    ASSERT_ND(!address->is_guest_atomic());
    ASSERT_ND(!pred.is_valid_relaxed());
    ASSERT_ND(!adaptor_.me_waiting()->load());
    return block_index;  // we got it!
  }

  // We couldn't get the lock. As we didn't even install the queue node in this case,
  // we don't need anything for cleanup either. Let's just do a sanity check on pred.
#ifndef NDEBUG
  ASSERT_ND(pred.is_valid_relaxed());
  ASSERT_ND(!adaptor_.me_waiting()->load());
  if (!pred.is_guest_relaxed()) {
    thread::ThreadId predecessor_id = pred.get_thread_id_relaxed();
    ASSERT_ND(predecessor_id != id);
  }
#endif  // NDEBUG

  // In this case, we are 100% sure no one else observed the block, so let's decrement
  // the counter to reuse the block index. Otherwise we'd quickly reach 2^16.
  adaptor_.cancel_new_block(block_index);
  return 0;
}

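// initial() grabs the lock without any atomic read-modify-write. It assumes the
// caller guarantees that no other thread can possibly contend at this point
// (e.g., right when the lock is constructed), hence only release-ordered writes.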
template <typename ADAPTOR>
McsBlockIndex McsWwImpl<ADAPTOR>::initial(McsWwLock* mcs_lock) {
  // Basically _all_ writes in this function must come with release barrier.
  // This method itself doesn't need barriers, but then we would need to later take a seq_cst
  // barrier in an appropriate place. That's hard to debug, so just take release barriers here.
  // Also, everything should be inlined.
  assert_mcs_aligned(mcs_lock);
  ASSERT_ND(!adaptor_.me_waiting()->load());
  ASSERT_ND(!mcs_lock->is_locked());
  // so far we allow only 2^16 MCS blocks per transaction. we might increase this later.
  ASSERT_ND(adaptor_.get_cur_block() < 0xFFFFU);

  McsBlockIndex block_index = adaptor_.issue_new_block();
  ASSERT_ND(block_index > 0 && block_index <= 0xFFFFU);
  McsWwBlock* my_block = adaptor_.get_ww_my_block(block_index);
  my_block->clear_successor_release();
  const thread::ThreadId id = adaptor_.get_my_id();
  mcs_lock->reset_release(id, block_index);
  return block_index;
}

template <typename ADAPTOR>
void McsWwImpl<ADAPTOR>::release(McsWwLock* mcs_lock, McsBlockIndex block_index) {
  // Basically _all_ writes in this function must come with some memory barrier. Be careful!
  // Also, the performance of this method really matters, especially that of the common path.
  // Check objdump -d. Everything in the common path should be inlined.
  // Also, check minimal sufficient lock/mfences.
  assert_mcs_aligned(mcs_lock);
  ASSERT_ND(!adaptor_.me_waiting()->load());
  ASSERT_ND(mcs_lock->is_locked());
  ASSERT_ND(block_index > 0);
  ASSERT_ND(adaptor_.get_cur_block() >= block_index);
  const thread::ThreadId id = adaptor_.get_my_id();
  const McsWwBlockData myself(id, block_index);  // purely local copy. okay to be always relaxed.
  auto* address = &(mcs_lock->tail_);            // be careful on this one!
  McsWwBlock* block = adaptor_.get_ww_my_block(block_index);
  if (!block->has_successor_acquire()) {
    // okay, the successor "seems" nullptr (not contended), but we have to make sure
    // with an atomic CAS.
    McsWwBlockData expected = myself;  // purely local copy. okay to be always relaxed.
    assert_mcs_aligned(address);
    bool swapped
      = assorted::raw_atomic_compare_exchange_strong<uint64_t>(
        &address->word_,
        &expected.word_,
        0);
    if (swapped) {
      // we have just unset the locked flag, but someone else might have just acquired it,
      // so we can't put assertions here.
      ASSERT_ND(id == 0 || mcs_lock->get_tail_waiter() != id);
      ASSERT_ND(expected == myself);
      ASSERT_ND(address->copy_atomic() != myself);
      DVLOG(2) << "Okay, released a lock uncontended. me=" << id;
      return;
    }
    ASSERT_ND(expected.is_valid_relaxed());
    ASSERT_ND(!expected.is_guest_relaxed());
    DVLOG(0) << "Interesting contention on MCS release. I thought it was null, but someone"
      " has just jumped in. me=" << id << ", mcs_lock=" << *mcs_lock;
    // wait for someone else to set the successor
    ASSERT_ND(mcs_lock->is_locked());
    if (UNLIKELY(!block->has_successor_acquire())) {
      spin_until([block]{ return block->has_successor_acquire(); });
    }
  }
  // Relax: In either case above, we confirmed block->has_successor with fences.
  // We thus can just read it in relaxed mode here.
  thread::ThreadId successor_id = block->get_successor_thread_id_relaxed();
  DVLOG(1) << "Okay, I have a successor. me=" << id << ", succ=" << successor_id;
  ASSERT_ND(successor_id != id);
  ASSERT_ND(address->copy_atomic() != myself);

  ASSERT_ND(adaptor_.other_waiting(successor_id)->load());
  ASSERT_ND(mcs_lock->is_locked());

  ASSERT_ND(address->copy_atomic() != myself);
  adaptor_.other_waiting(successor_id)->store(false, std::memory_order_release);
  ASSERT_ND(address->copy_atomic() != myself);
}

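////////////////////////////////////////////////////////////////////////////
///
///  Ownerless ("guest") variants of the WW lock. A guest has no MCS queue
///  node; it CASes the tail from null to the special kMcsGuestId value and
///  spins until the CAS succeeds, so there is no hand-over to successors.
///
////////////////////////////////////////////////////////////////////////////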
void McsWwOwnerlessImpl::ownerless_acquire_unconditional(McsWwLock* mcs_lock) {
  // Basically _all_ writes in this function must come with some memory barrier. Be careful!
  // Also, the performance of this method really matters, especially that of the common path.
  // Check objdump -d. Everything in the common path should be inlined.
  // Also, check minimal sufficient mfences (note, xchg implies lock prefix. not a compiler's bug!).
  assert_mcs_aligned(mcs_lock);
  auto* int_address = &(mcs_lock->tail_.word_);
  assert_mcs_aligned(int_address);
  spin_until([mcs_lock, int_address]{
    McsWwBlockData old;
    ASSERT_ND(!old.is_valid_relaxed());
    return assorted::raw_atomic_compare_exchange_weak<uint64_t>(
      int_address,
      &old.word_,
      kMcsGuestId);
  });
  DVLOG(1) << "Okay, now I hold the lock. me=guest";
  ASSERT_ND(mcs_lock->is_locked());
}

bool McsWwOwnerlessImpl::ownerless_acquire_try(McsWwLock* mcs_lock) {
  // Similar to acquire_try()
  assert_mcs_aligned(mcs_lock);
  auto* int_address = &(mcs_lock->tail_.word_);
  assert_mcs_aligned(int_address);
  McsWwBlockData pred;  // purely local copy. okay to be always relaxed.
  ASSERT_ND(!pred.is_valid_relaxed());
  bool swapped = assorted::raw_atomic_compare_exchange_weak<uint64_t>(
    &(mcs_lock->tail_.word_),
    &pred.word_,
    kMcsGuestId);

  if (swapped) {
    ASSERT_ND(mcs_lock->is_locked());
    DVLOG(2) << "Okay, got a guest lock uncontended.";
    // We can't test is_guest here because we might now have a successor swapping it.
    ASSERT_ND(!pred.is_valid_relaxed());
    return true;
  } else {
    return false;  // someone else holds the lock; a failed try acquires nothing.
  }
}

void McsWwOwnerlessImpl::ownerless_initial(McsWwLock* mcs_lock) {
  assert_mcs_aligned(mcs_lock);
  ASSERT_ND(!mcs_lock->is_locked());
  mcs_lock->reset_guest_id_release();
}

void McsWwOwnerlessImpl::ownerless_release(McsWwLock* mcs_lock) {
  // Basically _all_ writes in this function must come with some memory barrier. Be careful!
  // Also, the performance of this method really matters, especially that of the common path.
  // Check objdump -d. Everything in the common path should be inlined.
  // Also, check minimal sufficient mfences (note, xchg implies lock prefix. not a compiler's bug!).
  assert_mcs_aligned(mcs_lock);
  auto* int_address = &(mcs_lock->tail_.word_);
  assert_mcs_aligned(int_address);
  ASSERT_ND(mcs_lock->is_locked());
  spin_until([int_address]{
    McsWwBlockData old;
    old.word_ = kMcsGuestId;  // we expect the guest ID we installed on acquire
    return assorted::raw_atomic_compare_exchange_weak<uint64_t>(int_address, &old.word_, 0);
  });
  DVLOG(1) << "Okay, guest released the lock.";
}

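////////////////////////////////////////////////////////////////////////////
///
///  MCS-RW lock implementations. The simple variant below supports only
///  unconditional and instant-try acquires; the extended variant further
///  below also supports cancellable asynchronous requests.
///
////////////////////////////////////////////////////////////////////////////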
template <typename ADAPTOR>
class McsImpl<ADAPTOR, McsRwSimpleBlock> {  // partial specialization for McsRwSimpleBlock
 public:
  McsBlockIndex acquire_try_rw_writer(McsRwLock* lock) {
    McsBlockIndex block_index = adaptor_.issue_new_block();
    bool success = retry_async_rw_writer(lock, block_index);
    if (success) {
      return block_index;
    } else {
      // The block was never observed. reuse it.
      adaptor_.cancel_new_block(block_index);
      return 0;
    }
  }

  static bool does_support_try_rw_reader() { return false; }
  McsBlockIndex acquire_try_rw_reader(McsRwLock* lock) {
    McsBlockIndex block_index = adaptor_.issue_new_block();
    bool success = retry_async_rw_reader(lock, block_index);
    if (success) {
#ifndef NDEBUG
      auto* my_block = adaptor_.get_rw_my_block(block_index);
      ASSERT_ND(my_block->is_finalized());
      ASSERT_ND(my_block->is_granted());
#endif  // NDEBUG
      return block_index;
    } else {
      // The block was never observed. reuse it.
      adaptor_.cancel_new_block(block_index);
      return 0;
    }
  }

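  // Unconditional reader acquire: XCHG into the tail; if the predecessor is an
  // active reader we share the lock right away (incrementing nreaders),
  // otherwise we register as a reader successor and spin until granted.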
  McsBlockIndex acquire_unconditional_rw_reader(McsRwLock* mcs_rw_lock) {
    ASSERT_ND(adaptor_.get_cur_block() < 0xFFFFU);
    const thread::ThreadId id = adaptor_.get_my_id();
    const McsBlockIndex block_index = adaptor_.issue_new_block();
    ASSERT_ND(block_index > 0);
    ASSERT_ND(sizeof(McsRwSimpleBlock) == sizeof(McsWwBlock));
    auto* my_block = adaptor_.get_rw_my_block(block_index);

    // So I'm a reader
    my_block->init_reader();
    ASSERT_ND(my_block->is_blocked() && my_block->is_reader());
    ASSERT_ND(!my_block->has_successor());
    ASSERT_ND(my_block->successor_block_index_ == 0);

    // Now ready to XCHG
    uint32_t tail_desired = McsRwLock::to_tail_int(id, block_index);
    uint32_t* tail_address = &(mcs_rw_lock->tail_);
    uint32_t pred_tail_int = assorted::raw_atomic_exchange<uint32_t>(tail_address, tail_desired);

    if (pred_tail_int == 0) {
      mcs_rw_lock->increment_nreaders();
      my_block->unblock();  // reader successors will know they don't need to wait
    } else {
      // See if the predecessor is a reader; if so, whether it already acquired the lock.
      auto* pred_block = adaptor_.dereference_rw_tail_block(pred_tail_int);
      uint16_t* pred_state_address = &pred_block->self_.data_;
      uint16_t pred_state_expected = pred_block->make_blocked_with_no_successor_state();
      uint16_t pred_state_desired = pred_block->make_blocked_with_reader_successor_state();
      if (!pred_block->is_reader() || assorted::raw_atomic_compare_exchange_strong<uint16_t>(
        pred_state_address,
        &pred_state_expected,
        pred_state_desired)) {
        // Predecessor is a writer or a waiting reader. The successor class field and the
        // blocked state in pred_block are separate, so we can blindly set_successor().
        pred_block->set_successor_next_only(id, block_index);
        spin_until([my_block]{ return my_block->is_granted(); });
      } else {
        // Join the active, reader predecessor
        ASSERT_ND(!pred_block->is_blocked());
        mcs_rw_lock->increment_nreaders();
        pred_block->set_successor_next_only(id, block_index);
        my_block->unblock();
      }
    }
    finalize_acquire_reader_simple(mcs_rw_lock, my_block);
    ASSERT_ND(my_block->is_finalized());
    return block_index;
  }

  void release_rw_reader(
    McsRwLock* mcs_rw_lock,
    McsBlockIndex block_index) {
    const thread::ThreadId id = adaptor_.get_my_id();
    ASSERT_ND(block_index > 0);
    ASSERT_ND(adaptor_.get_cur_block() >= block_index);
    McsRwSimpleBlock* my_block = adaptor_.get_rw_my_block(block_index);
    ASSERT_ND(my_block->is_finalized());
    // Make sure there is really no successor, or wait for it
    uint32_t* tail_address = &mcs_rw_lock->tail_;
    uint32_t expected = McsRwLock::to_tail_int(id, block_index);
    if (my_block->successor_is_ready() ||
      !assorted::raw_atomic_compare_exchange_strong<uint32_t>(tail_address, &expected, 0)) {
      // Have to wait for the successor to install itself after me.
      // Don't check for curr_block->has_successor()! It only tells whether the state bit
      // is set, not whether successor_thread_id_ and successor_block_index_ are set.
      // But remember to skip trying readers who failed.
      spin_until([my_block]{ return my_block->successor_is_ready(); });
      if (my_block->has_writer_successor()) {
        assorted::raw_atomic_exchange<thread::ThreadId>(
          &mcs_rw_lock->next_writer_,
          my_block->successor_thread_id_);
      }
    }

    if (mcs_rw_lock->decrement_nreaders() == 1) {
      // I was the last active reader
      thread::ThreadId next_writer
        = assorted::atomic_load_acquire<thread::ThreadId>(&mcs_rw_lock->next_writer_);
      if (next_writer != McsRwLock::kNextWriterNone &&
        mcs_rw_lock->nreaders() == 0 &&
        assorted::raw_atomic_compare_exchange_strong<thread::ThreadId>(
          &mcs_rw_lock->next_writer_,
          &next_writer,
          McsRwLock::kNextWriterNone)) {
        // I have a waiting writer, wake it up.
        // Assuming a thread can wait for one and only one MCS lock at any instant
        // before starting to acquire the next.
        McsBlockIndex next_cur_block = adaptor_.get_other_cur_block(next_writer);
        McsRwSimpleBlock* writer_block = adaptor_.get_rw_other_block(next_writer, next_cur_block);
        ASSERT_ND(writer_block->is_blocked());
        ASSERT_ND(!writer_block->is_reader());
        writer_block->unblock();
      }
    }
  }

  McsBlockIndex acquire_unconditional_rw_writer(
    McsRwLock* mcs_rw_lock) {
    const thread::ThreadId id = adaptor_.get_my_id();
    const McsBlockIndex block_index = adaptor_.issue_new_block();
    ASSERT_ND(adaptor_.get_cur_block() < 0xFFFFU);
    ASSERT_ND(block_index > 0);
    ASSERT_ND(sizeof(McsRwSimpleBlock) == sizeof(McsWwBlock));
    auto* my_block = adaptor_.get_rw_my_block(block_index);

    my_block->init_writer();
    ASSERT_ND(my_block->is_blocked() && !my_block->is_reader());
    ASSERT_ND(!my_block->has_successor());
    ASSERT_ND(my_block->successor_block_index_ == 0);

    // Now ready to XCHG
    uint32_t tail_desired = McsRwLock::to_tail_int(id, block_index);
    uint32_t* tail_address = &(mcs_rw_lock->tail_);
    uint32_t pred_tail_int = assorted::raw_atomic_exchange<uint32_t>(tail_address, tail_desired);
    ASSERT_ND(pred_tail_int != tail_desired);
    thread::ThreadId old_next_writer = 0xFFFFU;
    if (pred_tail_int == 0) {
      assorted::raw_atomic_exchange<thread::ThreadId>(&mcs_rw_lock->next_writer_, id);
      if (mcs_rw_lock->nreaders() == 0) {
        old_next_writer = assorted::raw_atomic_exchange<thread::ThreadId>(
          &mcs_rw_lock->next_writer_,
          McsRwLock::kNextWriterNone);
        if (old_next_writer == id) {
          my_block->unblock();
          return block_index;
        }
      }
    } else {
      auto* pred_block = adaptor_.dereference_rw_tail_block(pred_tail_int);
      pred_block->set_successor_class_writer();
      pred_block->set_successor_next_only(id, block_index);
    }
    spin_until([my_block]{ return my_block->is_granted(); });
    return block_index;
  }

  void release_rw_writer(
    McsRwLock* mcs_rw_lock,
    McsBlockIndex block_index) {
    const thread::ThreadId id = adaptor_.get_my_id();
    ASSERT_ND(block_index > 0);
    ASSERT_ND(adaptor_.get_cur_block() >= block_index);
    auto* my_block = adaptor_.get_rw_my_block(block_index);
    uint32_t expected = McsRwLock::to_tail_int(id, block_index);
    uint32_t* tail_address = &mcs_rw_lock->tail_;
    if (my_block->successor_is_ready() ||
      !assorted::raw_atomic_compare_exchange_strong<uint32_t>(tail_address, &expected, 0)) {
      if (UNLIKELY(!my_block->successor_is_ready())) {
        spin_until([my_block]{ return my_block->successor_is_ready(); });
      }
      ASSERT_ND(my_block->successor_is_ready());
      auto* successor_block = adaptor_.get_rw_other_block(
        my_block->successor_thread_id_,
        my_block->successor_block_index_);
      ASSERT_ND(successor_block->is_blocked());
      if (successor_block->is_reader()) {
        mcs_rw_lock->increment_nreaders();
      }
      successor_block->unblock();
    }
  }


  AcquireAsyncRet acquire_async_rw_reader(McsRwLock* lock) {
    // In simple version, no distinction between try/async/retry. Same logic.
    McsBlockIndex block_index = adaptor_.issue_new_block();
    bool success = retry_async_rw_reader(lock, block_index);
    return {success, block_index};
  }
  AcquireAsyncRet acquire_async_rw_writer(McsRwLock* lock) {
    McsBlockIndex block_index = adaptor_.issue_new_block();
    bool success = retry_async_rw_writer(lock, block_index);
    return {success, block_index};
  }

  bool retry_async_rw_reader(
    McsRwLock* lock,
    McsBlockIndex block_index) {
    /* The following turns out to be unsafe because there might be writers
    who are not yet detected by the pred-readers. We are still trying
    to find a way without resorting to the extended version... but for now it's disabled.
    does_support_try_rw_reader() returns false for this reason.
    const thread::ThreadId id = adaptor_.get_my_id();
    // take a look at the whole lock word, and CAS if it's a reader or null
    uint64_t lock_word
      = assorted::atomic_load_acquire<uint64_t>(reinterpret_cast<uint64_t*>(lock));
    McsRwLock ll;
    std::memcpy(&ll, &lock_word, sizeof(ll));
    // Note: it's tempting to put this whole function under an infinite retry
    // loop and only break when this condition is true. That works fine with
    // a single lock, but might cause deadlocks and make this try version
    // not really a try. Consider this example with two locks A and B.
    //
    // Lock: requester 1 -> requester 2
    //
    // A: T1 holding as writer -> T2 waiting unconditionally as a writer in canonical mode
    // B: T2 holding as writer -> T1 trying as a reader in non-canonical mode
    //
    // In this case, T1 always sees next_writer=none because T2 consumed it when it got the
    // lock, and the below CAS fails because now B.tail is T2, a writer. T1 would stay in
    // the loop forever...
    if (ll.next_writer_ != McsRwLock::kNextWriterNone) {
      return false;
    }
    McsRwSimpleBlock* block = nullptr;
    if (ll.tail_) {
      block = adaptor_.dereference_rw_tail_block(ll.tail_);
    }
    if (ll.tail_ == 0 || (block->is_granted() && block->is_reader())) {
      ll.increment_nreaders();
      ll.tail_ = McsRwLock::to_tail_int(id, block_index);
      uint64_t desired = *reinterpret_cast<uint64_t*>(&ll);
      auto* my_block = adaptor_.get_rw_my_block(block_index);
      my_block->init_reader();

      if (assorted::raw_atomic_compare_exchange_weak<uint64_t>(
        reinterpret_cast<uint64_t*>(lock), &lock_word, desired)) {
        if (block) {
          block->set_successor_next_only(id, block_index);
        }
        my_block->unblock();
        finalize_acquire_reader_simple(lock, my_block);
        return true;
      }
    }
    */

    // Instead, a best-effort impl here, which is basically the same as the writer case below.
    // This returns false even if there are only readers.
    const thread::ThreadId id = adaptor_.get_my_id();
    auto* my_block = adaptor_.get_rw_my_block(block_index);
    my_block->init_reader();

    McsRwLock tmp;
    uint64_t expected = *reinterpret_cast<uint64_t*>(&tmp);
    McsRwLock tmp2;
    tmp2.increment_nreaders();
    tmp2.tail_ = McsRwLock::to_tail_int(id, block_index);
    uint64_t desired = *reinterpret_cast<uint64_t*>(&tmp2);
    if (assorted::raw_atomic_compare_exchange_weak<uint64_t>(
      reinterpret_cast<uint64_t*>(lock), &expected, desired)) {
      my_block->unblock();
      finalize_acquire_reader_simple(lock, my_block);
      return true;
    }
    return false;
  }
  bool retry_async_rw_writer(McsRwLock* lock, McsBlockIndex block_index) {
    const thread::ThreadId id = adaptor_.get_my_id();
    auto* my_block = adaptor_.get_rw_my_block(block_index);
    my_block->init_writer();

    McsRwLock tmp;
    uint64_t expected = *reinterpret_cast<uint64_t*>(&tmp);
    McsRwLock tmp2;
    tmp2.tail_ = McsRwLock::to_tail_int(id, block_index);
    uint64_t desired = *reinterpret_cast<uint64_t*>(&tmp2);
    my_block->unblock();
    return assorted::raw_atomic_compare_exchange_weak<uint64_t>(
      reinterpret_cast<uint64_t*>(lock), &expected, desired);
  }

  void cancel_async_rw_reader(McsRwLock* /*lock*/, McsBlockIndex /*block_index*/) {
    // In simple version, we don't actually have any mechanism to retry.
    // so, we don't have to do any cancel, either. No-op.
  }
  void cancel_async_rw_writer(McsRwLock* /*lock*/, McsBlockIndex /*block_index*/) {
  }

 private:
  void finalize_acquire_reader_simple(McsRwLock* lock, McsRwSimpleBlock* my_block) {
    ASSERT_ND(!my_block->is_finalized());
    if (my_block->has_reader_successor()) {
      spin_until([my_block]{ return my_block->successor_is_ready(); });
      // Unblock the reader successor
      McsRwSimpleBlock* successor_block = adaptor_.get_rw_other_block(
        my_block->successor_thread_id_,
        my_block->successor_block_index_);
      lock->increment_nreaders();
      successor_block->unblock();
    }
    my_block->set_finalized();
  }

  ADAPTOR adaptor_;
};  // end of McsImpl<ADAPTOR, McsRwSimpleBlock> specialization

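////////////////////////////////////////////////////////////////////////////
///
///  Extended MCS-RW lock: requests carry pred/next flag words so that a
///  waiting requester can time out and cancel, at the cost of a much more
///  involved hand-over/cancellation protocol than the simple variant.
///
////////////////////////////////////////////////////////////////////////////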
template <typename ADAPTOR>
class McsImpl<ADAPTOR, McsRwExtendedBlock> {  // partial specialization for McsRwExtendedBlock
 public:
  static bool does_support_try_rw_reader() { return true; }
  McsBlockIndex acquire_unconditional_rw_reader(McsRwLock* lock) {
    McsBlockIndex block_index = 0;
    auto ret = acquire_reader_lock(lock, &block_index, McsRwExtendedBlock::kTimeoutNever);
    ASSERT_ND(block_index);
    ASSERT_ND(ret == kErrorCodeOk);
#ifndef NDEBUG
    auto* my_block = adaptor_.get_rw_my_block(block_index);
    ASSERT_ND(my_block->next_flag_is_granted());
    ASSERT_ND(my_block->pred_flag_is_granted());
#endif
    return block_index;
  }
  McsBlockIndex acquire_unconditional_rw_writer(McsRwLock* lock) {
    McsBlockIndex block_index = 0;
    auto ret = acquire_writer_lock(lock, &block_index, McsRwExtendedBlock::kTimeoutNever);
    ASSERT_ND(block_index);
    ASSERT_ND(ret == kErrorCodeOk);
#ifndef NDEBUG
    auto* my_block = adaptor_.get_rw_my_block(block_index);
    ASSERT_ND(my_block->next_flag_is_granted());
    ASSERT_ND(my_block->pred_flag_is_granted());
#endif
    return block_index;
  }
  McsBlockIndex acquire_try_rw_writer(McsRwLock* lock) {
    const thread::ThreadId id = adaptor_.get_my_id();
    McsBlockIndex block_index = 0;
    auto* my_block = init_block(&block_index, true);

    McsRwLock tmp;
    uint64_t expected = *reinterpret_cast<uint64_t*>(&tmp);
    McsRwLock tmp2;
    tmp2.tail_ = McsRwLock::to_tail_int(id, block_index);
    uint64_t desired = *reinterpret_cast<uint64_t*>(&tmp2);
    my_block->set_flags_granted();
    if (assorted::raw_atomic_compare_exchange_weak<uint64_t>(
      reinterpret_cast<uint64_t*>(lock), &expected, desired)) {
      return block_index;
    } else {
      // The block is never observed. reuse it.
      adaptor_.cancel_new_block(block_index);
      return 0;
    }

    /*
     * XXX(tzwang, Feb 2016): it turns out the above CAS-try is better than using the trio -
     * the difference could be as much as 10x under high contention (DL580). The reason,
     * I think, is that under high contention we often need to cancel anyway, which is much
     * more expensive than a simple weak CAS.
     *
     * But the case for readers is a bit different: the trio wins especially with a lot of
     * reads because it allows real reader-sharing.
     */
    /*
    McsBlockIndex block_index = 0;
    //auto ret = acquire_writer_lock(lock, &block_index, McsRwExtendedBlock::kTimeoutZero);
    auto ret = acquire_writer_lock(lock, &block_index, 1000);
    ASSERT_ND(ret == kErrorCodeOk || ret == kErrorCodeLockRequested);
    ASSERT_ND(block_index);
    if (ret == kErrorCodeOk) {
      return block_index;
    }
    return 0;

    ASSERT_ND(ret == kErrorCodeLockRequested);
    uint32_t my_tail_int =
      xct::McsRwLock::to_tail_int(static_cast<uint32_t>(adaptor_.get_my_id()), block_index);
    // check once
    if (retry_async_rw_writer(lock, block_index) ||
      cancel_writer_lock(lock, my_tail_int) == kErrorCodeOk) {
      return block_index;
    }
    return 0;
    */
  }
  McsBlockIndex acquire_try_rw_reader(McsRwLock* lock) {
    // This is a bit special; we do an async acquire with a very short timeout:
    // giving 0 timeout might cause unnecessary cancelling because of the delay in
    // lock granting from a reader predecessor. Note that there is a delay even if
    // there are only readers; the last requester has to wait for its predecessor
    // to notify it about the granting of the lock.
    McsBlockIndex block_index = 0;
    auto ret = acquire_reader_lock(lock, &block_index, 10);
    ASSERT_ND(block_index);
    if (ret == kErrorCodeOk) {
      return block_index;
    } else {
      // NOTE: In this case, we CANNOT do this. Extended version's try-reader
      // actually inserts the qnode. So others might have observed it.
      // // adaptor_.cancel_new_block(block_index);
      // To avoid using up 2^16 qnodes, we might want a "peek" check to
      // exit before putting the qnode in when there obviously is a writer.
      return 0;
    }

    /* The old version that uses 0 timeout:
    McsBlockIndex block_index = 0;
    auto ret = acquire_reader_lock(lock, &block_index, McsRwExtendedBlock::kTimeoutZero);
    ASSERT_ND(ret == kErrorCodeOk || ret == kErrorCodeLockRequested);
    ASSERT_ND(block_index);
    if (ret == kErrorCodeOk) {
      return block_index;
    }
    ASSERT_ND(ret == kErrorCodeLockRequested);
    uint32_t my_tail_int =
      xct::McsRwLock::to_tail_int(static_cast<uint32_t>(adaptor_.get_my_id()), block_index);
    // check once
    if (retry_async_rw_reader(lock, block_index) ||
      cancel_reader_lock(lock, my_tail_int) == kErrorCodeOk) {
      return block_index;
    }
    return 0;
    */
  }
  void release_rw_reader(McsRwLock* lock, McsBlockIndex block_index) {
    release_reader_lock(lock, block_index);
  }
  void release_rw_writer(McsRwLock* lock, McsBlockIndex block_index) {
    release_writer_lock(lock, block_index);
  }
  AcquireAsyncRet acquire_async_rw_reader(McsRwLock* lock) {
    McsBlockIndex block_index = 0;
    auto ret = acquire_reader_lock(lock, &block_index, McsRwExtendedBlock::kTimeoutZero);
#ifndef NDEBUG
    auto* my_block = adaptor_.get_rw_my_block(block_index);
    if (ret == kErrorCodeOk) {
      ASSERT_ND(my_block->pred_flag_is_granted());
      ASSERT_ND(my_block->next_flag_is_granted());
    } else {
      ASSERT_ND(!my_block->next_flag_is_granted());
    }
#endif
    ASSERT_ND(block_index);
    return {ret == kErrorCodeOk, block_index};
  }
  AcquireAsyncRet acquire_async_rw_writer(McsRwLock* lock) {
    McsBlockIndex block_index = 0;
    auto ret = acquire_writer_lock(lock, &block_index, McsRwExtendedBlock::kTimeoutZero);
    ASSERT_ND(block_index);
#ifndef NDEBUG
    auto* my_block = adaptor_.get_rw_my_block(block_index);
    if (ret == kErrorCodeOk) {
      ASSERT_ND(my_block->pred_flag_is_granted());
      ASSERT_ND(my_block->next_flag_is_granted());
    } else {
      ASSERT_ND(!my_block->next_flag_is_granted());
    }
#endif
    return {ret == kErrorCodeOk, block_index};
  }
  bool retry_async_rw_reader(McsRwLock* lock, McsBlockIndex block_index) {
    auto* block = adaptor_.get_rw_my_block(block_index);
    if (block->pred_flag_is_granted()) {
      // checking me.next.flags.granted is ok - we're racing with ourselves
      if (!block->next_flag_is_granted()) {
        auto ret = finish_acquire_reader_lock(lock, block,
          xct::McsRwLock::to_tail_int(static_cast<uint32_t>(adaptor_.get_my_id()), block_index));
        ASSERT_ND(ret == kErrorCodeOk);
      }
      ASSERT_ND(block->next_flag_is_granted());
      return true;
    }
    ASSERT_ND(!block->next_flag_is_granted());
    return false;
  }
  bool retry_async_rw_writer(McsRwLock* lock, McsBlockIndex block_index) {
    auto* block = adaptor_.get_rw_my_block(block_index);
    if (block->pred_flag_is_granted()) {
      // checking me.next.flags.granted is ok - we're racing with ourselves
      if (!block->next_flag_is_granted()) {
        block->set_next_flag_granted();
        adaptor_.remove_rw_async_mapping(lock);
      }
      ASSERT_ND(block->next_flag_is_granted());
      return true;
    }
    ASSERT_ND(!block->next_flag_is_granted());
    return false;
  }
  void cancel_async_rw_reader(McsRwLock* lock, McsBlockIndex block_index) {
    if (!retry_async_rw_reader(lock, block_index)) {
      uint32_t my_tail_int = McsRwLock::to_tail_int(adaptor_.get_my_id(), block_index);
      if (cancel_reader_lock(lock, my_tail_int) == kErrorCodeOk) {
        // we actually got the lock, so we have to release it
        release_reader_lock(lock, block_index);
      }
    } else {
      release_reader_lock(lock, block_index);
    }
  }
  void cancel_async_rw_writer(McsRwLock* lock, McsBlockIndex block_index) {
    uint32_t my_tail_int = McsRwLock::to_tail_int(adaptor_.get_my_id(), block_index);
    if (cancel_writer_lock(lock, my_tail_int) == kErrorCodeOk) {
      release_writer_lock(lock, block_index);
    }
  }

 private:
  McsRwExtendedBlock* init_block(xct::McsBlockIndex* out_block_index, bool writer) {
    ASSERT_ND(out_block_index);
    McsBlockIndex block_index = 0;
    if (*out_block_index) {
      // already provided, use it; caller must make sure this block is not being used
      block_index = *out_block_index;
    } else {
      block_index = *out_block_index = adaptor_.issue_new_block();
    }
    ASSERT_ND(block_index <= 0xFFFFU);
    ASSERT_ND(block_index > 0);
    ASSERT_ND(adaptor_.get_cur_block() < 0xFFFFU);
    auto* my_block = adaptor_.get_rw_my_block(block_index);
    if (writer) {
      my_block->init_writer();
    } else {
      my_block->init_reader();
    }
    return my_block;
  }

  void link_pred(
    uint32_t pred,
    McsRwExtendedBlock* pred_block,
    uint32_t me,
    McsRwExtendedBlock* my_block) {
    ASSERT_ND(my_block->get_pred_id() == 0);
    ASSERT_ND(pred_block->get_next_id() == 0);
    my_block->set_pred_id(pred);
    pred_block->set_next_id(me);
  }

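  // Reader acquire, extended version. As in the simple variant we XCHG into
  // the tail, but every linking step must tolerate a predecessor that is
  // concurrently cancelling; timeout bounds the spinning before we give up
  // through cancel_reader_lock().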
  ErrorCode acquire_reader_lock(McsRwLock* lock, McsBlockIndex* out_block_index, int32_t timeout) {
    auto* my_block = init_block(out_block_index, false);
    ASSERT_ND(my_block->pred_flag_is_waiting());
    ASSERT_ND(my_block->next_flag_is_waiting());
    ASSERT_ND(!my_block->next_flag_is_busy());
    const thread::ThreadId id = adaptor_.get_my_id();
    auto my_tail_int = McsRwLock::to_tail_int(id, *out_block_index);

    auto pred = lock->xchg_tail(my_tail_int);
    if (pred == 0) {
      lock->increment_nreaders();
      ASSERT_ND(my_block->get_pred_id() == 0);
      my_block->set_pred_flag_granted();
      return finish_acquire_reader_lock(lock, my_block, my_tail_int);
    }

    ASSERT_ND(my_block->get_pred_id() == 0);
    // haven't set pred.next.id yet, safe to dereference
    auto* pred_block = adaptor_.dereference_rw_tail_block(pred);
    if (pred_block->is_reader()) {
      return acquire_reader_lock_check_reader_pred(lock, my_block, my_tail_int, pred, timeout);
    }
    return acquire_reader_lock_check_writer_pred(lock, my_block, my_tail_int, pred, timeout);
  }

  ErrorCode finish_acquire_reader_lock(
    McsRwLock* lock, McsRwExtendedBlock* my_block, uint32_t my_tail_int) {
    my_block->set_next_flag_busy_granted();
    ASSERT_ND(my_block->next_flag_is_granted());
    ASSERT_ND(my_block->next_flag_is_busy());
    spin_until([my_block]{
      return my_block->get_next_id() != McsRwExtendedBlock::kSuccIdSuccessorLeaving; });

    // if the lock tail now still points to me, truly no one is there, we're done
    if (lock->get_tail_int() == my_tail_int) {
      my_block->unset_next_flag_busy();
      return kErrorCodeOk;
    }
    // note that the successor can't cancel now, i.e., my next.id is stable
    spin_until([my_block]{ return my_block->get_next_id() != 0; });
    uint64_t next = my_block->get_next();
    uint32_t next_id = next >> 32;
    ASSERT_ND(next_id);
    if (next_id == McsRwExtendedBlock::kSuccIdNoSuccessor) {
      ASSERT_ND(my_block->next_flag_is_granted());
      ASSERT_ND(my_block->next_flag_is_busy());
      my_block->unset_next_flag_busy();
      return kErrorCodeOk;
    }

    auto* succ_block = adaptor_.dereference_rw_tail_block(next_id);
    if (succ_block->is_writer()) {
      my_block->unset_next_flag_busy();
      return kErrorCodeOk;
    }

    // the successor might be cancelling, in which case it'd xchg me.next.id to NoSuccessor;
    // it's also possible that my cancelling writer successor is about to give me a new
    // reader successor. In this case my cancelling successor will realize that I already
    // have the lock and will try to wake up the new successor directly, also by trying to
    // change me.next.id to NoSuccessor (the new successor might spin forever if its timeout
    // is Never and the cancelling successor didn't wake it up).
    if (!my_block->cas_next_id_strong(next_id, McsRwExtendedBlock::kSuccIdNoSuccessor)) {
      ASSERT_ND(my_block->get_next_id() == McsRwExtendedBlock::kSuccIdNoSuccessor);
      my_block->unset_next_flag_busy();
      return kErrorCodeOk;
    }

    if (my_block->next_flag_is_leaving_granted() && !my_block->next_flag_has_successor()) {
      // the successor might have seen me in leaving state; it'll wait for me in that case.
      // here the successor saw me in leaving state and didn't register as a reader,
      // i.e., the successor was still acquiring.
      ASSERT_ND(succ_block->pred_flag_is_waiting());
      // XXX(tzwang): we were using the weak version of CAS, but it tended to lock up
      // while gdb said the link between me and the successor was good. Use the strong
      // version for now; there are several other similar instances, all converted to *_strong.
      spin_until([succ_block, my_tail_int]{ return succ_block->get_pred_id() == my_tail_int; });
      if (succ_block->cas_pred_id_strong(my_tail_int, McsRwExtendedBlock::kPredIdAcquired)) {
        lock->increment_nreaders();
        succ_block->set_pred_flag_granted();
        // make sure I know, when releasing, that there is no need to wait
        my_block->set_next_id(McsRwExtendedBlock::kSuccIdNoSuccessor);
      }
    } else {
      if (my_block->next_flag_has_reader_successor()) {
        while (true) {
          spin_until([succ_block, my_tail_int]{ return succ_block->get_pred_id() == my_tail_int; });
          if (succ_block->cas_pred_id_strong(my_tail_int, McsRwExtendedBlock::kPredIdAcquired)) {
            ASSERT_ND(succ_block->pred_flag_is_waiting());
            lock->increment_nreaders();
            succ_block->set_pred_flag_granted();
            my_block->set_next_id(McsRwExtendedBlock::kSuccIdNoSuccessor);
            break;
          }
        }
      }
    }
    my_block->unset_next_flag_busy();
    ASSERT_ND(my_block->get_next_id() == McsRwExtendedBlock::kSuccIdNoSuccessor);
    return kErrorCodeOk;
  }

  ErrorCode acquire_reader_lock_check_reader_pred(
    McsRwLock* lock,
    McsRwExtendedBlock* my_block,
    uint32_t my_tail_int,
    uint32_t pred,
    int32_t timeout) {
    auto* pred_block = adaptor_.dereference_rw_tail_block(pred);
  check_pred:
    ASSERT_ND(my_block->get_pred_id() == 0);
    ASSERT_ND(pred_block->is_reader());
    // wait for the previous canceling dude to leave
    spin_until([pred_block]{
      return !pred_block->get_next_id() && !pred_block->next_flag_has_successor(); });
    uint32_t expected = pred_block->make_next_flag_waiting_with_no_successor();
    uint32_t val = pred_block->cas_val_next_flag_weak(
      expected, pred_block->make_next_flag_waiting_with_reader_successor());
    if (val == expected) {
      link_pred(pred, pred_block, my_tail_int, my_block);
      if (my_block->timeout_granted(timeout)) {
        return finish_acquire_reader_lock(lock, my_block, my_tail_int);
      }
      if (timeout == McsRwExtendedBlock::kTimeoutZero) {
        return kErrorCodeLockRequested;
      }
      return cancel_reader_lock(lock, my_tail_int);
    }

    if ((val & McsRwExtendedBlock::kSuccFlagMask) == McsRwExtendedBlock::kSuccFlagLeaving) {
      // don't set pred.next.successor_class here
      link_pred(pred, pred_block, my_tail_int, my_block);
      // if pred did cancel, it will give me a new pred; if it got the lock it will wake me up
      spin_until([my_block, pred]{ return my_block->get_pred_id() != pred; });
      // consume it and retry
      pred = my_block->xchg_pred_id(0);
      if (pred == 0 || pred == McsRwExtendedBlock::kPredIdAcquired) {
        spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
        return finish_acquire_reader_lock(lock, my_block, my_tail_int);
      }
      ASSERT_ND(!my_block->pred_flag_is_granted());
      ASSERT_ND(pred);
      pred_block = adaptor_.dereference_rw_tail_block(pred);
      if (pred_block->is_writer()) {
        return acquire_reader_lock_check_writer_pred(lock, my_block, my_tail_int, pred, timeout);
      }
      goto check_pred;
    } else {
      // pred is granted - might be a direct grant or a grant in the leaving process
      ASSERT_ND(
        (val & McsRwExtendedBlock::kSuccFlagMask) == McsRwExtendedBlock::kSuccFlagDirectGranted ||
        (val & McsRwExtendedBlock::kSuccFlagMask) == McsRwExtendedBlock::kSuccFlagLeavingGranted);
      ASSERT_ND(pred_block->is_reader());
      // I didn't register, so pred won't wake me up, but if pred is leaving_granted,
      // we need to tell it not to poke me in its finish-acquire call. For direct_granted,
      // also set its next.id to NoSuccessor so it knows that there's no need to wait and
      // examine the successor upon release. This also covers the case when pred.next.flags
      // has Busy set.
      pred_block->set_next_id(McsRwExtendedBlock::kSuccIdNoSuccessor);
      lock->increment_nreaders();
      my_block->set_pred_flag_granted();
      return finish_acquire_reader_lock(lock, my_block, my_tail_int);
    }
    ASSERT_ND(false);
  }

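  // Cancelling a reader request, roughly: (1) xchg our pred.id to 0 so the
  // predecessor can no longer hand us the lock (if it already did, we take the
  // lock instead), (2) mark our next-flag Leaving so a successor can't link
  // cleanly behind us, then (3) splice predecessor and successor together, or
  // simply retreat the tail if we are last in the queue.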
  ErrorCode cancel_reader_lock(McsRwLock* lock, uint32_t my_tail_int) {
    auto* my_block = adaptor_.dereference_rw_tail_block(my_tail_int);
    ASSERT_ND(!my_block->next_flag_is_granted());
    auto pred = my_block->xchg_pred_id(0);  // prevent pred from cancelling
    if (pred == McsRwExtendedBlock::kPredIdAcquired) {
      spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
      return finish_acquire_reader_lock(lock, my_block, my_tail_int);
    }

    // make sure successor can't leave, unless it tried to leave first
    ASSERT_ND(!my_block->next_flag_is_granted());
    my_block->set_next_flag_leaving();
    spin_until([my_block]{
      return my_block->get_next_id() != McsRwExtendedBlock::kSuccIdSuccessorLeaving; });

    ASSERT_ND(pred);
    auto* pred_block = adaptor_.dereference_rw_tail_block(pred);
    if (pred_block->is_reader()) {
      return cancel_reader_lock_with_reader_pred(lock, my_block, my_tail_int, pred);
    }
    ASSERT_ND(my_block->get_pred_id() == 0);
    return cancel_reader_lock_with_writer_pred(lock, my_block, my_tail_int, pred);
  }

  ErrorCode cancel_reader_lock_with_writer_pred(
    McsRwLock* lock, McsRwExtendedBlock* my_block, uint32_t my_tail_int, uint32_t pred) {
  retry:
    ASSERT_ND(!my_block->next_flag_is_granted());
    ASSERT_ND(my_block->next_flag_is_leaving());
    ASSERT_ND(pred);
    ASSERT_ND(pred >> 16 != adaptor_.get_my_id());
    auto* pred_block = adaptor_.dereference_rw_tail_block(pred);
    ASSERT_ND(pred_block->is_writer());
    ASSERT_ND(my_block->get_pred_id() == 0);
    // wait for the cancelling pred to finish its relink
    spin_until([pred_block, my_tail_int]{
      return pred_block->get_next_id() == my_tail_int &&
        pred_block->next_flag_has_reader_successor(); });
    ASSERT_ND(pred_block->next_flag_has_reader_successor());
    // pred is a writer, so I can go as long as it's not also leaving (cancelling or releasing)
    ASSERT_ND(my_block->get_pred_id() == 0);
    while (true) {
      uint64_t eflags = pred_block->read_next_flags();
      if ((eflags & McsRwExtendedBlock::kSuccFlagMask) ==
        McsRwExtendedBlock::kSuccFlagLeaving) {
        // must wait for pred to give me a new pred (or wait to be woken up?)
        // pred should give me a new pred, after its CAS trying to pass me the lock failed
        ASSERT_ND(my_block->get_pred_id() == 0);
        my_block->set_pred_id(pred);
        pred = my_block->xchg_pred_id(0);
        if (pred == 0 || pred == McsRwExtendedBlock::kPredIdAcquired) {
          spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
          return finish_acquire_reader_lock(lock, my_block, my_tail_int);
        } else {
          // make sure successor can't leave, unless it tried to leave first
          ASSERT_ND(!my_block->next_flag_is_granted());
          my_block->set_next_flag_leaving();
          spin_until([my_block]{
            return my_block->get_next_id() != McsRwExtendedBlock::kSuccIdSuccessorLeaving; });
          auto* pred_block = adaptor_.dereference_rw_tail_block(pred);
          ASSERT_ND(my_block->get_pred_id() == 0);
          if (pred_block->is_reader()) {
            return cancel_reader_lock_with_reader_pred(lock, my_block, my_tail_int, pred);
          }
          goto retry;
        }
      } else if (eflags & McsRwExtendedBlock::kSuccFlagBusy) {
        ASSERT_ND(pred_block->next_flag_is_granted());
        ASSERT_ND(pred_block->next_flag_is_busy());
        my_block->set_pred_id(pred);
        spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
        return finish_acquire_reader_lock(lock, my_block, my_tail_int);
      }
      // try to tell pred I'm leaving
      if (pred_block->cas_next_weak(eflags | (static_cast<uint64_t>(my_tail_int) << 32),
        eflags | (static_cast<uint64_t>(McsRwExtendedBlock::kSuccIdSuccessorLeaving) << 32))) {
        break;
      }
    }
    // pred now has SuccessorLeaving on its next.id; it won't try to wake me up during release.
    // now link the new successor and pred
    if (my_block->get_next_id() == 0 && lock->cas_tail_weak(my_tail_int, pred)) {
      pred_block->set_next_flag_no_successor();
      pred_block->set_next_id(0);
      ASSERT_ND(!my_block->next_flag_has_successor());
      return kErrorCodeLockCancelled;
    }

    cancel_reader_lock_relink(pred_block, my_block, my_tail_int, pred);
    return kErrorCodeLockCancelled;
  }

  ErrorCode cancel_reader_lock_with_reader_pred(
    McsRwLock* lock, McsRwExtendedBlock* my_block, uint32_t my_tail_int, uint32_t pred) {
  retry:
    ASSERT_ND(!my_block->next_flag_is_granted());
    ASSERT_ND(my_block->next_flag_is_leaving());
    // now a successor can't attach to me assuming I'm waiting, or it has already done so.
    // CAS out of pred.next (including id and flags)
    ASSERT_ND(pred);
    ASSERT_ND(pred >> 16 != adaptor_.get_my_id());
    auto* pred_block = adaptor_.dereference_rw_tail_block(pred);
    // wait for the canceling pred to finish the relink
    spin_until([pred_block, my_tail_int]{
      return pred_block->next_flag_has_reader_successor() &&
        (pred_block->get_next_id() == my_tail_int ||
        pred_block->get_next_id() == McsRwExtendedBlock::kSuccIdNoSuccessor); });

    uint64_t expected = pred_block->make_next_flag_waiting_with_reader_successor() |
      (static_cast<uint64_t>(my_tail_int) << 32);
    // we only want to put SuccessorLeaving in the id field
    uint64_t desired = pred_block->make_next_flag_waiting_with_reader_successor() |
      (static_cast<uint64_t>(McsRwExtendedBlock::kSuccIdSuccessorLeaving) << 32);
    auto val = pred_block->cas_val_next_weak(expected, desired);
    if (val != expected) {
      // Note: we once registered after pred as a reader successor (and still are), so if
      // pred happens to get the lock, it will wake me up seeing its reader_successor set
      auto pred_succ_flag = val & McsRwExtendedBlock::kSuccFlagMask;
      if (pred_succ_flag == McsRwExtendedBlock::kSuccFlagDirectGranted ||
        pred_succ_flag == McsRwExtendedBlock::kSuccFlagLeavingGranted) {
        // pred will, in its finish-acquire-reader(), wake me up.
        // pred should already have me on its next.id and have the reader successor class;
        // right now me.pred.id is 0, blocking pred from waking me up, so just set me.pred.id
        // (the CAS loop in the "acquired" block).
        // this also covers the case when pred.next.flags has busy set.
        ASSERT_ND(pred_block->next_flag_has_reader_successor());
        my_block->set_pred_id(pred);
        my_block->timeout_granted(McsRwExtendedBlock::kTimeoutNever);
        return finish_acquire_reader_lock(lock, my_block, my_tail_int);
      } else {
        ASSERT_ND(
          (val & McsRwExtendedBlock::kSuccFlagMask) == McsRwExtendedBlock::kSuccFlagLeaving);
        // pred is trying to leave; wait for a new pred or to be woken up.
        // pred has higher priority to leave, and it should already have me on its next.id
        my_block->set_pred_id(pred);
        spin_until([my_block, pred]{ return my_block->get_pred_id() != pred; });
        // consume it and retry
        pred = my_block->xchg_pred_id(0);
        if (pred == 0 || pred == McsRwExtendedBlock::kPredIdAcquired) {
          spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
          return finish_acquire_reader_lock(lock, my_block, my_tail_int);
        }
        pred_block = adaptor_.dereference_rw_tail_block(pred);
        ASSERT_ND(!my_block->pred_flag_is_granted());
        ASSERT_ND(pred);
        if (pred_block->is_writer()) {
          return cancel_reader_lock_with_writer_pred(lock, my_block, my_tail_int, pred);
        }
        goto retry;
      }
    } else {
      // at this point pred will be waiting for a new successor if it decides
      // to move, and the successor will be waiting for a new pred
      ASSERT_ND(my_block->next_flag_is_leaving());
      if (!my_block->next_flag_has_successor() && lock->cas_tail_weak(my_tail_int, pred)) {
        // a newly arriving successor for this pred will wait
        // for the SuccessorLeaving mark to go away before trying the CAS
        ASSERT_ND(my_block->get_next_id() == 0);
        ASSERT_ND(my_block->next_flag_is_leaving());
        ASSERT_ND(!my_block->next_flag_has_successor());
        ASSERT_ND(pred_block->get_next_id() == McsRwExtendedBlock::kSuccIdSuccessorLeaving);
        pred_block->set_next_flag_no_successor();
        pred_block->set_next_id(0);
        return kErrorCodeLockCancelled;
      }

      cancel_reader_lock_relink(pred_block, my_block, my_tail_int, pred);
    }
    return kErrorCodeLockCancelled;
  }

  void cancel_reader_lock_relink(
    McsRwExtendedBlock* pred_block,
    McsRwExtendedBlock* my_block,
    uint32_t my_tail_int,
    uint32_t pred) {
    spin_until([my_block]{ return my_block->get_next_id() != 0; });
    ASSERT_ND(my_block->get_next_id() != McsRwExtendedBlock::kSuccIdSuccessorLeaving);
    ASSERT_ND(my_block->next_flag_is_leaving());
    uint32_t next_id = my_block->get_next_id();
    ASSERT_ND(next_id);
    auto* succ_block = adaptor_.dereference_rw_tail_block(next_id);
    ASSERT_ND(pred);

    uint64_t successor = 0;
    if (my_block->next_flag_has_reader_successor()) {
      successor = static_cast<uint64_t>(McsRwExtendedBlock::kSuccFlagSuccessorReader) |
        (static_cast<uint64_t>(next_id) << 32);
    } else if (my_block->next_flag_has_writer_successor()) {
      successor = static_cast<uint64_t>(McsRwExtendedBlock::kSuccFlagSuccessorWriter) |
        (static_cast<uint64_t>(next_id) << 32);
    }
    ASSERT_ND(pred_block->next_flag_has_reader_successor());
    ASSERT_ND(pred_block->get_next_id() == McsRwExtendedBlock::kSuccIdSuccessorLeaving);

    spin_until([pred_block, successor]{
      uint64_t expected = 0, new_next = 0;
      expected = pred_block->get_next();
      new_next = successor | (expected & static_cast<uint64_t>(McsRwExtendedBlock::kSuccFlagMask));
      if (expected & static_cast<uint64_t>(McsRwExtendedBlock::kSuccFlagBusy)) {
        new_next |= static_cast<uint64_t>(McsRwExtendedBlock::kSuccFlagBusy);
      }
      return pred_block->cas_next_weak(expected, new_next);
    });

    // I believe we should do this after setting pred.id; see the comment in cancel_writer_lock.
    spin_until([succ_block, my_tail_int, pred]{
      return succ_block->cas_pred_id_strong(my_tail_int, pred);
    });
  }

  ErrorCode acquire_reader_lock_check_writer_pred(
    McsRwLock* lock,
    McsRwExtendedBlock* my_block,
    uint32_t my_tail_int,
    uint32_t pred,
    int32_t timeout) {
    auto* pred_block = adaptor_.dereference_rw_tail_block(pred);
    ASSERT_ND(pred_block->is_writer());
    // wait for the previous canceling dude to leave
    spin_until([pred_block]{
      return !pred_block->get_next_id() && !pred_block->next_flag_has_successor(); });
    // pred is a writer, we have to wait anyway, so register and wait with timeout
    ASSERT_ND(my_block->get_pred_id() == 0);
    pred_block->set_next_flag_reader_successor();
    pred_block->set_next_id(my_tail_int);
    if (my_block->xchg_pred_id(pred) == McsRwExtendedBlock::kPredIdAcquired) {
      // pred already handed the lock to me while I was linking; wait for the grant flag.
      spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
    }

    if (my_block->timeout_granted(timeout)) {
      return finish_acquire_reader_lock(lock, my_block, my_tail_int);
    }
    if (timeout == McsRwExtendedBlock::kTimeoutZero) {
      return kErrorCodeLockRequested;
    }
    return cancel_reader_lock(lock, my_tail_int);
  }

1329  void release_reader_lock(McsRwLock* lock, McsBlockIndex block_index) {
1330  auto id = adaptor_.get_my_id();
1331  auto my_tail_int = McsRwLock::to_tail_int(id, block_index);
1332  auto* my_block = adaptor_.get_rw_other_block(id, block_index);
1333 #ifndef NDEBUG
1334  ASSERT_ND(!my_block->is_released());
1335 #endif
1336 
1337  // make sure successor can't leave; readers, however, can still get the lock as usual
1338  // by seeing me.next.flags.granted set
1339  ASSERT_ND(my_block->next_flag_is_granted());
1340  my_block->set_next_flag_busy();
1341  spin_until([my_block]{
1342  return my_block->get_next_id() != McsRwExtendedBlock::kSuccIdSuccessorLeaving; });
1343 
1344  uint32_t next_id = my_block->get_next_id();
1345  while (next_id == 0) {
1346  if (lock->cas_tail_weak(my_tail_int, 0)) { // really no one behind me
1347  finish_release_reader_lock(lock);
1348 #ifndef NDEBUG
1349  my_block->mark_released();
1350 #endif
1351  return;
1352  }
1353  next_id = my_block->get_next_id();
1354  }
1355 
1356  ASSERT_ND(next_id);
1358  if (next_id != McsRwExtendedBlock::kSuccIdNoSuccessor) { // already handled successor
1359  ASSERT_ND(my_block->next_flag_has_successor());
1360  if (my_block->next_flag_has_writer_successor()) {
1361  auto* succ_block = adaptor_.dereference_rw_tail_block(next_id);
1362  ASSERT_ND(!succ_block->pred_flag_is_granted());
1363  ASSERT_ND(succ_block->is_writer());
1364  ASSERT_ND(my_block->next_flag_has_writer_successor());
1365  // put it in next_writer
1366  ASSERT_ND(!lock->has_next_writer());
1367  auto next_writer_id = next_id >> 16;
1368  lock->set_next_writer(next_writer_id);
1369  ASSERT_ND(adaptor_.get_rw_other_async_block(next_writer_id, lock));
1370  // also tell successor it doesn't have pred any more
1371  spin_until([succ_block, my_tail_int]{
1372  return succ_block->cas_pred_id_strong(my_tail_int, 0);
1373  });
1374  }
1375  }
1376  finish_release_reader_lock(lock);
1377 #ifndef NDEBUG
1378  my_block->mark_released();
1379 #endif
1380  }
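// Note: release_reader_lock never grants a writer successor directly; it parks the
// writer in lock->next_writer_ and clears the writer's pred.id, because other
// readers may still hold the lock. The grant itself happens in
// finish_release_reader_lock below, once nreaders() drops to zero.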
1381 
1382  void finish_release_reader_lock(McsRwLock* lock) {
1383  if (lock->decrement_nreaders() > 1) {
1384  return;
1385  }
1386  auto next_writer_id = lock->get_next_writer();
1387  ASSERT_ND(next_writer_id != adaptor_.get_my_id());
1388  if (next_writer_id != McsRwLock::kNextWriterNone &&
1389  lock->nreaders() == 0 &&
1390  lock->cas_next_writer_strong(next_writer_id, McsRwLock::kNextWriterNone)) {
1391  auto* wb = adaptor_.get_rw_other_async_block(next_writer_id, lock);
1392  ASSERT_ND(!wb->pred_flag_is_granted());
1393  spin_until([wb]{
1394  return wb->cas_pred_id_strong(0, McsRwExtendedBlock::kPredIdAcquired);
1395  });
1396  ASSERT_ND(lock->nreaders() == 0);
1397  wb->set_pred_flag_granted();
1398  }
1399  }
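// The check sequence above (next_writer registered, nreaders() == 0, then a CAS
// that takes next_writer back off the lock) guarantees that exactly one departing
// reader wakes the waiting writer even when several readers release concurrently.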
1400 
1401  ErrorCode acquire_writer_lock(
1402  McsRwLock* lock, McsBlockIndex* out_block_index, int32_t timeout) {
1403  auto* my_block = init_block(out_block_index, true);
1404  ASSERT_ND(my_block->is_writer());
1405  auto id = adaptor_.get_my_id();
1406  auto my_tail_int = McsRwLock::to_tail_int(id, *out_block_index);
1407  // register on my TLS lock-block_index mapping, must do this before setting pred.next.id or nw
1408  adaptor_.add_rw_async_mapping(lock, *out_block_index);
1409  auto pred = lock->xchg_tail(my_tail_int);
1410  if (pred == 0) {
1411  ASSERT_ND(lock->get_next_writer() == McsRwLock::kNextWriterNone);
1412  lock->set_next_writer(id);
1413  if (lock->nreaders() == 0) {
1414  if (lock->xchg_next_writer(McsRwLock::kNextWriterNone) == id) {
1415  my_block->set_flags_granted();
1416  ASSERT_ND(lock->nreaders() == 0);
1417  ASSERT_ND(lock->get_next_writer() == McsRwLock::kNextWriterNone);
1418  ASSERT_ND(my_block->next_flag_is_granted());
1419  adaptor_.remove_rw_async_mapping(lock);
1420  return kErrorCodeOk;
1421  }
1422  }
1423  } else {
1424  auto* pred_block = adaptor_.dereference_rw_tail_block(pred);
1425  spin_until([pred_block]{
1426  return !pred_block->next_flag_has_successor() && !pred_block->get_next_id(); });
1427  // register on pred.flags as a writer successor, then fill in pred.next.id and wait
1428  // must register on pred.flags first
1429  pred_block->set_next_flag_writer_successor();
1430  pred_block->set_next_id(my_tail_int);
1431  }
1432 
1433  if (my_block->xchg_pred_id(pred) == McsRwExtendedBlock::kPredIdAcquired) {
1434  spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
1435  }
1436 
1437  if (my_block->timeout_granted(timeout)) {
1438  my_block->set_next_flag_granted();
1439  adaptor_.remove_rw_async_mapping(lock);
1440  ASSERT_ND(lock->nreaders() == 0);
1441  ASSERT_ND(lock->get_next_writer() == McsRwLock::kNextWriterNone);
1442  ASSERT_ND(my_block->next_flag_is_granted());
1443  return kErrorCodeOk;
1444  }
1445  if (timeout == McsRwExtendedBlock::kTimeoutZero) {
1446  return kErrorCodeLockRequested;
1447  }
1448  return cancel_writer_lock(lock, my_tail_int);
1449  }
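// Note on the pred == 0 path above: an empty queue does not mean the lock is free,
// since granted readers are tracked only by nreaders(). The writer therefore parks
// itself in next_writer first, and is granted immediately only if nreaders() == 0
// and the xchg confirms it is still the registered next writer.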
1450 
1451  void release_writer_lock(McsRwLock* lock, McsBlockIndex block_index) {
1452  auto id = adaptor_.get_my_id();
1453  auto my_tail_int = McsRwLock::to_tail_int(id, block_index);
1454  auto* my_block = adaptor_.get_rw_my_block(block_index);
1455 
1456  ASSERT_ND(my_block->next_flag_is_granted());
1457  ASSERT_ND(lock->nreaders() == 0);
1458  ASSERT_ND(lock->get_next_writer() == McsRwLock::kNextWriterNone);
1459  ASSERT_ND(my_block->pred_flag_is_granted());
1460  ASSERT_ND(my_block->next_flag_is_granted());
1461  my_block->set_next_flag_busy(); // make sure successor can't leave
1462  spin_until([my_block]{
1463  return my_block->get_next_id() != McsRwExtendedBlock::kSuccIdSuccessorLeaving; });
1464  ASSERT_ND(my_block->pred_flag_is_granted());
1465  ASSERT_ND(my_block->next_flag_is_granted());
1466  ASSERT_ND(my_block->next_flag_is_busy());
1467  ASSERT_ND(lock->nreaders() == 0);
1468 
1469  uint32_t next_id = my_block->get_next_id();
1470  while (next_id == 0) {
1471  assorted::yield_if_valgrind(); // TASK(Hideaki) should use spin_until(), but not trivial
1472  if (lock->cas_tail_weak(my_tail_int, 0)) {
1473  return;
1474  }
1475  next_id = my_block->get_next_id();
1476  }
1477  ASSERT_ND(lock->nreaders() == 0);
1478  ASSERT_ND(my_block->next_flag_has_successor());
1479  ASSERT_ND(next_id);
1481 
1482  auto* succ_block = adaptor_.dereference_rw_tail_block(next_id);
1483  ASSERT_ND(lock->nreaders() == 0);
1484  ASSERT_ND(!succ_block->pred_flag_is_granted());
1485  ASSERT_ND(succ_block->get_pred_id() != McsRwExtendedBlock::kPredIdAcquired);
1486  while (!succ_block->cas_pred_id_strong(my_tail_int, McsRwExtendedBlock::kPredIdAcquired)) {
1487  assorted::yield_if_valgrind(); // TASK(Hideaki) also not trivial because of the assert
1488  ASSERT_ND(my_block->get_next_id() == next_id);
1489  }
1490  if (succ_block->is_reader()) {
1491  lock->increment_nreaders();
1492  }
1493  succ_block->set_pred_flag_granted();
1494  }
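// Handoff order above matters: CAS succ.pred.id from my_tail_int to kPredIdAcquired
// first, so a canceling successor (which xchgs its pred.id away) is detected and we
// retry until it settles; only then bump nreaders() for a reader successor and set
// its pred.flags.granted.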
1495 
1496  ErrorCode cancel_writer_lock(McsRwLock* lock, uint32_t my_tail_int) {
1497  start_cancel:
1498  auto* my_block = adaptor_.dereference_rw_tail_block(my_tail_int);
1499  auto pred = my_block->xchg_pred_id(0);
1500  // if pred is a releasing writer and has already dereferenced my id, it will CAS me.pred.id
1501  // to Acquired, so we do a final check here; there's no way back after this point
1502  // (unless pred is a reader and it's already gone).
1503  // After my xchg, pred will be waiting for me to give it a new successor.
1504  if (pred == McsRwExtendedBlock::kPredIdAcquired) {
1505  spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
1506  my_block->set_next_flag_granted();
1507  adaptor_.remove_rw_async_mapping(lock);
1508  ASSERT_ND(lock->nreaders() == 0);
1509  return kErrorCodeOk;
1510  }
1511 
1512  // "freeze" the successor
1513  my_block->set_next_flag_leaving();
1514  ASSERT_ND(!my_block->next_flag_is_granted());
1515  spin_until([my_block]{
1516  return my_block->get_next_id() != McsRwExtendedBlock::kSuccIdSuccessorLeaving; });
1517 
1518  // if I still have a pred, then deregister from it; if I don't have a pred,
1519  // that means my pred has put me on next_writer, deregister from there and go
1520  // Note that the reader should first reset me.pred.id, then put me on lock.nw
1521  if (pred == 0) {
1522  return cancel_writer_lock_no_pred(lock, my_block, my_tail_int);
1523  }
1524  ASSERT_ND(pred);
1525  auto* pred_block = adaptor_.dereference_rw_tail_block(pred);
1526  while (true) {
1527  // wait for cancelling pred to finish relink, note pred_block is updated
1528  // later in the if block as well
1529  spin_until([pred_block, my_tail_int]{
1530  return pred_block->get_next_id() == my_tail_int &&
1531  pred_block->next_flag_has_writer_successor(); });
1532  // whatever flags value it might have, just not Leaving
1533  uint64_t eflags = pred_block->read_next_flags();
1534  if ((eflags & McsRwExtendedBlock::kSuccFlagMask) == McsRwExtendedBlock::kSuccFlagLeaving) {
1535  ASSERT_ND(my_block->get_pred_id() == 0);
1536  // pred might be cancelling, we won't know if it'll eventually
1537  // get the lock or really cancel. In the former case it won't update my pred;
1538  // in the latter case it will. So just recover me.pred.id and retry (can't reset
1539  // next.flags to Waiting - that will confuse our successor).
1540  my_block->set_pred_id(pred);
1541  goto start_cancel;
1542  } else if (eflags & static_cast<uint64_t>(McsRwExtendedBlock::kSuccFlagBusy)) {
1543  // pred is perhaps releasing (writer)? me.pred.id is 0, pred can do nothing about me,
1544  // so it's safe to dereference
1545  if (pred_block->is_writer()) {
1546  ASSERT_ND(pred_block->get_next_id() == my_tail_int);
1547  my_block->set_pred_id(pred);
1548  spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
1549  ASSERT_ND(my_block->get_pred_id() == McsRwExtendedBlock::kPredIdAcquired);
1550  my_block->set_next_flag_granted();
1551  adaptor_.remove_rw_async_mapping(lock);
1552  ASSERT_ND(lock->nreaders() == 0);
1553  return kErrorCodeOk;
1554  }
1555  ASSERT_ND(pred_block->is_reader());
1556  my_block->set_pred_id(pred);
1557  pred = my_block->xchg_pred_id(0);
1558  if (pred == 0) {
1559  return cancel_writer_lock_no_pred(lock, my_block, my_tail_int);
1560  } else if (pred == McsRwExtendedBlock::kPredIdAcquired) {
1561  spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
1562  my_block->set_next_flag_granted();
1563  adaptor_.remove_rw_async_mapping(lock);
1564  ASSERT_ND(lock->nreaders() == 0);
1565  return kErrorCodeOk;
1566  }
1567  pred_block = adaptor_.dereference_rw_tail_block(pred);
1568  continue; // retry if it's a reader
1569  }
1570  ASSERT_ND(pred_block->get_next_id() == my_tail_int);
1571  uint64_t desired = eflags |
1572  (static_cast<uint64_t>(McsRwExtendedBlock::kSuccIdSuccessorLeaving) << 32);
1573  uint64_t expected = eflags | (static_cast<uint64_t>(my_tail_int) << 32);
1574  ASSERT_ND(
1575  (expected & McsRwExtendedBlock::kSuccFlagMask) != McsRwExtendedBlock::kSuccFlagLeaving);
1576  auto val = pred_block->cas_val_next_weak(expected, desired);
1577  if (val == expected) {
1578  ASSERT_ND(pred_block->get_next_id() == McsRwExtendedBlock::kSuccIdSuccessorLeaving);
1579  break;
1580  }
1581  }
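// From here on pred.next.id reads kSuccIdSuccessorLeaving, i.e. pred is frozen: it
// can neither grant to nor dereference us, so we can safely splice the lock tail or
// our own successor over to pred below.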
1582 
1583  ASSERT_ND(pred_block->get_next_id() == McsRwExtendedBlock::kSuccIdSuccessorLeaving);
1584  if (!my_block->get_next_id() && lock->cas_tail_weak(my_tail_int, pred)) {
1585  pred_block->set_next_flag_no_successor();
1586  pred_block->set_next_id(0);
1587  adaptor_.remove_rw_async_mapping(lock);
1588  return kErrorCodeLockCancelled;
1589  }
1590  spin_until([my_block]{ return my_block->get_next_id() != 0; });
1591  ASSERT_ND(my_block->get_next_id() != McsRwExtendedBlock::kSuccIdSuccessorLeaving);
1592  ASSERT_ND(my_block->next_flag_is_leaving());
1593  uint32_t new_next_id = my_block->get_next_id();
1594  ASSERT_ND(new_next_id);
1596  auto* succ_block = adaptor_.dereference_rw_tail_block(new_next_id);
1597 
1598  uint64_t successor = 0;
1599  if (my_block->next_flag_has_reader_successor()) {
1600  successor = static_cast<uint64_t>(McsRwExtendedBlock::kSuccFlagSuccessorReader) |
1601  (static_cast<uint64_t>(new_next_id) << 32);
1602  } else if (my_block->next_flag_has_writer_successor()) {
1603  successor = static_cast<uint64_t>(McsRwExtendedBlock::kSuccFlagSuccessorWriter) |
1604  (static_cast<uint64_t>(new_next_id) << 32);
1605  }
1606  ASSERT_ND(pred_block->next_flag_has_writer_successor());
1607  ASSERT_ND(pred_block->get_next_id() == McsRwExtendedBlock::kSuccIdSuccessorLeaving);
1608 
1609  retry:
1610  // preserve pred.flags
1611  uint64_t expected = 0, new_next = 0;
1612  expected = pred_block->get_next();
1614  bool wakeup = false;
1615 
1616  if (pred_block->is_reader() && succ_block->is_reader()) {
1617  uint32_t flags = expected & McsRwExtendedBlock::kSuccFlagMask;
1618  if (flags == McsRwExtendedBlock::kSuccFlagDirectGranted ||
1619  flags == McsRwExtendedBlock::kSuccFlagLeavingGranted) {
1620  // There is a time window which starts after the pred finished its "acquired" block
1621  // and ends before it releases. During this period my relink is essentially invisible
1622  // to pred. So we try to wake up the successor if this is the case.
1623  successor = static_cast<uint64_t>(McsRwExtendedBlock::kSuccFlagSuccessorReader) |
1624  (static_cast<uint64_t>(McsRwExtendedBlock::kSuccIdNoSuccessor) << 32);
1625  wakeup = true;
1626  }
1627  }
1628 
1629  new_next = (successor | static_cast<uint64_t>(expected & McsRwExtendedBlock::kSuccFlagMask));
1630  if (expected & McsRwExtendedBlock::kSuccFlagBusy) {
1631  new_next |= static_cast<uint64_t>(McsRwExtendedBlock::kSuccFlagBusy);
1632  }
1633  if (!pred_block->cas_next_weak(expected, new_next)) {
1634  goto retry;
1635  }
1636 
1637  // Now we need to wake up the successor if needed and set succ.pred.id - must set succ.pred.id
1638  // after setting pred.next.id: if we need to wake up successor, we need to also set pred.next.id
1639  // to NoSuccessor, which makes it not safe for succ to spin on pred.next.id to wait for me
1640  // finishing this relink (pred might disappear any time because its next.id is NoSuccessor).
1641  if (wakeup) {
1642  ASSERT_ND(succ_block->pred_flag_is_waiting());
1643  lock->increment_nreaders();
1644  succ_block->set_pred_flag_granted();
1645  spin_until([succ_block, my_tail_int]{
1646  return succ_block->cas_pred_id_strong(my_tail_int, McsRwExtendedBlock::kPredIdAcquired);
1647  });
1648  } else {
1649  spin_until([succ_block, my_tail_int, pred]{
1650  return succ_block->cas_pred_id_strong(my_tail_int, pred);
1651  });
1652  }
1653  adaptor_.remove_rw_async_mapping(lock);
1654  return kErrorCodeLockCancelled;
1655  }
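// Note on return values: kErrorCodeLockCancelled means this block was unlinked from
// the queue; kErrorCodeOk means the grant raced ahead of the cancel and the caller
// now holds the lock. Users of the async API must handle both outcomes.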
1656 
1657  ErrorCode cancel_writer_lock_no_pred(
1658  McsRwLock* lock, McsRwExtendedBlock* my_block, uint32_t my_tail_int) {
1659  spin_until([lock, my_block]{
1660  return lock->get_next_writer() != xct::McsRwLock::kNextWriterNone ||
1661  !my_block->pred_flag_is_waiting(); });
1662  if (my_block->pred_flag_is_granted() ||
1663  !lock->cas_next_writer_strong(adaptor_.get_my_id(), xct::McsRwLock::kNextWriterNone)) {
1664  // reader picked me up...
1665  spin_until([my_block]{ return my_block->pred_flag_is_granted(); });
1666  my_block->set_next_flag_granted();
1667  adaptor_.remove_rw_async_mapping(lock);
1668  return kErrorCodeOk;
1669  }
1670 
1671  // so lock.next_writer is null now, try to fix the lock tail
1672  if (my_block->get_next_id() == 0 && lock->cas_tail_weak(my_tail_int, 0)) {
1673  adaptor_.remove_rw_async_mapping(lock);
1674  return kErrorCodeLockCancelled;
1675  }
1676 
1677  spin_until([my_block]{ return my_block->get_next_id() != 0; });
1678  auto next_id = my_block->get_next_id();
1680 
1681  // because I don't have a pred, if next_id is a writer, I should put it in lock.nw
1682  auto* succ_block = adaptor_.dereference_rw_tail_block(next_id);
1683  ASSERT_ND(succ_block->pred_flag_is_waiting());
1684  if (succ_block->is_writer()) {
1685  ASSERT_ND(my_block->next_flag_has_writer_successor());
1686  ASSERT_ND(lock->get_next_writer() == xct::McsRwLock::kNextWriterNone);
1687  // remaining readers will use CAS on lock.nw, so we blind write
1688  lock->set_next_writer(next_id >> 16); // thread id only
1689  spin_until([succ_block, my_tail_int]{
1690  return succ_block->cas_pred_id_strong(my_tail_int, 0);
1691  });
1692  if (lock->nreaders() == 0) {
1693  if (lock->cas_next_writer_strong(next_id >> 16, McsRwLock::kNextWriterNone)) {
1694  // ok, I'm so nice, cancelled myself and woke up a successor
1695  spin_until([succ_block]{
1696  return succ_block->cas_pred_id_strong(0, McsRwExtendedBlock::kPredIdAcquired);
1697  });
1698  succ_block->set_pred_flag_granted();
1699  }
1700  }
1701  } else {
1702  // successor is a reader, lucky for it...
1703  ASSERT_ND(my_block->next_flag_has_reader_successor());
1704  ASSERT_ND(succ_block->is_reader());
1705  spin_until([succ_block, my_tail_int]{
1706  return succ_block->cas_pred_id_strong(my_tail_int, McsRwExtendedBlock::kPredIdAcquired); });
1707  lock->increment_nreaders();
1708  succ_block->set_pred_flag_granted();
1709  }
1710  adaptor_.remove_rw_async_mapping(lock);
1711  return kErrorCodeLockCancelled;
1712  }
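// cancel_writer_lock_no_pred mirrors the successor handling in release_writer_lock,
// except that a writer successor is parked in lock->next_writer_ rather than granted
// directly, because readers may still hold (or be acquiring) the lock.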
1713 
1714  ADAPTOR adaptor_;
1715 }; // end of McsImpl<ADAPTOR, McsRwExtendedBlock> specialization
1716 
1717 
1718 ////////////////////////////////////////////////////////////////////////////////
1719 ///
1720 ///  Explicit instantiations of the MCS lock implementation classes.
1721 ///
1722 template class McsWwImpl< McsMockAdaptor<McsRwSimpleBlock> >;
1723 template class McsWwImpl< thread::ThreadPimplMcsAdaptor<McsRwSimpleBlock> >;
1724 template class McsWwImpl< McsMockAdaptor<McsRwExtendedBlock> >;
1725 template class McsWwImpl< thread::ThreadPimplMcsAdaptor<McsRwExtendedBlock> >;
1726 
1727 template class McsImpl< McsMockAdaptor<McsRwSimpleBlock> , McsRwSimpleBlock>;
1728 template class McsImpl< McsMockAdaptor<McsRwExtendedBlock> , McsRwExtendedBlock>;
1729 template class McsImpl< thread::ThreadPimplMcsAdaptor<McsRwSimpleBlock> , McsRwSimpleBlock>;
1730 template class McsImpl< thread::ThreadPimplMcsAdaptor<McsRwExtendedBlock> , McsRwExtendedBlock>;
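// A minimal usage sketch of the instantiated classes (hypothetical driver code; how
// the ADAPTOR is constructed depends on McsMockAdaptor / ThreadPimplMcsAdaptor
// internals, which live outside this file):
//
//   McsRwLock lock;
//   McsImpl<McsMockAdaptor<McsRwExtendedBlock>, McsRwExtendedBlock> impl(/* adaptor */);
//   McsBlockIndex block = impl.acquire_unconditional_rw_writer(&lock);
//   // ... exclusive critical section ...
//   impl.release_rw_writer(&lock, block);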
1731 } // namespace xct
1732 } // namespace foedus