libfoedus-core
FOEDUS Core Library
thread_pimpl.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #ifndef FOEDUS_THREAD_THREAD_PIMPL_HPP_
19 #define FOEDUS_THREAD_THREAD_PIMPL_HPP_
20 
21 #include <atomic>
22 #include <thread>
23 
25 #include "foedus/initializable.hpp"
26 #include "foedus/cache/fwd.hpp"
29 #include "foedus/memory/fwd.hpp"
31 #include "foedus/proc/proc_id.hpp"
34 #include "foedus/storage/fwd.hpp"
36 #include "foedus/thread/fwd.hpp"
39 #include "foedus/xct/fwd.hpp"
41 #include "foedus/xct/xct.hpp"
42 #include "foedus/xct/xct_id.hpp"
43 
44 namespace foedus {
45 namespace thread {
46 
49  // this is backed by shared memory. not instantiation. just reinterpret_cast.
50  ThreadControlBlock() = delete;
51  ~ThreadControlBlock() = delete;
52 
53  void initialize(ThreadId my_thread_id) {
57  mcs_waiting_.store(false);
58  current_ticket_ = 0;
59  proc_name_.clear();
60  input_len_ = 0;
61  output_len_ = 0;
67  my_thread_id_ = my_thread_id;
70  }
71  void uninitialize() {
73  }
74 
85 
88 
97  std::atomic<bool> mcs_waiting_;
98 
105 
111 
114 
121 
124 
126  uint32_t input_len_;
127 
129  uint32_t output_len_;
130 
133 
138 
141 
144 
147 };
148 
159 class ThreadPimpl final : public DefaultInitializable {
160  public:
161  template<typename RW_BLOCK> friend class ThreadPimplMcsAdaptor;
162 
163  ThreadPimpl() = delete;
164  ThreadPimpl(
165  Engine* engine,
166  Thread* holder,
167  ThreadId id,
168  ThreadGlobalOrdinal global_ordinal);
169  ErrorStack initialize_once() override final;
170  ErrorStack uninitialize_once() override final;
171 
178  void handle_tasks();
180  void set_thread_schedule();
181  bool is_stop_requested() const;
182 
185  storage::SnapshotPagePointer page_id,
186  storage::Page** out);
189  uint16_t batch_size,
190  const storage::SnapshotPagePointer* page_ids,
191  storage::Page** out);
192 
195  storage::SnapshotPagePointer page_id,
196  storage::Page* buffer) ALWAYS_INLINE;
199  storage::SnapshotPagePointer page_id_begin,
200  uint32_t page_count,
201  storage::Page* buffer) ALWAYS_INLINE;
202 
205  storage::DualPagePointer* pointer,
206  storage::Page** installed_page);
207 
210  storage::VolatilePageInit page_initializer,
211  bool tolerate_null_pointer,
212  bool will_modify,
213  bool take_ptr_set_snapshot,
214  storage::DualPagePointer* pointer,
215  storage::Page** page,
216  const storage::Page* parent,
217  uint16_t index_in_parent);
220  uint16_t batch_size,
221  storage::VolatilePageInit page_initializer,
222  bool tolerate_null_pointer,
223  bool take_ptr_set_snapshot,
224  storage::DualPagePointer** pointers,
225  storage::Page** parents,
226  const uint16_t* index_in_parents,
227  bool* followed_snapshots,
228  storage::Page** out);
231  uint16_t batch_size,
232  storage::VolatilePageInit page_initializer,
233  storage::DualPagePointer** pointers,
234  storage::Page** parents,
235  const uint16_t* index_in_parents,
236  storage::Page** out);
238  storage::SnapshotPagePointer page_id,
239  memory::PagePoolOffset* pool_offset);
240 
251  storage::Page* place_a_new_volatile_page(
252  memory::PagePoolOffset new_offset,
253  storage::DualPagePointer* pointer);
254 
255 
257  void collect_retired_volatile_page(storage::VolatilePagePointer ptr);
258 
265  uint16_t node,
266  Epoch current_epoch,
267  memory::PagePoolOffsetAndEpochChunk* chunk);
268 
270  bool is_volatile_page_retired(storage::VolatilePagePointer ptr);
271 
274 
275 
280  template<typename FUNC>
281  void switch_mcs_impl(FUNC func);
282  bool is_simple_mcs_rw() const { return simple_mcs_rw_; }
283 
286 
289  void cll_release_all_locks();
291 
292 
295  ErrorCode run_nested_sysxct(xct::SysxctFunctor* functor, uint32_t max_retries);
297  xct::SysxctWorkspace* sysxct_workspace,
299  xct::RwLockableXctId* lock);
301  xct::SysxctWorkspace* sysxct_workspace,
303  uint32_t lock_count,
304  xct::RwLockableXctId** locks);
306  xct::SysxctWorkspace* sysxct_workspace,
307  storage::Page* page);
309  xct::SysxctWorkspace* sysxct_workspace,
310  uint32_t lock_count,
311  storage::Page** pages);
312 
313  // overload to be template-friendly
318 
319 
320  Engine* const engine_;
321 
325  Thread* const holder_;
326 
330  const ThreadId id_;
331 
334 
339 
352 
357 
362 
366  std::thread raw_thread_;
368  std::atomic<bool> raw_thread_set_;
369 
376 
381 
385 
391 
393 };
394 
403 template<typename RW_BLOCK>
404 class ThreadPimplMcsAdaptor {
405  public:
406  typedef RW_BLOCK ThisRwBlock;
407 
408  explicit ThreadPimplMcsAdaptor(ThreadPimpl* pimpl) : pimpl_(pimpl) {}
410 
412  // if this assertion fires, probably we are retrying something too many times
413  ASSERT_ND(pimpl_->control_block_->mcs_block_current_ < 0xFFFFU);
414  return ++pimpl_->control_block_->mcs_block_current_;
415  }
417  ASSERT_ND(pimpl_->control_block_->mcs_block_current_ == the_block);
419  }
421  ThreadId get_my_id() const { return pimpl_->id_; }
422  ThreadGroupId get_my_numa_node() const { return pimpl_->numa_node_; }
423  std::atomic<bool>* me_waiting() { return &pimpl_->control_block_->mcs_waiting_; }
424 
426  ASSERT_ND(index > 0);
427  ASSERT_ND(index < 0xFFFFU);
428  ASSERT_ND(index <= pimpl_->control_block_->mcs_block_current_);
429  return pimpl_->mcs_ww_blocks_ + index;
430  }
432  ASSERT_ND(index > 0);
433  ASSERT_ND(index < 0xFFFFU);
434  ASSERT_ND(index <= pimpl_->control_block_->mcs_block_current_);
435  RW_BLOCK* ret;
436  pimpl_->get_mcs_rw_my_blocks(&ret);
437  ret += index;
438  return ret;
439  }
440 
441  std::atomic<bool>* other_waiting(ThreadId id) {
442  ThreadRef other = pimpl_->get_thread_ref(id);
443  return &(other.get_control_block()->mcs_waiting_);
444  }
446  ThreadRef other = pimpl_->get_thread_ref(id);
447  return other.get_control_block()->mcs_block_current_;
448  }
450  ThreadRef other = pimpl_->get_thread_ref(id);
452  return other.get_mcs_ww_blocks() + index;
453  }
455  ASSERT_ND(index > 0);
456  ASSERT_ND(index < 0xFFFFU);
457  ThreadRef other = pimpl_->get_thread_ref(id);
458  RW_BLOCK* ret;
459  other.get_mcs_rw_blocks(&ret);
461  ret += index;
462  return ret;
463  }
464  RW_BLOCK* dereference_rw_tail_block(uint32_t tail_int) {
465  xct::McsRwLock tail_tmp;
466  tail_tmp.tail_ = tail_int;
467  uint32_t tail_id = tail_tmp.get_tail_waiter();
468  uint32_t tail_block = tail_tmp.get_tail_waiter_block();
469  return get_rw_other_block(tail_id, tail_block);
470  }
472  ASSERT_ND(id != get_my_id());
473  ThreadRef other = pimpl_->get_thread_ref(id);
476  auto* mapping = other.get_mcs_rw_async_mapping(lock_id);
477  ASSERT_ND(mapping);
478  ASSERT_ND(mapping->lock_id_ == lock_id);
479  return get_rw_other_block(id, mapping->block_index_);
480  }
482  // TLS, no concurrency control needed
483  auto index = pimpl_->control_block_->mcs_rw_async_mapping_current_;
484  ASSERT_ND(index <= pimpl_->control_block_->mcs_block_current_);
486  ASSERT_ND(pimpl_->mcs_rw_async_mappings_[index].block_index_ == 0);
487  pimpl_->mcs_rw_async_mappings_[index].lock_id_ =
489  pimpl_->mcs_rw_async_mappings_[index].block_index_ = block_index;
491  }
493  xct::UniversalLockId lock_id =
496  for (uint32_t i = 0; i < pimpl_->control_block_->mcs_rw_async_mapping_current_; ++i) {
497  if (pimpl_->mcs_rw_async_mappings_[i].lock_id_ == lock_id) {
500  pimpl_->mcs_rw_async_mappings_[i].block_index_= 0;
501  return;
502  }
503  }
504  ASSERT_ND(false);
505  }
506 
507  private:
508  ThreadPimpl* const pimpl_;
509 };
510 
511 
514  storage::Page* buffer) {
515  return snapshot_file_set_.read_page(page_id, buffer);
516 }
518  storage::SnapshotPagePointer page_id_begin,
519  uint32_t page_count,
520  storage::Page* buffer) {
521  return snapshot_file_set_.read_pages(page_id_begin, page_count, buffer);
522 }
523 
524 } // namespace thread
525 } // namespace foedus
526 #endif // FOEDUS_THREAD_THREAD_PIMPL_HPP_
void initialize(ThreadId my_thread_id)
void collect_retired_volatile_page(storage::VolatilePagePointer ptr)
Keeps the specified volatile page as retired as of the current epoch.
ErrorCode sysxct_page_lock(xct::SysxctWorkspace *sysxct_workspace, storage::Page *page)
ErrorCode install_a_volatile_page(storage::DualPagePointer *pointer, storage::Page **installed_page)
Installs a volatile page to the given dual pointer as a copy of the snapshot page.
ThreadId my_thread_id_
Used only for sanity check.
Implements McsAdaptorConcept over ThreadPimpl.
Definition: fwd.hpp:38
cache::SnapshotFileSet snapshot_file_set_
Each thread maintains a private set of snapshot file descriptors.
ErrorStack uninitialize_once() override final
ThreadTicket current_ticket_
The most recently issued impersonation ticket.
Definitions of IDs in this package and a few related constant values.
ErrorCode read_page(storage::SnapshotPagePointer page_id, void *out)
A thread-local log buffer.
ThreadGroupId get_my_numa_node() const
ThreadStatus
Impersonation status of each worker thread.
Definition: thread_id.hpp:203
uint32_t mcs_block_current_
How many MCS blocks we allocated in this thread's current xct.
void set_thread_schedule()
initializes the thread's policy/priority
ErrorCode cll_try_or_acquire_multiple_locks(xct::LockListPosition upto_pos)
uint32_t input_len_
Byte size of input given to the procedure.
Page pool for volatile read/write store (VolatilePage) and the read-only bufferpool (SnapshotPage)...
Definition: page_pool.hpp:173
void initialize(bool recursive=false)
std::atomic< bool > mcs_waiting_
Whether this thread is waiting for some MCS lock.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
memory::NumaNodeMemory * node_memory_
Same as above.
Represents one thread running on one NUMA core.
Definition: thread.hpp:48
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
Definition: memory_id.hpp:44
memory::GlobalVolatilePageResolver global_volatile_page_resolver_
Page resolver to convert all page ID to page pointer.
Typedefs of ID types used in thread package.
std::thread raw_thread_
Encapsulates raw thread object.
xct::McsRwAsyncMapping * get_mcs_rw_async_mapping(xct::UniversalLockId lock_id)
Definition: thread_ref.cpp:149
Shared data of ThreadPimpl.
soc::SharedPolling task_complete_cond_
When the current task has been completed, the thread signals this.
Forward declarations of classes in transaction package.
Represents a pointer to a volatile page with modification count for preventing ABA.
Definition: storage_id.hpp:194
Represents a user transaction.
Definition: xct.hpp:58
ErrorCode read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page *buffer) __attribute__((always_inline))
Read a snapshot page using the thread-local file descriptor set.
const ThreadGlobalOrdinal global_ordinal_
globally and contiguously numbered ID of thread
memory::PagePool * snapshot_page_pool_
shorthand for node_memory_->get_snapshot_pool()
void cll_giveup_all_locks_after(xct::UniversalLockId address)
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
soc::SharedMutex task_mutex_
The following variables are protected by this mutex.
Reader-writer (RW) MCS lock classes.
Definition: xct_id.hpp:387
Represents a time epoch.
Definition: epoch.hpp:61
void switch_mcs_impl(FUNC func)
MCS lock methods.
A polling-wait mechanism that can be placed in shared memory and used from multiple processes...
Typical implementation of Initializable as a skeleton base class.
Holds a set of read-only file objects for snapshot files.
xct::McsBlockIndex get_other_cur_block(ThreadId id)
Forward declarations of classes in cache package.
ErrorCode sysxct_batch_page_locks(xct::SysxctWorkspace *sysxct_workspace, uint32_t lock_count, storage::Page **pages)
McsBlockIndex get_tail_waiter_block() const
Definition: xct_id.hpp:817
std::atomic< bool > raw_thread_set_
Just to make sure raw_thread_ is set.
RW_BLOCK * get_rw_other_block(ThreadId id, xct::McsBlockIndex index)
Repository of memories dynamically acquired within one CPU core (thread).
UniversalLockId rw_lock_to_universal_lock_id(const memory::GlobalVolatilePageResolver &resolver, McsRwLock *lock)
Definition: xct_id.hpp:1231
Pimpl object of Thread.
xct::McsWwBlock * mcs_ww_blocks_
Pre-allocated MCS blocks.
uintptr_t UniversalLockId
Universally ordered identifier of each lock.
Definition: xct_id.hpp:134
A mutex that can be placed in shared memory and used from multiple processes.
void get_mcs_rw_blocks(xct::McsRwSimpleBlock **out) const
Definition: thread_ref.hpp:68
soc::SharedPolling wakeup_cond_
The thread sleeps on this conditional when it has no task.
std::atomic< bool > * other_waiting(ThreadId id)
The MCS reader-writer lock variant of LockableXctId.
Definition: xct_id.hpp:1132
xct::McsWwBlock * get_ww_my_block(xct::McsBlockIndex index)
bool is_volatile_page_retired(storage::VolatilePagePointer ptr)
Subroutine of collect_retired_volatile_page() just for assertion.
A functor representing the logic in a system transaction via virtual-function.
Forward declarations of classes in storage package.
ErrorCode find_or_read_snapshot_pages_batch(uint16_t batch_size, const storage::SnapshotPagePointer *page_ids, storage::Page **out)
Batched version of find_or_read_a_snapshot_page().
Pre-allocated MCS block for WW-locks.
Definition: xct_id.hpp:274
Definitions of IDs in this package and a few related constant values.
const ThreadGroupId numa_node_
Node this thread belongs to.
void remove_rw_async_mapping(xct::McsRwLock *lock)
ErrorCode cll_try_or_acquire_single_lock(xct::LockListPosition pos)
void flush_retired_volatile_page(uint16_t node, Epoch current_epoch, memory::PagePoolOffsetAndEpochChunk *chunk)
Subroutine of collect_retired_volatile_page() in case the chunk becomes full.
Engine *const engine_
MCS lock methods.
ErrorCode sysxct_record_lock(xct::SysxctWorkspace *sysxct_workspace, storage::VolatilePagePointer page_id, xct::RwLockableXctId *lock)
uint32_t LockListPosition
Index in a lock-list, either RLL or CLL.
Definition: xct_id.hpp:148
void cll_release_all_locks_after(xct::UniversalLockId address)
RW-locks.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Definition: storage_id.hpp:79
Pre-allocated MCS block for extended version of RW-locks.
Definition: xct_id.hpp:513
Thread *const holder_
The public object that holds this pimpl object.
RW_BLOCK * get_rw_my_block(xct::McsBlockIndex index)
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
uint32_t mcs_rw_async_mapping_current_
How many async mappings for extended RW lock we have so far.
Repository of memories dynamically acquired and shared within one NUMA node (socket).
ErrorCode sysxct_batch_record_locks(xct::SysxctWorkspace *sysxct_workspace, storage::VolatilePagePointer page_id, uint32_t lock_count, xct::RwLockableXctId **locks)
Typedefs of ID types used in procedure package.
A view of Thread object for other SOCs and master engine.
Definition: thread_ref.hpp:39
ThreadControlBlock * get_control_block() const
Definition: thread_ref.hpp:71
log::ThreadLogBuffer log_buffer_
Thread-private log buffer.
Just a marker to denote that the memory region represents a data page.
Definition: page.hpp:334
A NUMA-local hashtable of cached snapshot pages.
Representation of ErrorStack that can be copied to other processes and even serialized to files...
xct::McsWwBlock * get_ww_other_block(ThreadId id, xct::McsBlockIndex index)
ErrorCode read_pages(storage::SnapshotPagePointer page_id_begin, uint32_t page_count, void *out)
Read contiguous pages in one shot.
ErrorCode follow_page_pointer(storage::VolatilePageInit page_initializer, bool tolerate_null_pointer, bool will_modify, bool take_ptr_set_snapshot, storage::DualPagePointer *pointer, storage::Page **page, const storage::Page *parent, uint16_t index_in_parent)
A general method to follow (read) a page pointer.
ErrorCode run_nested_sysxct(xct::SysxctFunctor *functor, uint32_t max_retries)
Sysxct-related.
memory::LocalPageResolver local_volatile_page_resolver_
Page resolver to convert only local page ID to page pointer.
void handle_tasks()
Main routine of the worker thread.
void add_rw_async_mapping(xct::McsRwLock *lock, xct::McsBlockIndex block_index)
proc::ProcName proc_name_
Name of the procedure to execute next.
xct::RwLockableXctId * canonical_address_
Forward declarations of classes in memory package.
xct::McsRwSimpleBlock * mcs_rw_simple_blocks_
ErrorStack initialize_once() override final
const ThreadId id_
Unique ID of this thread.
xct::Xct current_xct_
Current transaction this thread is conveying.
uint16_t ThreadGlobalOrdinal
Typedef for a globally and contiguously numbered ID of thread.
Definition: thread_id.hpp:98
ErrorCode find_or_read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page **out)
Find the given page in snapshot cache, reading it if not found.
uint32_t McsBlockIndex
Index in thread-local MCS block.
Definition: xct_id.hpp:153
const Epoch INVALID_EPOCH
A constant epoch object that represents an invalid epoch.
Definition: epoch.hpp:204
void get_mcs_rw_my_blocks(xct::McsRwExtendedBlock **out)
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
Definition: thread_id.hpp:80
void get_mcs_rw_my_blocks(xct::McsRwSimpleBlock **out)
ThreadStatus status_
Impersonation status of this thread.
Resolves an offset in local (same NUMA node) page pool to a pointer and vice versa.
xct::McsRwExtendedBlock * mcs_rw_extended_blocks_
storage::Page * place_a_new_volatile_page(memory::PagePoolOffset new_offset, storage::DualPagePointer *pointer)
Subroutine of install_a_volatile_page() and follow_page_pointer() to atomically place the given new v...
void(* VolatilePageInit)(const VolatilePageInitArguments &args)
A function pointer to initialize a volatile page.
Definition: page.hpp:387
ErrorCode follow_page_pointers_for_read_batch(uint16_t batch_size, storage::VolatilePageInit page_initializer, bool tolerate_null_pointer, bool take_ptr_set_snapshot, storage::DualPagePointer **pointers, storage::Page **parents, const uint16_t *index_in_parents, bool *followed_snapshots, storage::Page **out)
Batched version of follow_page_pointer with will_modify==false.
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
xct::McsRwAsyncMapping * mcs_rw_async_mappings_
Resolves an offset in a volatile page pool to an actual pointer and vice versa.
void clear() noexcept
Clear string.
ThreadRef get_thread_ref(ThreadId id)
uint32_t output_len_
Byte size of output as the result of the procedure.
xct::UniversalLockId cll_get_max_locked_id() const
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
void cancel_new_block(xct::McsBlockIndex the_block)
FixedErrorStack proc_result_
Error code as the result of the procedure.
ErrorCode on_snapshot_cache_miss(storage::SnapshotPagePointer page_id, memory::PagePoolOffset *pool_offset)
Forward declarations of classes in thread package.
xct::McsRwExtendedBlock * get_rw_other_async_block(ThreadId id, xct::McsRwLock *lock)
memory::NumaCoreMemory * core_memory_
Private memory repository of this thread.
#define ALWAYS_INLINE
A function suffix to hint that the function should always be inlined.
Definition: compiler.hpp:106
bool simple_mcs_rw_
shortcut for engine_->get_options().xct_.mcs_implementation_type_ == simple
uint8_t ThreadGroupId
Typedef for an ID of ThreadGroup (NUMA node).
Definition: thread_id.hpp:38
xct::McsWwBlock * get_mcs_ww_blocks() const
Definition: thread_ref.hpp:64
std::atomic< bool > * me_waiting()
An MCS reader-writer lock data structure.
Definition: xct_id.hpp:795
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
Per-thread reused work memory for system transactions.
ErrorCode read_snapshot_pages(storage::SnapshotPagePointer page_id_begin, uint32_t page_count, storage::Page *buffer) __attribute__((always_inline))
Read contiguous pages in one shot.
xct::McsBlockIndex get_cur_block() const
cache::CacheHashtable * snapshot_cache_hashtable_
Same as above.
UniversalLockId lock_id_
Definition: xct_id.hpp:874
const UniversalLockId kNullUniversalLockId
This never points to a valid lock, and also evaluates less than any valid lock. ...
Definition: xct_id.hpp:137
RW_BLOCK * dereference_rw_tail_block(uint32_t tail_int)
ErrorCode follow_page_pointers_for_write_batch(uint16_t batch_size, storage::VolatilePageInit page_initializer, storage::DualPagePointer **pointers, storage::Page **parents, const uint16_t *index_in_parents, storage::Page **out)
Batched version of follow_page_pointer with will_modify==true and tolerate_null_pointer==true.
uint64_t ThreadTicket
Typedef for a monotonically increasing ticket for thread impersonation.
Definition: thread_id.hpp:117
thread::ThreadId get_tail_waiter() const
Definition: xct_id.hpp:818
ThreadControlBlock * control_block_