libfoedus-core
FOEDUS Core Library
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
xct_mcs_adapter_impl.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #ifndef FOEDUS_XCT_XCT_MCS_ADAPTER_IMPL_HPP_
19 #define FOEDUS_XCT_XCT_MCS_ADAPTER_IMPL_HPP_
20 
21 #include <atomic>
22 #include <memory>
23 #include <vector>
24 
25 #include "foedus/assert_nd.hpp"
26 #include "foedus/compiler.hpp"
29 #include "foedus/storage/page.hpp"
32 #include "foedus/xct/xct_id.hpp"
33 
34 namespace foedus {
35 namespace xct {
36 
// ---------------------------------------------------------------------------
// NOTE(review): this span is a Doxygen source-listing scrape; the numbers at
// the start of each line are the original source line numbers, and gaps in
// them (e.g. 57, 62-73) are lines dropped by the extraction, including the
// class declaration itself.  Code below is left byte-identical.
//
// McsAdaptorConcept<RW_BLOCK>: documents the adapter interface the MCS lock
// implementations are written against.  It is interface-documentation only:
// the deleted constructor/destructor below make it impossible to instantiate.
// ---------------------------------------------------------------------------
56 template<typename RW_BLOCK>
58  public:
  // The pre-allocated block type used for reader-writer MCS locks.
60  typedef RW_BLOCK ThisRwBlock;
61 
  // Cancels the most recent issue_new_block() call, decrementing the
  // per-thread block counter (per the member list elsewhere on this page).
74  void cancel_new_block(McsBlockIndex the_block);
75 
78 
83 
  // Atomic flag: is the current thread waiting on some lock?
85  std::atomic<bool>* me_waiting();
86 
  // Same flag, but for another thread identified by its global ThreadId.
88  std::atomic<bool>* other_waiting(thread::ThreadId id);
89 
  // Dereferences one of this thread's pre-allocated RW blocks by index.
93  RW_BLOCK* get_rw_my_block(McsBlockIndex index);
94 
  // Dereferences another thread's RW block; block_int packs the thread id and
  // the block index into one 32-bit value (see McsMockAdaptor's
  // get_rw_other_block(uint32_t) below: high 16 bits = id, low 16 = index).
100  RW_BLOCK* get_rw_other_block(uint32_t block_int);
101 
  // Same as above, but the combined int comes from an McsRwLock tail word.
103  RW_BLOCK* dereference_rw_tail_block(uint32_t tail_int);
104 
108 
  // Records a lock -> block-index mapping for asynchronous RW acquisition.
109  void add_rw_async_mapping(xct::McsRwLock* lock, xct::McsBlockIndex block_index);
110 
111  private:
  // Never constructed nor destroyed: this type exists only as a "concept".
112  McsAdaptorConcept() = delete;
113  ~McsAdaptorConcept() = delete;
114 };
115 
// McsMockThread<RW_BLOCK>: per-thread state of the mock adaptor (one entry
// per emulated core).  NOTE(review): the class declaration (original line
// 129) and the move-constructor signature (original line 131,
// McsMockThread(McsMockThread&& rhs) per the member list) were dropped by
// the scrape; code below is byte-identical to what survived.
128 template<typename RW_BLOCK>
130  McsMockThread() = default;
  // Body of the move constructor (signature line missing above).
132  // heh, std::vector::resize() demands a move constructor.
133  mcs_ww_blocks_ = std::move(rhs.mcs_ww_blocks_);
134  mcs_rw_blocks_ = std::move(rhs.mcs_rw_blocks_);
135  mcs_rw_async_mappings_ = std::move(rhs.mcs_rw_async_mappings_);
136  mcs_block_current_ = rhs.mcs_block_current_;
  // std::atomic is neither copyable nor movable, so the flag must be
  // transferred via load/store -- hence "NOT a move" below.
137  mcs_waiting_.store(rhs.mcs_waiting_.load()); // mainly due to this guy, this is NOT a move
138  }
  // Pre-sizes every per-thread block/mapping array and resets counters.
139  void init(uint32_t max_block_count) {
140  mcs_ww_blocks_.resize(max_block_count);
141  mcs_rw_blocks_.resize(max_block_count);
142  mcs_rw_async_mappings_.resize(max_block_count);
144  mcs_block_current_ = 0;
145  mcs_waiting_ = false;
146  }
  // Body of get_mcs_rw_async_block_index(resolver, lock) (first signature
  // line, original 147, missing): linearly scans the async-mapping table for
  // the block registered for 'lock'; returns 0 (never a valid block index,
  // see McsMockAdaptor::issue_new_block) when not found.
148  const memory::GlobalVolatilePageResolver& resolver,
149  xct::McsRwLock* lock) {
150  auto ulockid = xct::rw_lock_to_universal_lock_id(resolver, lock);
151  for (auto &m : mcs_rw_async_mappings_) {
152  if (m.lock_id_ == ulockid) {
153  return m.block_index_;
154  }
155  }
156  return 0;
157  }
158 
159  std::vector<McsWwBlock> mcs_ww_blocks_;
160  std::vector< RW_BLOCK > mcs_rw_blocks_;
161  std::vector<xct::McsRwAsyncMapping> mcs_rw_async_mappings_;
  // NOTE(review): declarations of mcs_block_current_ and
  // mcs_rw_async_mapping_current_ (original lines 162-163) are missing from
  // this scrape; both are used by McsMockAdaptor below.
164  std::atomic<bool> mcs_waiting_;
165  // add more if we need more context
166 };
167 
// Layout constants for McsMockDataPage.
// NOTE(review): the scrape dropped original lines 172 and 175-176, so the
// numerator of kMcsMockDataPageLocksPerPage and the value of
// kMcsMockDataPageFiller are incomplete here; code kept byte-identical.
168 constexpr uint32_t kMcsMockDataPageHeaderSize = 128U;
  // Padding that rounds the page-header region up to the 128-byte header size.
169 constexpr uint32_t kMcsMockDataPageHeaderPad
170  = kMcsMockDataPageHeaderSize - sizeof(storage::PageHeader);
  // How many (RwLockableXctId, McsWwLock) pairs fit in one mock data page.
171 constexpr uint32_t kMcsMockDataPageLocksPerPage
173  / (sizeof(RwLockableXctId) + sizeof(McsWwLock));
174 constexpr uint32_t kMcsMockDataPageFiller
178 
  // Initializes one mock data page: sets its volatile page id from
  // (node_id, in_node_index), initializes the header as an array-type
  // volatile page, and resets every lock pair stored in the page.
  // NOTE(review): the enclosing struct declaration (McsMockDataPage, original
  // line 184), the local 'page_id' declaration (original line 186), and the
  // member declarations after the method (original lines 194-198; the member
  // list shows header_pad_, tid_[], ww_[], filler_) are missing from this
  // scrape.  Code below is byte-identical to what survived.
185  void init(storage::StorageId dummy_storage_id, uint16_t node_id, uint32_t in_node_index) {
187  page_id.set(node_id, in_node_index);
188  header_.init_volatile(page_id, dummy_storage_id, storage::kArrayPageType);
  // Reset every (tid, ww-lock) pair; only used during page initialization.
189  for (uint32_t i = 0; i < kMcsMockDataPageLocksPerPage; ++i) {
190  tid_[i].reset();
191  ww_[i].reset();
192  }
193  }
199 };
200 
// Analogous to one thread-group/socket/NUMA node: owns the mock threads of
// the node and the page memory holding the locks assigned to it.
205 template<typename RW_BLOCK>
206 struct McsMockNode {
207  void init(
208  storage::StorageId dummy_storage_id,
209  uint16_t node_id,
210  uint32_t threads_per_node,
211  uint32_t max_block_count,
212  uint32_t pages_per_node) {
213  node_id_ = node_id;
214  max_block_count_ = max_block_count;
  // Initialize each mock thread's per-thread block arrays.
215  threads_.resize(threads_per_node);
216  for (uint32_t t = 0; t < threads_per_node; ++t) {
217  threads_[t].init(max_block_count);
218  }
219 
  // NOTE(review): the first line of this call (original line 220, presumably
  // page_memory_.alloc_onnode( -- the member list shows
  // alloc_onnode(uint64_t size, uint64_t alignment, int numa_node)) was
  // dropped by the scrape.  Arguments: total size, per-page alignment, and
  // the NUMA node to allocate on.
221  sizeof(McsMockDataPage) * pages_per_node,
222  sizeof(McsMockDataPage),
223  node_id);
225  pages_ = reinterpret_cast<McsMockDataPage*>(page_memory_.get_block());
226  for (uint32_t i = 0; i < pages_per_node; ++i) {
227  pages_[i].init(dummy_storage_id, node_id, i);
228  }
229  }
230 
231  uint16_t node_id_;
  // NOTE(review): declarations of max_block_count_, page_memory_ and pages_
  // (original lines 232 and 235-239 per the member list) are missing from
  // this scrape; all are assigned/used in init() above.
233  std::vector< McsMockThread<RW_BLOCK> > threads_;
234 
240 };
241 
// McsMockContext<RW_BLOCK>: analogous to the entire engine -- owns all mock
// nodes and the resolver over their page memory.
// NOTE(review): the class declaration (original line 247) and several
// statements inside init() (original lines 257, 259-262, presumably the
// pages_per_node_ computation and resolver setup) were dropped by the
// scrape.  Code below is byte-identical to what survived.
246 template<typename RW_BLOCK>
248  void init(
249  storage::StorageId dummy_storage_id,
250  uint32_t nodes,
251  uint32_t threads_per_node,
252  uint32_t max_block_count,
253  uint32_t max_lock_count) {
254  max_block_count_ = max_block_count;
255  max_lock_count_ = max_lock_count;
  // The comment below belongs to the missing pages_per_node_ computation
  // (original line 257).
256  // + 1U for index-0 (which is not used), and +1U for ceiling
258  nodes_.resize(nodes);
263  for (uint32_t n = 0; n < nodes; ++n) {
264  nodes_[n].init(dummy_storage_id, n, threads_per_node, max_block_count, pages_per_node_);
  // Register each node's page array as a base address in the resolver so
  // that lock pointers can be translated to universal lock ids.
265  page_memory_resolver_.bases_[n] = reinterpret_cast<storage::Page*>(nodes_[n].pages_);
266  }
267  }
268 
  // Maps (node, lock ordinal) to the address of its RwLockableXctId.
269  RwLockableXctId* get_rw_lock_address(uint16_t node_id, uint64_t lock_index) {
270  ASSERT_ND(lock_index < max_lock_count_);
271  ASSERT_ND(node_id < nodes_.size());
  // +1U: page index 0 is skipped -- consistent with the "+1U for index-0
  // (which is not used)" comment in init() above.
272  const uint64_t page_index = lock_index / kMcsMockDataPageLocksPerPage + 1U;
  // NOTE(review): stray second ';' below is a harmless empty statement.
273  const uint64_t lock_in_page_index = lock_index % kMcsMockDataPageLocksPerPage;;
274  ASSERT_ND(page_index < pages_per_node_);
275  McsMockDataPage* page = nodes_[node_id].pages_ + page_index;
276  return page->tid_ + lock_in_page_index;
277  }
  // Same mapping as above, but returns the WW (exclusive) lock address.
278  McsWwLock* get_ww_lock_address(uint16_t node_id, uint64_t lock_index) {
279  ASSERT_ND(lock_index < max_lock_count_);
280  ASSERT_ND(node_id < nodes_.size());
281  const uint64_t page_index = lock_index / kMcsMockDataPageLocksPerPage + 1U;
282  const uint64_t lock_in_page_index = lock_index % kMcsMockDataPageLocksPerPage;;
283  ASSERT_ND(page_index < pages_per_node_);
284  McsMockDataPage* page = nodes_[node_id].pages_ + page_index;
285  return page->ww_ + lock_in_page_index;
286  }
287 
  // NOTE(review): the max_block_count_ declaration (original line 288) and
  // the page_memory_resolver_ declaration (original lines 292-297 per the
  // member list) are missing from this scrape; both are used above.
289  uint32_t max_lock_count_;
290  uint32_t pages_per_node_;
291  std::vector< McsMockNode<RW_BLOCK> > nodes_;
298 };
299 
// McsMockAdaptor<RW_BLOCK>: implements McsAdaptorConcept on top of
// McsMockContext for testing, without a real engine.
// NOTE(review): the scrape dropped the class declaration (original line 305)
// and a number of method-signature / local-variable lines; each dropped
// signature is reconstructed in a comment below from the member list
// elsewhere on this page.  Surviving code is byte-identical.
304 template<typename RW_BLOCK>
306  public:
307  typedef RW_BLOCK ThisRwBlock;
308 
  // McsMockAdaptor(thread::ThreadId id, McsMockContext<RW_BLOCK>* context)
  // (signature line 309 missing): caches the id decomposition and a direct
  // pointer to this thread's McsMockThread.
310  : id_(id),
311  numa_node_(thread::decompose_numa_node(id)),
312  local_ordinal_(thread::decompose_numa_local_ordinal(id)),
313  context_(context),
314  me_(context->nodes_[numa_node_].threads_.data() + local_ordinal_) {}
316 
  // McsBlockIndex issue_new_block() (signature line 317 missing): hands out
  // the next pre-allocated block.  Pre-increment means index 0 is never
  // issued, so 0 can serve as "no block".
318  // if this assertion fires, probably we are retrying something too many times
319  ASSERT_ND(me_->mcs_block_current_ < 0xFFFFU);
320  return ++me_->mcs_block_current_;
321  }
  // Undoes the most recent issue_new_block(); valid only for the newest block.
322  void cancel_new_block(McsBlockIndex the_block) {
323  ASSERT_ND(me_->mcs_block_current_ == the_block);
324  --me_->mcs_block_current_;
325  }
326  McsBlockIndex get_cur_block() const { return me_->mcs_block_current_; }
327  thread::ThreadId get_my_id() const { return id_; }
328  thread::ThreadGroupId get_my_numa_node() const { return numa_node_; }
329  std::atomic<bool>* me_waiting() { return &me_->mcs_waiting_; }
330 
  // McsWwBlock* get_ww_my_block(McsBlockIndex index) (signature line 331
  // missing): dereferences one of this thread's WW blocks.
332  ASSERT_ND(index <= me_->mcs_block_current_);
333  return me_->mcs_ww_blocks_.data() + index;
334  }
335  RW_BLOCK* get_rw_my_block(McsBlockIndex index) {
336  ASSERT_ND(index <= me_->mcs_block_current_);
337  return me_->mcs_rw_blocks_.data() + index;
338  }
339 
  // McsMockThread<RW_BLOCK>* get_other_thread(thread::ThreadId id)
  // (signature and the node/ordinal decomposition lines, original 340-343,
  // partially missing): looks up another thread's state by global id.
342  ASSERT_ND(node < context_->nodes_.size());
344  ASSERT_ND(ordinal < context_->nodes_[node].threads_.size());
345  return context_->nodes_[node].threads_.data() + ordinal;
346  }
347  std::atomic<bool>* other_waiting(thread::ThreadId id) {
  // NOTE(review): the line obtaining 'other' (original line 348, presumably
  // via get_other_thread(id)) is missing from this scrape.
349  return &(other->mcs_waiting_);
350  }
  // McsBlockIndex get_other_cur_block(thread::ThreadId id) (signature and
  // 'other' lookup lines, original 351-352, missing).
353  return other->mcs_block_current_;
354  }
  // McsWwBlock* get_ww_other_block(thread::ThreadId id, McsBlockIndex index)
  // (signature and 'other' lookup lines, original 355-356, missing).
357  ASSERT_ND(index <= other->mcs_block_current_);
358  return other->mcs_ww_blocks_.data() + index;
359  }
  // RW_BLOCK* get_rw_other_block(thread::ThreadId id, McsBlockIndex index)
  // (signature and 'other' lookup lines, original 360-361, missing).
362  ASSERT_ND(index <= other->mcs_block_current_);
363  return other->mcs_rw_blocks_.data() + index;
364  }
  // Same as above, but id and index are packed into one uint32_t:
  // high 16 bits = thread id, low 16 bits = block index.
365  RW_BLOCK* get_rw_other_block(uint32_t block_int) {
366  McsMockThread<RW_BLOCK>* other = get_other_thread(block_int >> 16);
367  McsBlockIndex index = block_int & 0xFFFFU;
368  ASSERT_ND(index <= other->mcs_block_current_);
369  return other->mcs_rw_blocks_.data() + index;
370  }
  // Unpacks an McsRwLock tail word into (waiter id, waiter block) via a
  // temporary lock object, then resolves the block address.
371  RW_BLOCK* dereference_rw_tail_block(uint32_t tail_int) {
372  McsRwLock tail_tmp;
373  tail_tmp.tail_ = tail_int;
374  uint32_t tail_id = tail_tmp.get_tail_waiter();
375  uint32_t tail_block = tail_tmp.get_tail_waiter_block();
376  return get_rw_other_block(tail_id, tail_block);
377  }
  // McsRwExtendedBlock* get_rw_other_async_block(thread::ThreadId id,
  // xct::McsRwLock* lock) (signature and 'other' lookup lines, original
  // 378-379, missing): resolves the block another thread registered for an
  // asynchronous extended-RW acquisition of 'lock'.
380  McsBlockIndex block
381  = other->get_mcs_rw_async_block_index(context_->page_memory_resolver_, lock);
382  ASSERT_ND(block);
383  return get_rw_other_block(id, block);
384  }
  // void add_rw_async_mapping(xct::McsRwLock* lock, McsBlockIndex
  // block_index) (signature line 385 missing): appends a lock -> block
  // mapping to this thread's table; debug builds assert the lock is not
  // already mapped.
386  ASSERT_ND(lock);
387  ASSERT_ND(block_index);
388  auto ulockid = xct::rw_lock_to_universal_lock_id(context_->page_memory_resolver_, lock);
389 #ifndef NDEBUG
390  for (uint32_t i = 0; i < me_->mcs_rw_async_mapping_current_; ++i) {
391  if (me_->mcs_rw_async_mappings_[i].lock_id_ == ulockid) {
392  ASSERT_ND(false);
393  }
394  }
395 #endif
396  // TLS, no concurrency control needed
397  auto index = me_->mcs_rw_async_mapping_current_++;
398  ASSERT_ND(me_->mcs_rw_async_mappings_[index].lock_id_ == kNullUniversalLockId);
399  me_->mcs_rw_async_mappings_[index].lock_id_ = ulockid;
400  me_->mcs_rw_async_mappings_[index].block_index_ = block_index;
401  }
  // void remove_rw_async_mapping(xct::McsRwLock* lock) (signature line 402
  // missing): clears the first matching entry and decrements the count.
  // NOTE(review): only lock_id_ is nulled, so non-LIFO removal would leave a
  // hole inside the scanned range -- presumably removals are LIFO; confirm
  // against callers.
403  ASSERT_ND(me_->mcs_rw_async_mapping_current_);
404  auto lock_id = xct::rw_lock_to_universal_lock_id(context_->page_memory_resolver_, lock);
405  for (uint32_t i = 0; i < me_->mcs_rw_async_mapping_current_; ++i) {
406  if (me_->mcs_rw_async_mappings_[i].lock_id_ == lock_id) {
407  me_->mcs_rw_async_mappings_[i].lock_id_ = kNullUniversalLockId;
408  --me_->mcs_rw_async_mapping_current_;
409  return;
410  }
411  }
412  ASSERT_ND(false);
413  }
414 
415  private:
416  const thread::ThreadId id_;
417  const thread::ThreadGroupId numa_node_;
418  const thread::ThreadLocalOrdinal local_ordinal_;
419  McsMockContext<RW_BLOCK>* const context_;
420  McsMockThread<RW_BLOCK>* const me_;
421 };
422 
// Compile-time check that the mock data page is exactly one FOEDUS page.
423 static_assert(sizeof(McsMockDataPage) == storage::kPageSize, "McsMockDataPage not in kPageSize?");
424 
425 } // namespace xct
426 } // namespace foedus
427 #endif // FOEDUS_XCT_XCT_MCS_ADAPTER_IMPL_HPP_
void reset() __attribute__((always_inline))
used only while page initialization
Definition: xct_id.hpp:355
std::vector< RW_BLOCK > mcs_rw_blocks_
void remove_rw_async_mapping(xct::McsRwLock *lock)
char header_pad_[kMcsMockDataPageHeaderPad]
uint8_t ThreadLocalOrdinal
Typedef for a local ID of Thread (core), which is NOT unique across NUMA nodes.
Definition: thread_id.hpp:58
RW_BLOCK * dereference_rw_tail_block(uint32_t tail_int)
same as above, but receives the thread id and block index as one combined int. For McsRwLock.
Definitions of IDs in this package and a few related constant values.
void cancel_new_block(McsBlockIndex the_block)
Cancels the most recent issue_new_block() call, decrementing the counter.
memory::AlignedMemory page_memory_
RW_BLOCK ThisRwBlock
ThisRwBlock shall indicate the block type.
RW_BLOCK * get_rw_other_block(thread::ThreadId id, McsBlockIndex index)
Dereference other thread's block index for reader-writer locks.
memory::GlobalVolatilePageResolver page_memory_resolver_
All locks managed by this objects are placed in these memory regions.
uint32_t StorageId
Unique ID for storage.
Definition: storage_id.hpp:55
void init_volatile(VolatilePagePointer page_id, StorageId storage_id, PageType page_type) __attribute__((always_inline))
Definition: page.hpp:284
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
std::atomic< bool > * me_waiting()
void init(storage::StorageId dummy_storage_id, uint16_t node_id, uint32_t in_node_index)
Analogous to one thread-group/socket/node.
std::atomic< bool > * other_waiting(thread::ThreadId id)
Typedefs of ID types used in thread package.
std::vector< McsWwBlock > mcs_ww_blocks_
uint16_t numa_node_count_
number of NUMA nodes in this engine.
McsBlockIndex get_other_cur_block(thread::ThreadId id)
McsRwExtendedBlock * get_rw_other_async_block(thread::ThreadId id, xct::McsRwLock *lock)
Dereference other thread's block index for extended rwlock.
Defines an adapter template interface for our MCS lock classes.
ThreadLocalOrdinal decompose_numa_local_ordinal(ThreadId global_id)
Extracts local ordinal from the given globally unique ID of Thread (core).
Definition: thread_id.hpp:139
Represents a pointer to a volatile page with modification count for preventing ABA.
Definition: storage_id.hpp:194
RW_BLOCK * get_rw_other_block(uint32_t block_int)
PagePoolOffset begin_
where a valid page entry starts.
constexpr uint32_t kMcsMockDataPageLocksPerPage
constexpr uint32_t kMcsMockDataPageHeaderPad
Implements McsAdaptorConcept.
void add_rw_async_mapping(xct::McsRwLock *lock, xct::McsBlockIndex block_index)
std::atomic< bool > * other_waiting(thread::ThreadId id)
Returns the bool var on whether other thread is waiting for some lock.
PagePoolOffset end_
where a valid page entry ends.
McsWwBlock * get_ww_other_block(thread::ThreadId id, McsBlockIndex index)
void init(storage::StorageId dummy_storage_id, uint16_t node_id, uint32_t threads_per_node, uint32_t max_block_count, uint32_t pages_per_node)
McsWwBlock * get_ww_my_block(McsBlockIndex index)
Dereference my block index for exclusive locks.
McsBlockIndex get_tail_waiter_block() const
Definition: xct_id.hpp:817
void cancel_new_block(McsBlockIndex the_block)
UniversalLockId rw_lock_to_universal_lock_id(const memory::GlobalVolatilePageResolver &resolver, McsRwLock *lock)
Definition: xct_id.hpp:1231
McsBlockIndex get_cur_block() const
void add_rw_async_mapping(xct::McsRwLock *lock, xct::McsBlockIndex block_index)
The MCS reader-writer lock variant of LockableXctId.
Definition: xct_id.hpp:1132
std::vector< xct::McsRwAsyncMapping > mcs_rw_async_mappings_
char filler_[kMcsMockDataPageFiller]
thread::ThreadGroupId get_my_numa_node() const
Returns group-Id of this thread.
McsMockThread< RW_BLOCK > * get_other_thread(thread::ThreadId id)
Pre-allocated MCS block for WW-locks.
Definition: xct_id.hpp:274
Definitions of IDs in this package and a few related constant values.
An exclusive-only (WW) MCS lock data structure.
Definition: xct_id.hpp:324
Pre-allocated MCS block for extended version of RW-locks.
Definition: xct_id.hpp:513
RW_BLOCK * get_rw_my_block(McsBlockIndex index)
Dereference my block index for reader-writer locks.
void reset() __attribute__((always_inline))
used only while page initialization
Definition: xct_id.hpp:1150
McsMockThread(McsMockThread &&rhs)
RwLockableXctId * get_rw_lock_address(uint16_t node_id, uint64_t lock_index)
Just a marker to denote that a memory region represents a data page.
Definition: page.hpp:184
constexpr uint32_t kMcsMockDataPageFiller
Just a marker to denote that the memory region represents a data page.
Definition: page.hpp:334
constexpr uint32_t kMcsMockDataPageHeaderSize
McsWwLock ww_[kMcsMockDataPageLocksPerPage]
void set(uint8_t numa_node, memory::PagePoolOffset offset)
Definition: storage_id.hpp:212
std::vector< McsMockNode< RW_BLOCK > > nodes_
ThreadGroupId decompose_numa_node(ThreadId global_id)
Extracts NUMA node ID from the given globally unique ID of Thread (core).
Definition: thread_id.hpp:131
thread::ThreadId get_my_id() const
Returns thread-Id of this thread.
void * get_block() const
Returns the memory block.
A dummy implementation that provides McsAdaptorConcept for testing.
void init(uint32_t max_block_count)
std::vector< McsMockThread< RW_BLOCK > > threads_
void init(storage::StorageId dummy_storage_id, uint32_t nodes, uint32_t threads_per_node, uint32_t max_block_count, uint32_t max_lock_count)
McsMockDataPage * pages_
Locks assigned to this node are stored in these memory.
uint32_t McsBlockIndex
Index in thread-local MCS block.
Definition: xct_id.hpp:153
RwLockableXctId tid_[kMcsMockDataPageLocksPerPage]
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
Definition: thread_id.hpp:80
Represents one memory block aligned to actual OS/hardware pages.
McsWwBlock * get_ww_my_block(McsBlockIndex index)
thread::ThreadId get_my_id() const
McsBlockIndex get_cur_block() const
A dummy page layout to store RwLockableXctId.
RW_BLOCK * get_rw_other_block(thread::ThreadId id, McsBlockIndex index)
Resolves an offset in a volatile page pool to an actual pointer and vice versa.
McsWwBlock * get_ww_other_block(thread::ThreadId id, McsBlockIndex index)
Dereference other thread's block index for exclusive locks.
xct::McsBlockIndex get_mcs_rw_async_block_index(const memory::GlobalVolatilePageResolver &resolver, xct::McsRwLock *lock)
Analogous to the entire engine.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
thread::ThreadGroupId get_my_numa_node() const
McsBlockIndex get_other_cur_block(thread::ThreadId id)
uint8_t ThreadGroupId
Typedef for an ID of ThreadGroup (NUMA node).
Definition: thread_id.hpp:38
RW_BLOCK * get_rw_my_block(McsBlockIndex index)
void alloc_onnode(uint64_t size, uint64_t alignment, int numa_node) noexcept
Short for alloc(kNumaAllocOnnode)
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
Definition: storage_id.hpp:45
Base bases_[kMaxNumaNode]
base address to calculate from/to offset.
McsWwLock * get_ww_lock_address(uint16_t node_id, uint64_t lock_index)
An MCS reader-writer lock data structure.
Definition: xct_id.hpp:795
McsBlockIndex issue_new_block()
Issues a new queue node of this thread and returns its block index.
McsRwExtendedBlock * get_rw_other_async_block(thread::ThreadId id, xct::McsRwLock *lock)
McsMockAdaptor(thread::ThreadId id, McsMockContext< RW_BLOCK > *context)
std::atomic< bool > * me_waiting()
Returns the atomic bool var on whether current thread is waiting for some lock.
const UniversalLockId kNullUniversalLockId
This never points to a valid lock, and also evaluates less than any valid lock IDs. ...
Definition: xct_id.hpp:137
thread::ThreadId get_tail_waiter() const
Definition: xct_id.hpp:818
RW_BLOCK * dereference_rw_tail_block(uint32_t tail_int)
bool is_null() const
Returns if this object doesn't hold a valid memory block.