libfoedus-core
FOEDUS Core Library
foedus::thread::ThreadPimpl Class Reference (final)

Pimpl object of Thread. More...

Detailed Description

Pimpl object of Thread.

A private pimpl object for Thread. Do not include this header from a client program unless you know what you are doing.

In particular, this class heavily uses C++11 features, which is why it is separated from Thread. Be aware of the notices in C++11 Keywords in Public Headers unless your client program allows C++11.
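As background for the pimpl split described above, here is a minimal sketch of the idiom (hypothetical Widget/WidgetPimpl names, not FOEDUS code): the public header forward-declares the implementation class, so C++11-heavy headers stay out of client translation units.

```cpp
// widget.hpp -- public header: only a forward declaration of the pimpl,
// so C++11-heavy headers never leak into client translation units.
class WidgetPimpl;
class Widget {
 public:
  Widget();
  ~Widget();
  int value() const;
 private:
  WidgetPimpl* pimpl_;  // clients never see WidgetPimpl's definition
};

// widget.cpp -- the private side may freely use C++11 (<atomic>, <thread>, ...).
#include <atomic>
class WidgetPimpl {
 public:
  std::atomic<int> value_{42};
};
Widget::Widget() : pimpl_(new WidgetPimpl()) {}
Widget::~Widget() { delete pimpl_; }
int Widget::value() const { return pimpl_->value_.load(); }
```

Client code that includes only `widget.hpp` never sees `<atomic>`, which is the point of keeping ThreadPimpl out of public headers.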

Definition at line 159 of file thread_pimpl.hpp.

#include <thread_pimpl.hpp>

Inheritance diagram for foedus::thread::ThreadPimpl:
Collaboration diagram for foedus::thread::ThreadPimpl:

Public Member Functions

 ThreadPimpl ()=delete
 
 ThreadPimpl (Engine *engine, Thread *holder, ThreadId id, ThreadGlobalOrdinal global_ordinal)
 
ErrorStack initialize_once () override final
 
ErrorStack uninitialize_once () override final
 
void handle_tasks ()
 Main routine of the worker thread. More...
 
void set_thread_schedule ()
 Initializes the thread's policy/priority. More...
 
bool is_stop_requested () const
 
ErrorCode find_or_read_a_snapshot_page (storage::SnapshotPagePointer page_id, storage::Page **out)
 Find the given page in snapshot cache, reading it if not found. More...
 
ErrorCode find_or_read_snapshot_pages_batch (uint16_t batch_size, const storage::SnapshotPagePointer *page_ids, storage::Page **out)
 Batched version of find_or_read_a_snapshot_page(). More...
 
ErrorCode read_a_snapshot_page (storage::SnapshotPagePointer page_id, storage::Page *buffer) __attribute__((always_inline))
 Read a snapshot page using the thread-local file descriptor set. More...
 
ErrorCode read_snapshot_pages (storage::SnapshotPagePointer page_id_begin, uint32_t page_count, storage::Page *buffer) __attribute__((always_inline))
 Read contiguous pages in one shot. More...
 
ErrorCode install_a_volatile_page (storage::DualPagePointer *pointer, storage::Page **installed_page)
 Installs a volatile page to the given dual pointer as a copy of the snapshot page. More...
 
ErrorCode follow_page_pointer (storage::VolatilePageInit page_initializer, bool tolerate_null_pointer, bool will_modify, bool take_ptr_set_snapshot, storage::DualPagePointer *pointer, storage::Page **page, const storage::Page *parent, uint16_t index_in_parent)
 A general method to follow (read) a page pointer. More...
 
ErrorCode follow_page_pointers_for_read_batch (uint16_t batch_size, storage::VolatilePageInit page_initializer, bool tolerate_null_pointer, bool take_ptr_set_snapshot, storage::DualPagePointer **pointers, storage::Page **parents, const uint16_t *index_in_parents, bool *followed_snapshots, storage::Page **out)
 Batched version of follow_page_pointer with will_modify==false. More...
 
ErrorCode follow_page_pointers_for_write_batch (uint16_t batch_size, storage::VolatilePageInit page_initializer, storage::DualPagePointer **pointers, storage::Page **parents, const uint16_t *index_in_parents, storage::Page **out)
 Batched version of follow_page_pointer with will_modify==true and tolerate_null_pointer==true. More...
 
ErrorCode on_snapshot_cache_miss (storage::SnapshotPagePointer page_id, memory::PagePoolOffset *pool_offset)
 
storage::Page * place_a_new_volatile_page (memory::PagePoolOffset new_offset, storage::DualPagePointer *pointer)
 Subroutine of install_a_volatile_page() and follow_page_pointer() to atomically place the given new volatile page created by this thread. More...
 
void collect_retired_volatile_page (storage::VolatilePagePointer ptr)
 Keeps the specified volatile page as retired as of the current epoch. More...
 
void flush_retired_volatile_page (uint16_t node, Epoch current_epoch, memory::PagePoolOffsetAndEpochChunk *chunk)
 Subroutine of collect_retired_volatile_page() in case the chunk becomes full. More...
 
bool is_volatile_page_retired (storage::VolatilePagePointer ptr)
 Subroutine of collect_retired_volatile_page() just for assertion. More...
 
ThreadRef get_thread_ref (ThreadId id)
 
template<typename FUNC >
void switch_mcs_impl (FUNC func)
 MCS locks methods. More...
 
bool is_simple_mcs_rw () const
 
void cll_release_all_locks_after (xct::UniversalLockId address)
 RW-locks. More...
 
void cll_giveup_all_locks_after (xct::UniversalLockId address)
 
ErrorCode cll_try_or_acquire_single_lock (xct::LockListPosition pos)
 
ErrorCode cll_try_or_acquire_multiple_locks (xct::LockListPosition upto_pos)
 
void cll_release_all_locks ()
 
xct::UniversalLockId cll_get_max_locked_id () const
 
ErrorCode run_nested_sysxct (xct::SysxctFunctor *functor, uint32_t max_retries)
 Sysxct-related. More...
 
ErrorCode sysxct_record_lock (xct::SysxctWorkspace *sysxct_workspace, storage::VolatilePagePointer page_id, xct::RwLockableXctId *lock)
 
ErrorCode sysxct_batch_record_locks (xct::SysxctWorkspace *sysxct_workspace, storage::VolatilePagePointer page_id, uint32_t lock_count, xct::RwLockableXctId **locks)
 
ErrorCode sysxct_page_lock (xct::SysxctWorkspace *sysxct_workspace, storage::Page *page)
 
ErrorCode sysxct_batch_page_locks (xct::SysxctWorkspace *sysxct_workspace, uint32_t lock_count, storage::Page **pages)
 
void get_mcs_rw_my_blocks (xct::McsRwSimpleBlock **out)
 
void get_mcs_rw_my_blocks (xct::McsRwExtendedBlock **out)
 
- Public Member Functions inherited from foedus::DefaultInitializable
 DefaultInitializable ()
 
virtual ~DefaultInitializable ()
 
 DefaultInitializable (const DefaultInitializable &)=delete
 
DefaultInitializable & operator= (const DefaultInitializable &)=delete
 
ErrorStack initialize () override final
 Typical implementation of Initializable::initialize() that provides initialize-once semantics. More...
 
ErrorStack uninitialize () override final
 Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics. More...
 
bool is_initialized () const override final
 Returns whether the object has been already initialized or not. More...
 
- Public Member Functions inherited from foedus::Initializable
virtual ~Initializable ()
 

Public Attributes

Engine *const engine_
 MCS locks methods. More...
 
Thread *const holder_
 The public object that holds this pimpl object. More...
 
const ThreadId id_
 Unique ID of this thread. More...
 
const ThreadGroupId numa_node_
 Node this thread belongs to. More...
 
const ThreadGlobalOrdinal global_ordinal_
 Globally and contiguously numbered ID of the thread. More...
 
bool simple_mcs_rw_
 shortcut for engine_->get_options().xct_.mcs_implementation_type_ == simple More...
 
memory::NumaCoreMemory * core_memory_
 Private memory repository of this thread. More...
 
memory::NumaNodeMemory * node_memory_
 Same as above. More...
 
cache::CacheHashtable * snapshot_cache_hashtable_
 Same as above. More...
 
memory::PagePool * snapshot_page_pool_
 shorthand for node_memory_->get_snapshot_pool() More...
 
memory::GlobalVolatilePageResolver global_volatile_page_resolver_
 Page resolver to convert all page ID to page pointer. More...
 
memory::LocalPageResolver local_volatile_page_resolver_
 Page resolver to convert only local page ID to page pointer. More...
 
log::ThreadLogBuffer log_buffer_
 Thread-private log buffer. More...
 
std::thread raw_thread_
 Encapsulates raw thread object. More...
 
std::atomic< bool > raw_thread_set_
 Just to make sure raw_thread_ is set. More...
 
xct::Xct current_xct_
 Current transaction this thread is conveying. More...
 
cache::SnapshotFileSet snapshot_file_set_
 Each thread maintains a private set of snapshot file descriptors. More...
 
ThreadControlBlock * control_block_
 
void * task_input_memory_
 
void * task_output_memory_
 
xct::McsWwBlock * mcs_ww_blocks_
 Pre-allocated MCS blocks. More...
 
xct::McsRwSimpleBlock * mcs_rw_simple_blocks_
 
xct::McsRwExtendedBlock * mcs_rw_extended_blocks_
 
xct::McsRwAsyncMapping * mcs_rw_async_mappings_
 
xct::RwLockableXctId * canonical_address_
 

Friends

template<typename RW_BLOCK >
class ThreadPimplMcsAdaptor
 

Constructor & Destructor Documentation

foedus::thread::ThreadPimpl::ThreadPimpl ( )
delete
foedus::thread::ThreadPimpl::ThreadPimpl ( Engine *  engine,
Thread *  holder,
ThreadId  id,
ThreadGlobalOrdinal  global_ordinal 
)

Definition at line 55 of file thread_pimpl.cpp.

ThreadPimpl::ThreadPimpl(
  Engine* engine,
  Thread* holder,
  ThreadId id,
  ThreadGlobalOrdinal global_ordinal)
  : engine_(engine),
    holder_(holder),
    id_(id),
    numa_node_(decompose_numa_node(id)),
    global_ordinal_(global_ordinal),
    core_memory_(nullptr),
    node_memory_(nullptr),
    snapshot_cache_hashtable_(nullptr),
    snapshot_page_pool_(nullptr),
    log_buffer_(engine, id),
    current_xct_(engine, holder, id),
    snapshot_file_set_(engine),
    control_block_(nullptr),
    task_input_memory_(nullptr),
    task_output_memory_(nullptr),
    mcs_ww_blocks_(nullptr),
    mcs_rw_simple_blocks_(nullptr),
    mcs_rw_extended_blocks_(nullptr),
    canonical_address_(nullptr) {
}

Member Function Documentation

xct::UniversalLockId foedus::thread::ThreadPimpl::cll_get_max_locked_id ( ) const

Definition at line 930 of file thread_pimpl.cpp.

References current_xct_, foedus::xct::Xct::get_current_lock_list(), and foedus::xct::CurrentLockList::get_max_locked_id().

Referenced by run_nested_sysxct().

xct::UniversalLockId ThreadPimpl::cll_get_max_locked_id() const {
  const xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  return cll->get_max_locked_id();
}

void foedus::thread::ThreadPimpl::cll_giveup_all_locks_after ( xct::UniversalLockId  address)

Definition at line 887 of file thread_pimpl.cpp.

References current_xct_, foedus::xct::Xct::get_current_lock_list(), foedus::xct::CurrentLockList::giveup_all_after(), and is_simple_mcs_rw().

Referenced by foedus::thread::Thread::cll_giveup_all_locks_after().

void ThreadPimpl::cll_giveup_all_locks_after(xct::UniversalLockId address) {
  xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  if (is_simple_mcs_rw()) {
    auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(this));
    cll->giveup_all_after(address, &impl);
  } else {
    auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(this));
    cll->giveup_all_after(address, &impl);
  }
}

void foedus::thread::ThreadPimpl::cll_release_all_locks ( )

Definition at line 919 of file thread_pimpl.cpp.

References current_xct_, foedus::xct::Xct::get_current_lock_list(), is_simple_mcs_rw(), and foedus::xct::CurrentLockList::release_all_locks().

Referenced by foedus::thread::Thread::cll_release_all_locks(), and foedus::thread::ThreadPimplCllReleaseAllFunctor::operator()().

void ThreadPimpl::cll_release_all_locks() {
  xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  if (is_simple_mcs_rw()) {
    auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(this));
    return cll->release_all_locks(&impl);
  } else {
    auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(this));
    return cll->release_all_locks(&impl);
  }
}

void foedus::thread::ThreadPimpl::cll_release_all_locks_after ( xct::UniversalLockId  address)

RW-locks.

These methods switch between the two lock implementations. We could shorten them slightly by assuming C++14 (generic lambdas with auto), but not by much, so it doesn't matter.
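The pattern can be illustrated with a small self-contained sketch (hypothetical SimpleImpl/ExtendedImpl types, not the real MCS classes): a runtime flag picks the implementation type, and each branch instantiates the shared templated body with that type.

```cpp
#include <string>

// Two hypothetical lock-implementation types with the same interface.
struct SimpleImpl   { std::string release_all() const { return "simple"; } };
struct ExtendedImpl { std::string release_all() const { return "extended"; } };

// The shared body is a template, so it works with either impl type.
template <typename IMPL>
std::string release_all_locks_with(const IMPL& impl) {
  return impl.release_all();
}

// The C++11 pattern used in ThreadPimpl: branch once on a runtime flag,
// instantiate the right impl in each branch, call the shared template.
std::string release_all_locks(bool simple) {
  if (simple) {
    SimpleImpl impl;
    return release_all_locks_with(impl);
  } else {
    ExtendedImpl impl;
    return release_all_locks_with(impl);
  }
}
```

A C++14 generic lambda would let the branch body be written once, but as the remark above notes, the saving is small.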

Definition at line 876 of file thread_pimpl.cpp.

References current_xct_, foedus::xct::Xct::get_current_lock_list(), is_simple_mcs_rw(), and foedus::xct::CurrentLockList::release_all_after().

Referenced by foedus::thread::Thread::cll_release_all_locks_after().

void ThreadPimpl::cll_release_all_locks_after(xct::UniversalLockId address) {
  xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  if (is_simple_mcs_rw()) {
    auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(this));
    cll->release_all_after(address, &impl);
  } else {
    auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(this));
    cll->release_all_after(address, &impl);
  }
}

ErrorCode foedus::thread::ThreadPimpl::cll_try_or_acquire_multiple_locks ( xct::LockListPosition  upto_pos)

Definition at line 909 of file thread_pimpl.cpp.

References current_xct_, foedus::xct::Xct::get_current_lock_list(), is_simple_mcs_rw(), and foedus::xct::CurrentLockList::try_or_acquire_multiple_locks().

Referenced by foedus::thread::Thread::cll_try_or_acquire_multiple_locks().

ErrorCode ThreadPimpl::cll_try_or_acquire_multiple_locks(xct::LockListPosition upto_pos) {
  xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  if (is_simple_mcs_rw()) {
    auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(this));
    return cll->try_or_acquire_multiple_locks(upto_pos, &impl);
  } else {
    auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(this));
    return cll->try_or_acquire_multiple_locks(upto_pos, &impl);
  }
}

ErrorCode foedus::thread::ThreadPimpl::cll_try_or_acquire_single_lock ( xct::LockListPosition  pos)

Definition at line 898 of file thread_pimpl.cpp.

References current_xct_, foedus::xct::Xct::get_current_lock_list(), is_simple_mcs_rw(), and foedus::xct::CurrentLockList::try_or_acquire_single_lock().

Referenced by foedus::thread::Thread::cll_try_or_acquire_single_lock().

ErrorCode ThreadPimpl::cll_try_or_acquire_single_lock(xct::LockListPosition pos) {
  xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  if (is_simple_mcs_rw()) {
    auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(this));
    return cll->try_or_acquire_single_lock(pos, &impl);
  } else {
    auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(this));
    return cll->try_or_acquire_single_lock(pos, &impl);
  }
}

void foedus::thread::ThreadPimpl::collect_retired_volatile_page ( storage::VolatilePagePointer  ptr)

Keeps the specified volatile page as retired as of the current epoch.

Retired page handling methods.

Parameters
[in] ptr  the volatile page that has been retired
Precondition
is_retired() == true must hold in the page ptr points to.

This thread buffers such pages and returns them to the volatile page pool when it is safe to do so.
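The buffering idea can be sketched as follows (hypothetical RetiredChunk type and epoch arithmetic, not FOEDUS code): each retired page is stamped with an epoch safely in the future, and only entries whose stamp has passed are eligible to be returned to the pool.

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical sketch of epoch-deferred page reclamation (not FOEDUS code).
// A retired page offset is stamped with current_epoch + 2; it may be
// returned to the pool only once the global epoch reaches that stamp.
struct RetiredChunk {
  std::vector<std::pair<uint32_t, uint64_t>> entries_;  // (page offset, safe epoch)

  void push_back(uint32_t offset, uint64_t safe_epoch) {
    entries_.emplace_back(offset, safe_epoch);
  }
  // How many buffered pages can be safely returned as of the given epoch.
  uint32_t get_safe_offset_count(uint64_t current_epoch) const {
    uint32_t count = 0;
    for (const auto& e : entries_) {
      if (e.second <= current_epoch) {
        ++count;
      }
    }
    return count;
  }
};

void collect_retired(RetiredChunk* chunk, uint32_t offset, uint64_t current_epoch) {
  // Two epochs ahead: by the time the global epoch passes this stamp,
  // no in-flight transaction can still reference the page.
  chunk->push_back(offset, current_epoch + 2);
}
```

A page retired at epoch 10 is stamped 12; it stays buffered at epochs 10 and 11 and becomes returnable at epoch 12.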

Definition at line 777 of file thread_pimpl.cpp.

References ASSERT_ND, core_memory_, engine_, flush_retired_volatile_page(), foedus::memory::PagePoolOffsetAndEpochChunk::full(), foedus::xct::XctManager::get_current_global_epoch_weak(), foedus::storage::VolatilePagePointer::get_numa_node(), foedus::storage::VolatilePagePointer::get_offset(), foedus::memory::NumaCoreMemory::get_retired_volatile_pool_chunk(), foedus::Engine::get_xct_manager(), is_volatile_page_retired(), foedus::Epoch::one_more(), and foedus::memory::PagePoolOffsetAndEpochChunk::push_back().

Referenced by foedus::thread::Thread::collect_retired_volatile_page().

void ThreadPimpl::collect_retired_volatile_page(storage::VolatilePagePointer ptr) {
  ASSERT_ND(is_volatile_page_retired(ptr));
  uint16_t node = ptr.get_numa_node();
  Epoch current_epoch = engine_->get_xct_manager()->get_current_global_epoch_weak();
  Epoch safe_epoch = current_epoch.one_more().one_more();
  memory::PagePoolOffsetAndEpochChunk* chunk = core_memory_->get_retired_volatile_pool_chunk(node);
  if (chunk->full()) {
    flush_retired_volatile_page(node, current_epoch, chunk);
  }
  chunk->push_back(ptr.get_offset(), safe_epoch);
}

ErrorCode foedus::thread::ThreadPimpl::find_or_read_a_snapshot_page ( storage::SnapshotPagePointer  page_id,
storage::Page **  out 
)

Find the given page in snapshot cache, reading it if not found.

Definition at line 647 of file thread_pimpl.cpp.

References foedus::xct::Xct::acquire_local_work_memory(), ASSERT_ND, foedus::EngineOptions::cache_, CHECK_ERROR_CODE, control_block_, current_xct_, engine_, foedus::cache::CacheHashtable::find(), foedus::memory::PagePool::get_base(), foedus::storage::Page::get_header(), foedus::Engine::get_options(), foedus::cache::CacheHashtable::install(), foedus::kErrorCodeOk, foedus::storage::kPageSize, on_snapshot_cache_miss(), foedus::storage::PageHeader::page_id_, read_a_snapshot_page(), foedus::cache::CacheOptions::snapshot_cache_enabled_, snapshot_cache_hashtable_, snapshot_page_pool_, foedus::thread::ThreadControlBlock::stat_snapshot_cache_hits_, and foedus::thread::ThreadControlBlock::stat_snapshot_cache_misses_.

Referenced by foedus::thread::Thread::find_or_read_a_snapshot_page(), follow_page_pointer(), and install_a_volatile_page().

ErrorCode ThreadPimpl::find_or_read_a_snapshot_page(
  storage::SnapshotPagePointer page_id,
  storage::Page** out) {
  if (engine_->get_options().cache_.snapshot_cache_enabled_) {
    memory::PagePoolOffset offset = snapshot_cache_hashtable_->find(page_id);
    // the "find" is very efficient and wait-free, but instead it might have false positive/negative
    // in which case we should just install a new page. No worry about duplicate thanks to the
    // immutability of snapshot pages. it just wastes a bit of CPU and memory.
    if (offset == 0 || snapshot_page_pool_->get_base()[offset].get_header().page_id_ != page_id) {
      if (offset != 0) {
        DVLOG(0) << "Interesting, this race is rare, but possible. offset=" << offset;
      }
      CHECK_ERROR_CODE(on_snapshot_cache_miss(page_id, &offset));
      ASSERT_ND(offset != 0);
      CHECK_ERROR_CODE(snapshot_cache_hashtable_->install(page_id, offset));
      ++control_block_->stat_snapshot_cache_misses_;
    } else {
      ++control_block_->stat_snapshot_cache_hits_;
    }
    ASSERT_ND(offset != 0);
    *out = snapshot_page_pool_->get_base() + offset;
  } else {
    // Snapshot cache is disabled. So far this happens only in performance experiments.
    // We use local work memory in this case.
    CHECK_ERROR_CODE(current_xct_.acquire_local_work_memory(
      storage::kPageSize,
      reinterpret_cast<void**>(out),
      storage::kPageSize));
    return read_a_snapshot_page(page_id, *out);
  }
  return kErrorCodeOk;
}

ErrorCode foedus::thread::ThreadPimpl::find_or_read_snapshot_pages_batch ( uint16_t  batch_size,
const storage::SnapshotPagePointer *  page_ids,
storage::Page **  out 
)

Batched version of find_or_read_a_snapshot_page().

Parameters
[in] batch_size  Batch size. Must be kMaxFindPagesBatch or less.
[in] page_ids  Array of Page IDs to look for, size=batch_size
[out] out  Output

This might perform much faster because of parallel prefetching, SIMD-ized hash calculation (planned, not implemented yet), etc.

Definition at line 687 of file thread_pimpl.cpp.

References foedus::xct::Xct::acquire_local_work_memory(), ASSERT_ND, foedus::EngineOptions::cache_, CHECK_ERROR_CODE, control_block_, current_xct_, engine_, foedus::cache::CacheHashtable::find_batch(), foedus::memory::PagePool::get_base(), foedus::storage::Page::get_header(), foedus::Engine::get_options(), foedus::cache::CacheHashtable::install(), foedus::kErrorCodeInvalidParameter, foedus::kErrorCodeOk, foedus::thread::Thread::kMaxFindPagesBatch, foedus::storage::kPageSize, on_snapshot_cache_miss(), foedus::storage::PageHeader::page_id_, read_a_snapshot_page(), foedus::cache::CacheOptions::snapshot_cache_enabled_, snapshot_cache_hashtable_, snapshot_page_pool_, foedus::thread::ThreadControlBlock::stat_snapshot_cache_hits_, foedus::thread::ThreadControlBlock::stat_snapshot_cache_misses_, and UNLIKELY.

Referenced by foedus::thread::Thread::find_or_read_snapshot_pages_batch(), and follow_page_pointers_for_read_batch().

ErrorCode ThreadPimpl::find_or_read_snapshot_pages_batch(
  uint16_t batch_size,
  const storage::SnapshotPagePointer* page_ids,
  storage::Page** out) {
  ASSERT_ND(batch_size <= Thread::kMaxFindPagesBatch);
  if (batch_size == 0) {
    return kErrorCodeOk;
  } else if (UNLIKELY(batch_size > Thread::kMaxFindPagesBatch)) {
    return kErrorCodeInvalidParameter;
  }

  if (engine_->get_options().cache_.snapshot_cache_enabled_) {
    memory::PagePoolOffset offsets[Thread::kMaxFindPagesBatch];
    CHECK_ERROR_CODE(snapshot_cache_hashtable_->find_batch(batch_size, page_ids, offsets));
    for (uint16_t b = 0; b < batch_size; ++b) {
      memory::PagePoolOffset offset = offsets[b];
      storage::SnapshotPagePointer page_id = page_ids[b];
      if (page_id == 0) {
        out[b] = nullptr;
        continue;
      } else if (b > 0 && page_ids[b - 1] == page_id) {
        ASSERT_ND(offsets[b - 1] == offset);
        out[b] = out[b - 1];
        continue;
      }
      if (offset == 0 || snapshot_page_pool_->get_base()[offset].get_header().page_id_ != page_id) {
        if (offset != 0) {
          DVLOG(0) << "Interesting, this race is rare, but possible. offset=" << offset;
        }
        CHECK_ERROR_CODE(on_snapshot_cache_miss(page_id, &offset));
        ASSERT_ND(offset != 0);
        CHECK_ERROR_CODE(snapshot_cache_hashtable_->install(page_id, offset));
        ++control_block_->stat_snapshot_cache_misses_;
      } else {
        ++control_block_->stat_snapshot_cache_hits_;
      }
      ASSERT_ND(offset != 0);
      out[b] = snapshot_page_pool_->get_base() + offset;
    }
  } else {
    // Snapshot cache is disabled; use local work memory instead.
    for (uint16_t b = 0; b < batch_size; ++b) {
      CHECK_ERROR_CODE(current_xct_.acquire_local_work_memory(
        storage::kPageSize,
        reinterpret_cast<void**>(out + b),
        storage::kPageSize));
      CHECK_ERROR_CODE(read_a_snapshot_page(page_ids[b], out[b]));
    }
  }
  return kErrorCodeOk;
}

void foedus::thread::ThreadPimpl::flush_retired_volatile_page ( uint16_t  node,
Epoch  current_epoch,
memory::PagePoolOffsetAndEpochChunk *  chunk 
)

Subroutine of collect_retired_volatile_page() in case the chunk becomes full.

Returns the chunk to the volatile pool, up to the safe epoch. If there aren't enough pages to safely return, advances the epoch (which should be very rare, though).

Definition at line 789 of file thread_pimpl.cpp.

References foedus::xct::XctManager::advance_current_global_epoch(), ASSERT_ND, engine_, foedus::memory::PagePoolOffsetAndEpochChunk::full(), foedus::xct::XctManager::get_current_global_epoch(), foedus::Engine::get_memory_manager(), foedus::memory::EngineMemory::get_node_memory(), foedus::memory::PagePoolOffsetAndEpochChunk::get_safe_offset_count(), foedus::memory::NumaNodeMemoryRef::get_volatile_pool(), foedus::Engine::get_xct_manager(), id_, foedus::memory::PagePool::release(), and foedus::memory::PagePoolOffsetAndEpochChunk::size().

Referenced by collect_retired_volatile_page().

void ThreadPimpl::flush_retired_volatile_page(
  uint16_t node,
  Epoch current_epoch,
  memory::PagePoolOffsetAndEpochChunk* chunk) {
  if (chunk->size() == 0) {
    return;
  }
  uint32_t safe_count = chunk->get_safe_offset_count(current_epoch);
  while (safe_count < chunk->size() / 10U) {
    LOG(WARNING) << "Thread-" << id_ << " can return only "
      << safe_count << " out of " << chunk->size()
      << " retired pages to node-" << node << " in epoch=" << current_epoch
      << ". This means the thread received so many retired pages in a short time period."
      << " Will advance an epoch to safely return the retired pages."
      << " This should be a rare event.";
    engine_->get_xct_manager()->advance_current_global_epoch();
    current_epoch = engine_->get_xct_manager()->get_current_global_epoch();
    LOG(INFO) << "okay, advanced epoch. now we should be able to return more pages";
    safe_count = chunk->get_safe_offset_count(current_epoch);
  }

  VLOG(0) << "Thread-" << id_ << " batch-returning retired volatile pages to node-" << node
    << " safe_count/count=" << safe_count << "/" << chunk->size() << ". epoch=" << current_epoch;
  memory::PagePool* volatile_pool
    = engine_->get_memory_manager()->get_node_memory(node)->get_volatile_pool();
  volatile_pool->release(safe_count, chunk);
  ASSERT_ND(!chunk->full());
}

ErrorCode foedus::thread::ThreadPimpl::follow_page_pointer ( storage::VolatilePageInit  page_initializer,
bool  tolerate_null_pointer,
bool  will_modify,
bool  take_ptr_set_snapshot,
storage::DualPagePointer *  pointer,
storage::Page **  page,
const storage::Page *  parent,
uint16_t  index_in_parent 
)

A general method to follow (read) a page pointer.

Parameters
[in] page_initializer  callback function in case we need to initialize a new volatile page. null if it never happens (eg tolerate_null_pointer is false).
[in] tolerate_null_pointer  when true and when both the volatile and snapshot pointers seem null, we return a null page rather than creating a new volatile page.
[in] will_modify  if true, we always return a non-null volatile page. This is true when we are to modify the page, such as insert/delete.
[in] take_ptr_set_snapshot  if true, we add the address of the volatile page pointer to the ptr set when we do not follow a volatile pointer (null or snapshot). This is usually true to make sure we get aware of new page installment by concurrent threads. If the isolation level is not serializable, we don't take the ptr set anyway.
[in,out] pointer  the page pointer.
[out] page  the read page.
[in] parent  the parent page that contains a pointer to the page.
[in] index_in_parent  Some index (meaning depends on page type) of pointer in parent page to the page.
Precondition
!tolerate_null_pointer || !will_modify (if we are modifying the page, tolerating null pointer doesn't make sense. we should always initialize a new volatile page)

This is the primary way to retrieve a page pointed to by a pointer in various places. Depending on the current transaction's isolation level and the storage type (represented by the various arguments), this does a whole lot of things to comply with our commit protocol.

Remember that DualPagePointer maintains both a volatile and a snapshot pointer. We sometimes have to install a new volatile page or add the pointer to the pointer set for serializability. That logic is a bit too lengthy to duplicate in each page type, so we generalize it here.
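The branching above boils down to a decision table over the two pointers and the caller's flags. Below is a hedged, standalone sketch of just that decision (not FOEDUS code): `Follow` and `follow_decision` are hypothetical names, and real pages, resolvers, and error codes are abstracted away.

```cpp
#include <cassert>

// Hypothetical model of the follow_page_pointer() outcome, with the
// DualPagePointer reduced to two booleans.
enum class Follow {
  kNullPage,         // both pointers null, tolerate_null_pointer == true
  kNewVolatilePage,  // both pointers null; construct an empty new volatile page
  kVolatilePage,     // a volatile page exists; always preferred (it is the latest)
  kInstallVolatile,  // snapshot only, but will_modify: copy snapshot into a volatile page
  kSnapshotPage,     // snapshot only, read-only access
};

Follow follow_decision(bool has_snapshot, bool has_volatile,
                       bool tolerate_null_pointer, bool will_modify) {
  // Precondition from the docs: modifying callers never tolerate a null page.
  assert(!tolerate_null_pointer || !will_modify);
  if (!has_snapshot) {
    if (!has_volatile) {
      return tolerate_null_pointer ? Follow::kNullPage : Follow::kNewVolatilePage;
    }
    return Follow::kVolatilePage;
  }
  if (has_volatile) {
    return Follow::kVolatilePage;  // the volatile page is guaranteed to be the latest
  }
  return will_modify ? Follow::kInstallVolatile : Follow::kSnapshotPage;
}
```

In the real method, the `kNullPage` and `kSnapshotPage` outcomes are also where the pointer-set bookkeeping happens when the transaction is serializable.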

Definition at line 411 of file thread_pimpl.cpp.

References foedus::xct::Xct::add_to_pointer_set(), ASSERT_ND, foedus::storage::assert_valid_volatile_page(), CHECK_ERROR_CODE, core_memory_, current_xct_, foedus::memory::LocalPageResolver::end_, find_or_read_a_snapshot_page(), foedus::storage::Page::get_header(), foedus::xct::Xct::get_isolation_level(), global_volatile_page_resolver_, foedus::memory::NumaCoreMemory::grab_free_volatile_page(), holder_, install_a_volatile_page(), foedus::storage::VolatilePagePointer::is_null(), foedus::kErrorCodeMemoryNoFreePages, foedus::kErrorCodeOk, foedus::xct::kSerializable, local_volatile_page_resolver_, numa_node_, place_a_new_volatile_page(), foedus::memory::GlobalVolatilePageResolver::resolve_offset(), foedus::memory::LocalPageResolver::resolve_offset_newpage(), foedus::storage::VolatilePagePointer::set(), foedus::storage::PageHeader::snapshot_, foedus::storage::DualPagePointer::snapshot_pointer_, UNLIKELY, and foedus::storage::DualPagePointer::volatile_pointer_.

Referenced by foedus::thread::Thread::follow_page_pointer().

419  {
420  ASSERT_ND(!tolerate_null_pointer || !will_modify);
421 
422  storage::VolatilePagePointer volatile_pointer = pointer->volatile_pointer_;
423  bool followed_snapshot = false;
424  if (pointer->snapshot_pointer_ == 0) {
425  if (volatile_pointer.is_null()) {
426  // both null, so the page is not created yet.
427  if (tolerate_null_pointer) {
428  *page = nullptr;
429  } else {
430  // place an empty new page
431  ASSERT_ND(page_initializer);
432  // we must not install a new volatile page in a snapshot page. We must not hit this case.
433  ASSERT_ND(!parent->get_header().snapshot_);
434  memory::PagePoolOffset offset = core_memory_->grab_free_volatile_page();
435  if (UNLIKELY(offset == 0)) {
436  return kErrorCodeMemoryNoFreePages;
437  }
438 
439  storage::Page* new_page = local_volatile_page_resolver_.resolve_offset_newpage(offset);
440  storage::VolatilePagePointer new_page_id;
441  new_page_id.set(numa_node_, offset);
442  storage::VolatilePageInitArguments args = {
443  holder_,
444  new_page_id,
445  new_page,
446  parent,
447  index_in_parent
448  };
449  page_initializer(args);
450  storage::assert_valid_volatile_page(new_page, offset);
451  ASSERT_ND(new_page->get_header().snapshot_ == false);
452 
453  *page = place_a_new_volatile_page(offset, pointer);
454  }
455  } else {
456  // then we have to follow the volatile page anyway
457  *page = global_volatile_page_resolver_.resolve_offset(volatile_pointer);
458  }
459  } else {
460  // if there is a snapshot page, we have a few more choices.
461  if (!volatile_pointer.is_null()) {
462  // we have a volatile page, which is guaranteed to be latest
463  *page = global_volatile_page_resolver_.resolve_offset(volatile_pointer);
464  } else if (will_modify) {
465  // we need a volatile page. so construct it from snapshot
466  CHECK_ERROR_CODE(install_a_volatile_page(pointer, page));
467  } else {
468  // otherwise just use snapshot
469  CHECK_ERROR_CODE(find_or_read_a_snapshot_page(pointer->snapshot_pointer_, page));
470  followed_snapshot = true;
471  }
472  }
473  ASSERT_ND((*page) == nullptr || (followed_snapshot == (*page)->get_header().snapshot_));
474 
475  // if we follow a snapshot pointer, remember the pointer set
476  if (current_xct_.get_isolation_level() == xct::kSerializable) {
477  if ((*page == nullptr || followed_snapshot) && take_ptr_set_snapshot) {
478  current_xct_.add_to_pointer_set(&pointer->volatile_pointer_, volatile_pointer);
479  }
480  }
481  return kErrorCodeOk;
482 }
ErrorCode foedus::thread::ThreadPimpl::follow_page_pointers_for_read_batch ( uint16_t  batch_size,
storage::VolatilePageInit  page_initializer,
bool  tolerate_null_pointer,
bool  take_ptr_set_snapshot,
storage::DualPagePointer **  pointers,
storage::Page **  parents,
const uint16_t *  index_in_parents,
bool *  followed_snapshots,
storage::Page **  out 
)

Batched version of follow_page_pointer with will_modify==false.

Parameters
[in] batch_size  Batch size. Must be kMaxFindPagesBatch or less.
[in] page_initializer  callback function used when we need to initialize a new volatile page. null if that never happens (e.g., when tolerate_null_pointer is false).
[in] tolerate_null_pointer  when true and both the volatile and snapshot pointers are null, we return a null page rather than creating a new volatile page.
[in] take_ptr_set_snapshot  if true, we add the address of the volatile page pointer to the pointer set when we do not follow a volatile pointer (i.e., it is null or we follow a snapshot page). This is usually true to make sure we become aware of a new page installed by concurrent threads. If the isolation level is not serializable, we don't take the pointer set anyway.
[in,out] pointers  the page pointers.
[in] parents  the parent pages; parents[i] contains a pointer to the i-th page.
[in] index_in_parents  some index (meaning depends on the page type) in each parent page of the pointer to the page.
[in,out] followed_snapshots  As input, followed_snapshots[i] must indicate whether parents[i] is a snapshot page. As output, it equals out[i]->get_header().snapshot_. We receive/emit this to avoid accessing the page headers.
[out] out  the read pages.
Note
this method is guaranteed to work even if parents==out
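That aliasing guarantee comes from staging results in a temporary array (tmp_out in the listing below) so every parent slot is fully consumed before any output slot is overwritten. Here is a hedged, standalone illustration of the same pattern (not FOEDUS code; `resolve_batch`, `aliasing_demo`, and `kMaxBatch` are hypothetical stand-ins, with pages reduced to ints).

```cpp
#include <cassert>
#include <cstddef>

constexpr std::size_t kMaxBatch = 32;  // stands in for Thread::kMaxFindPagesBatch

// Derives out[i] from parents[i]. Safe even when parents == out because all
// parent values are read into tmp before any output slot is clobbered.
void resolve_batch(std::size_t n, const int* parents, int* out) {
  assert(n <= kMaxBatch);
  int tmp[kMaxBatch];
  for (std::size_t i = 0; i < n; ++i) {
    tmp[i] = parents[i] * 10;  // "resolve" the child from its parent
  }
  for (std::size_t i = 0; i < n; ++i) {
    out[i] = tmp[i];  // only now is it safe to overwrite out (possibly == parents)
  }
}

// Passes the same buffer as both parents and out, as the batch methods allow.
bool aliasing_demo() {
  int buf[3] = {1, 2, 3};
  resolve_batch(3, buf, buf);
  return buf[0] == 10 && buf[1] == 20 && buf[2] == 30;
}
```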

Definition at line 484 of file thread_pimpl.cpp.

References foedus::xct::Xct::add_to_pointer_set(), ASSERT_ND, foedus::storage::assert_valid_volatile_page(), CHECK_ERROR_CODE, core_memory_, current_xct_, find_or_read_snapshot_pages_batch(), foedus::storage::Page::get_header(), foedus::xct::Xct::get_isolation_level(), global_volatile_page_resolver_, foedus::memory::NumaCoreMemory::grab_free_volatile_page(), holder_, foedus::storage::VolatilePagePointer::is_null(), foedus::kErrorCodeInvalidParameter, foedus::kErrorCodeMemoryNoFreePages, foedus::kErrorCodeOk, foedus::thread::Thread::kMaxFindPagesBatch, foedus::xct::kSerializable, local_volatile_page_resolver_, numa_node_, place_a_new_volatile_page(), foedus::memory::GlobalVolatilePageResolver::resolve_offset(), foedus::memory::LocalPageResolver::resolve_offset_newpage(), foedus::storage::VolatilePagePointer::set(), foedus::storage::PageHeader::snapshot_, foedus::storage::DualPagePointer::snapshot_pointer_, UNLIKELY, and foedus::storage::DualPagePointer::volatile_pointer_.

Referenced by foedus::thread::Thread::follow_page_pointers_for_read_batch().

493  {
494  ASSERT_ND(tolerate_null_pointer || page_initializer);
495  if (batch_size == 0) {
496  return kErrorCodeOk;
497  } else if (UNLIKELY(batch_size > Thread::kMaxFindPagesBatch)) {
498  return kErrorCodeInvalidParameter;
499  }
500 
501  // this one uses a batched find method for following snapshot pages.
502  // some of them might follow volatile pages, so we do it only when there is at least one snapshot pointer.
503  bool has_some_snapshot = false;
504  const bool needs_ptr_set
505  = take_ptr_set_snapshot && current_xct_.get_isolation_level() == xct::kSerializable;
506 
507  // REMINDER: Remember that it might be parents == out. We thus use tmp_out.
508  storage::Page* tmp_out[Thread::kMaxFindPagesBatch];
509 #ifndef NDEBUG
510  // fill with garbage for easier debugging
511  std::memset(tmp_out, 0xDA, sizeof(tmp_out));
512 #endif  // NDEBUG
513 
514  // collect snapshot page IDs.
515  storage::SnapshotPagePointer snapshot_page_ids[Thread::kMaxFindPagesBatch];
516  for (uint16_t b = 0; b < batch_size; ++b) {
517  snapshot_page_ids[b] = 0;
518  storage::DualPagePointer* pointer = pointers[b];
519  if (pointer == nullptr) {
520  continue;
521  }
522  // followed_snapshots is both input and output.
523  // as input, it should indicate whether the parent is snapshot or not
524  ASSERT_ND(parents[b]->get_header().snapshot_ == followed_snapshots[b]);
525  if (pointer->snapshot_pointer_ != 0 && pointer->volatile_pointer_.is_null()) {
526  has_some_snapshot = true;
527  snapshot_page_ids[b] = pointer->snapshot_pointer_;
528  }
529  }
530 
531  // follow them in a batch. output to tmp_out.
532  if (has_some_snapshot) {
533  CHECK_ERROR_CODE(find_or_read_snapshot_pages_batch(batch_size, snapshot_page_ids, tmp_out));
534  }
535 
536  // handle cases where we have to follow volatile pages. we might also have to create a new page.
537  for (uint16_t b = 0; b < batch_size; ++b) {
538  storage::DualPagePointer* pointer = pointers[b];
539  if (has_some_snapshot) {
540  if (pointer == nullptr) {
541  out[b] = nullptr;
542  continue;
543  } else if (tmp_out[b]) {
544  // if we follow a snapshot pointer _from volatile page_, remember pointer set
545  if (needs_ptr_set && !followed_snapshots[b]) {
546  current_xct_.add_to_pointer_set(&pointer->volatile_pointer_, pointer->volatile_pointer_);
547  }
548  followed_snapshots[b] = true;
549  out[b] = tmp_out[b];
550  continue;
551  }
552  ASSERT_ND(tmp_out[b] == nullptr);
553  }
554 
555  // we didn't follow snapshot page. we must follow volatile page, or null.
556  followed_snapshots[b] = false;
557  ASSERT_ND(!parents[b]->get_header().snapshot_);
558  if (pointer->snapshot_pointer_ == 0) {
559  if (pointer->volatile_pointer_.is_null()) {
560  // both null, so the page is not created yet.
561  if (tolerate_null_pointer) {
562  out[b] = nullptr;
563  } else {
564  memory::PagePoolOffset offset = core_memory_->grab_free_volatile_page();
565  if (UNLIKELY(offset == 0)) {
566  return kErrorCodeMemoryNoFreePages;
567  }
568  storage::Page* new_page = local_volatile_page_resolver_.resolve_offset_newpage(offset);
569  storage::VolatilePagePointer new_page_id;
570  new_page_id.set(numa_node_, offset);
571  storage::VolatilePageInitArguments args = {
572  holder_,
573  new_page_id,
574  new_page,
575  parents[b],
576  index_in_parents[b]
577  };
578  page_initializer(args);
579  storage::assert_valid_volatile_page(new_page, offset);
580  ASSERT_ND(new_page->get_header().snapshot_ == false);
581 
582  out[b] = place_a_new_volatile_page(offset, pointer);
583  }
584  } else {
585  out[b] = global_volatile_page_resolver_.resolve_offset(pointer->volatile_pointer_);
586  }
587  } else {
588  ASSERT_ND(!pointer->volatile_pointer_.is_null());
589  out[b] = global_volatile_page_resolver_.resolve_offset(pointer->volatile_pointer_);
590  }
591  }
592  return kErrorCodeOk;
593 }
ErrorCode foedus::thread::ThreadPimpl::follow_page_pointers_for_write_batch ( uint16_t  batch_size,
storage::VolatilePageInit  page_initializer,
storage::DualPagePointer **  pointers,
storage::Page **  parents,
const uint16_t *  index_in_parents,
storage::Page **  out 
)

Batched version of follow_page_pointer with will_modify==true and tolerate_null_pointer==true.

Parameters
[in] batch_size  Batch size. Must be kMaxFindPagesBatch or less.
[in] page_initializer  callback function used when we need to initialize a new volatile page.
[in,out] pointers  the page pointers.
[in] parents  the parent pages; parents[i] contains a pointer to the i-th page.
[in] index_in_parents  some index (meaning depends on the page type) in each parent page of the pointer to the page.
[out] out  the read pages.
Note
this method is guaranteed to work even if parents==out

Definition at line 595 of file thread_pimpl.cpp.

References ASSERT_ND, foedus::storage::assert_valid_volatile_page(), CHECK_ERROR_CODE, core_memory_, foedus::storage::Page::get_header(), global_volatile_page_resolver_, foedus::memory::NumaCoreMemory::grab_free_volatile_page(), holder_, install_a_volatile_page(), foedus::storage::VolatilePagePointer::is_null(), foedus::kErrorCodeMemoryNoFreePages, foedus::kErrorCodeOk, local_volatile_page_resolver_, numa_node_, place_a_new_volatile_page(), foedus::memory::GlobalVolatilePageResolver::resolve_offset(), foedus::memory::LocalPageResolver::resolve_offset_newpage(), foedus::storage::VolatilePagePointer::set(), foedus::storage::PageHeader::snapshot_, foedus::storage::DualPagePointer::snapshot_pointer_, UNLIKELY, and foedus::storage::DualPagePointer::volatile_pointer_.

Referenced by foedus::thread::Thread::follow_page_pointers_for_write_batch().

601  {
602  // REMINDER: Remember that it might be parents == out. It's not an issue in this function, though.
603  // this method is not quite batched as it doesn't need to be.
604  // still, fewer branches because we can assume all of them need a writable volatile page.
605  for (uint16_t b = 0; b < batch_size; ++b) {
606  storage::DualPagePointer* pointer = pointers[b];
607  if (pointer == nullptr) {
608  out[b] = nullptr;
609  continue;
610  }
611  ASSERT_ND(!parents[b]->get_header().snapshot_);
612  storage::Page** page = out + b;
613  storage::VolatilePagePointer volatile_pointer = pointer->volatile_pointer_;
614  if (!volatile_pointer.is_null()) {
615  *page = global_volatile_page_resolver_.resolve_offset(volatile_pointer);
616  } else if (pointer->snapshot_pointer_ != 0) {
617  // we need a volatile page. so construct it from snapshot
618  CHECK_ERROR_CODE(install_a_volatile_page(pointer, page));
619  } else {
620  ASSERT_ND(page_initializer);
621  // both null, so the page is not created yet. place an empty new page.
622  memory::PagePoolOffset offset = core_memory_->grab_free_volatile_page();
623  if (UNLIKELY(offset == 0)) {
624  return kErrorCodeMemoryNoFreePages;
625  }
626  storage::Page* new_page = local_volatile_page_resolver_.resolve_offset_newpage(offset);
627  storage::VolatilePagePointer new_page_id;
628  new_page_id.set(numa_node_, offset);
629  storage::VolatilePageInitArguments args = {
630  holder_,
631  new_page_id,
632  new_page,
633  parents[b],
634  index_in_parents[b]
635  };
636  page_initializer(args);
637  storage::assert_valid_volatile_page(new_page, offset);
638  ASSERT_ND(new_page->get_header().snapshot_ == false);
639 
640  *page = place_a_new_volatile_page(offset, pointer);
641  }
642  ASSERT_ND(out[b] != nullptr);
643  }
644  return kErrorCodeOk;
645 }
void foedus::thread::ThreadPimpl::get_mcs_rw_my_blocks ( xct::McsRwSimpleBlock **  out)
inline

Definition at line 314 of file thread_pimpl.hpp.

References mcs_rw_simple_blocks_.

Referenced by foedus::thread::ThreadPimplMcsAdaptor< RW_BLOCK >::get_rw_my_block().

314 { *out = mcs_rw_simple_blocks_; }

void foedus::thread::ThreadPimpl::get_mcs_rw_my_blocks ( xct::McsRwExtendedBlock **  out)
inline

Definition at line 315 of file thread_pimpl.hpp.

References mcs_rw_extended_blocks_.

315 { *out = mcs_rw_extended_blocks_; }
ThreadRef foedus::thread::ThreadPimpl::get_thread_ref ( ThreadId  id)

Definition at line 767 of file thread_pimpl.cpp.

References engine_, foedus::thread::ThreadPool::get_pimpl(), foedus::Engine::get_thread_pool(), and foedus::thread::ThreadPoolPimpl::get_thread_ref().

Referenced by foedus::thread::ThreadPimplMcsAdaptor< RW_BLOCK >::get_other_cur_block(), foedus::thread::ThreadPimplMcsAdaptor< RW_BLOCK >::get_rw_other_async_block(), foedus::thread::ThreadPimplMcsAdaptor< RW_BLOCK >::get_rw_other_block(), foedus::thread::ThreadPimplMcsAdaptor< RW_BLOCK >::get_ww_other_block(), and foedus::thread::ThreadPimplMcsAdaptor< RW_BLOCK >::other_waiting().

767  {
768  auto* pool_pimpl = engine_->get_thread_pool()->get_pimpl();
769  return pool_pimpl->get_thread_ref(id);
770 }
void foedus::thread::ThreadPimpl::handle_tasks ( )

Main routine of the worker thread.

This method keeps checking current_task_. Whenever it retrieves a task, it runs it and resets current_task_ when it's done. It exits when exit_requested_ is set.
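The loop is a small status machine: wait for a task, run it, publish the result, wait again. Below is a hedged, self-contained model of just that machine (not FOEDUS code): `WorkerModel`, `worker_demo`, and the plain atomic status are hypothetical stand-ins, and the real thread sleeps on a shared-memory condition variable rather than spinning.

```cpp
#include <atomic>
#include <thread>

// Hypothetical model of the handle_tasks() status machine.
enum Status {
  kWaitingForTask, kWaitingForExecution, kRunningTask,
  kWaitingForClientRelease, kStopRequested, kTerminated,
};

struct WorkerModel {
  std::atomic<int> status{kWaitingForTask};
  std::atomic<int> output{0};

  // Worker side: spin (the real code sleeps) until a task is posted, run it,
  // publish the result, then wait for the next task or a stop request.
  void run() {
    while (status.load() != kStopRequested) {
      if (status.load() == kWaitingForExecution) {
        status.store(kRunningTask);
        output.store(42);                        // "run the procedure"
        status.store(kWaitingForClientRelease);  // like task_complete_cond_.signal()
      }
      std::this_thread::yield();
    }
    status.store(kTerminated);
  }
};

// Client side: post a task, wait for completion, read the result, request a stop.
int worker_demo() {
  WorkerModel w;
  std::thread t(&WorkerModel::run, &w);
  w.status.store(kWaitingForExecution);  // impersonation posts a task
  while (w.status.load() != kWaitingForClientRelease) {
    std::this_thread::yield();
  }
  int result = w.output.load();
  w.status.store(kStopRequested);  // like exit_requested_
  t.join();
  return result;
}
```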

Definition at line 163 of file thread_pimpl.cpp.

References ASSERT_ND, control_block_, current_xct_, foedus::thread::decompose_numa_node(), foedus::xct::XctOptions::enable_retrospective_lock_list_, engine_, foedus::Engine::get_options(), foedus::proc::ProcManager::get_proc(), foedus::Engine::get_proc_manager(), holder_, foedus::storage::StorageOptions::hot_threshold_, foedus::xct::XctOptions::hot_threshold_for_retrospective_lock_list_, id_, foedus::ErrorStack::is_error(), is_stop_requested(), foedus::thread::kNotInitialized, foedus::thread::kRunningTask, foedus::soc::ThreadMemoryAnchors::kTaskOutputMemorySize, foedus::thread::kTerminated, foedus::thread::kWaitingForClientRelease, foedus::thread::kWaitingForExecution, foedus::thread::kWaitingForTask, foedus::xct::Xct::set_default_hot_threshold_for_this_xct(), foedus::xct::Xct::set_default_rll_for_this_xct(), foedus::xct::Xct::set_default_rll_threshold_for_this_xct(), set_thread_schedule(), foedus::assorted::spinlock_yield(), foedus::EngineOptions::storage_, task_input_memory_, task_output_memory_, and foedus::EngineOptions::xct_.

Referenced by initialize_once().

163  {
164  int numa_node = static_cast<int>(decompose_numa_node(id_));
165  LOG(INFO) << "Thread-" << id_ << " started running on NUMA node: " << numa_node
166  << " control_block address=" << control_block_;
167  NumaThreadScope scope(numa_node);
168 
169  ASSERT_ND(control_block_->status_ == kNotInitialized);
170  control_block_->status_ = kWaitingForTask;
171  while (!is_stop_requested()) {
172  assorted::spinlock_yield();
173  {
174  uint64_t demand = control_block_->wakeup_cond_.acquire_ticket();
175  if (is_stop_requested()) {
176  break;
177  }
178  // these two status are "not urgent".
179  if (control_block_->status_ == kWaitingForTask
180  || control_block_->status_ == kWaitingForClientRelease) {
181  VLOG(0) << "Thread-" << id_ << " sleeping...";
182  control_block_->wakeup_cond_.timedwait(demand, 100000ULL, 1U << 16, 1U << 13);
183  }
184  }
185  VLOG(0) << "Thread-" << id_ << " woke up. status=" << control_block_->status_;
186  if (control_block_->status_ == kWaitingForExecution) {
187  control_block_->output_len_ = 0;
188  control_block_->status_ = kRunningTask;
189 
190  // Reset the default value of enable_rll_for_this_xct etc to system-wide setting
191  // for every impersonation.
192  current_xct_.set_default_rll_for_this_xct(
193  engine_->get_options().xct_.enable_retrospective_lock_list_);
194  current_xct_.set_default_hot_threshold_for_this_xct(
195  engine_->get_options().storage_.hot_threshold_);
196  current_xct_.set_default_rll_threshold_for_this_xct(
197  engine_->get_options().xct_.hot_threshold_for_retrospective_lock_list_);
198 
199  const proc::ProcName& proc_name = control_block_->proc_name_;
200  VLOG(0) << "Thread-" << id_ << " retrieved a task: " << proc_name;
201  proc::Proc proc = nullptr;
202  ErrorStack result = engine_->get_proc_manager()->get_proc(proc_name, &proc);
203  if (result.is_error()) {
204  // control_block_->proc_result_
205  LOG(ERROR) << "Thread-" << id_ << " couldn't find procedure: " << proc_name;
206  } else {
207  uint32_t output_used = 0;
208  proc::ProcArguments args = {
209  engine_,
210  holder_,
211  task_input_memory_,
212  control_block_->input_len_,
213  task_output_memory_,
214  soc::ThreadMemoryAnchors::kTaskOutputMemorySize,
215  &output_used,
216  };
217  result = proc(args);
218  VLOG(0) << "Thread-" << id_ << " run(task) returned. result =" << result
219  << ", output_used=" << output_used;
220  control_block_->output_len_ = output_used;
221  }
222  if (result.is_error()) {
223  control_block_->proc_result_.from_error_stack(result);
224  } else {
225  control_block_->proc_result_.clear();
226  }
227  control_block_->status_ = kWaitingForClientRelease;
228  {
229  // Wakeup the client if it's waiting.
230  control_block_->task_complete_cond_.signal();
231  }
232  VLOG(0) << "Thread-" << id_ << " finished a task. result =" << result;
233  }
234  }
235 
236  control_block_->status_ = kTerminated;
237  LOG(INFO) << "Thread-" << id_ << " exits";
238 }
ErrorStack foedus::thread::ThreadPimpl::initialize_once ( )
final override virtual

Implements foedus::DefaultInitializable.

Definition at line 81 of file thread_pimpl.cpp.

References ASSERT_ND, foedus::EngineOptions::cache_, CHECK_ERROR, control_block_, core_memory_, current_xct_, engine_, foedus::memory::NumaNodeMemory::get_core_memory(), foedus::memory::EngineMemory::get_global_volatile_page_resolver(), foedus::memory::EngineMemory::get_local_memory(), foedus::Engine::get_memory_manager(), foedus::Engine::get_options(), foedus::memory::PagePool::get_resolver(), foedus::soc::SocManager::get_shared_memory_repo(), foedus::memory::NumaNodeMemory::get_snapshot_cache_table(), foedus::memory::NumaNodeMemory::get_snapshot_pool(), foedus::Engine::get_soc_manager(), foedus::soc::SharedMemoryRepo::get_thread_memory_anchors(), foedus::memory::NumaNodeMemory::get_volatile_pool(), global_volatile_page_resolver_, handle_tasks(), id_, foedus::thread::ThreadControlBlock::initialize(), foedus::xct::Xct::initialize(), foedus::DefaultInitializable::initialize(), foedus::DefaultInitializable::is_initialized(), foedus::xct::XctOptions::kMcsImplementationTypeExtended, foedus::xct::XctOptions::kMcsImplementationTypeSimple, foedus::kRetOk, local_volatile_page_resolver_, log_buffer_, foedus::thread::ThreadControlBlock::mcs_block_current_, foedus::xct::XctOptions::mcs_implementation_type_, foedus::thread::ThreadControlBlock::mcs_rw_async_mapping_current_, mcs_rw_async_mappings_, mcs_rw_extended_blocks_, mcs_rw_simple_blocks_, mcs_ww_blocks_, node_memory_, raw_thread_, raw_thread_set_, simple_mcs_rw_, foedus::cache::CacheOptions::snapshot_cache_enabled_, snapshot_cache_hashtable_, snapshot_file_set_, snapshot_page_pool_, task_input_memory_, task_output_memory_, and foedus::EngineOptions::xct_.

81  {
83 
 84  soc::ThreadMemoryAnchors* anchors
 85  = engine_->get_soc_manager()->get_shared_memory_repo()->get_thread_memory_anchors(id_);
 86  control_block_ = anchors->thread_memory_;
 87  control_block_->initialize(id_);
 88  task_input_memory_ = anchors->task_input_memory_;
89  task_output_memory_ = anchors->task_output_memory_;
90  mcs_ww_blocks_ = anchors->mcs_ww_lock_memories_;
91  mcs_rw_simple_blocks_ = anchors->mcs_rw_simple_lock_memories_;
92  mcs_rw_extended_blocks_ = anchors->mcs_rw_extended_lock_memories_;
93  mcs_rw_async_mappings_ = anchors->mcs_rw_async_mappings_memories_;
94 
103  } else {
104  snapshot_cache_hashtable_ = nullptr;
105  }
108  core_memory_,
116 
117  raw_thread_set_ = false;
118  raw_thread_ = std::move(std::thread(&ThreadPimpl::handle_tasks, this));
119  raw_thread_set_ = true;
120  return kRetOk;
121 }

ErrorCode foedus::thread::ThreadPimpl::install_a_volatile_page (storage::DualPagePointer * pointer, storage::Page ** installed_page)

Installs a volatile page to the given dual pointer as a copy of the snapshot page.

Parameters
[in,out]  pointer  dual pointer; the volatile pointer will be modified.
[out]  installed_page  physical pointer to the installed volatile page. This might point to a page installed by a concurrent thread.
Precondition
pointer->snapshot_pointer_ != 0 (this method is for a page that already has snapshot)
pointer->volatile_pointer.components.offset == 0 (not mandatory, though, because concurrent threads might have just installed it)

This method is called when a dual pointer has only a snapshot pointer, in other words when the page is "clean", to create a volatile version for modification.

Definition at line 292 of file thread_pimpl.cpp.

References ASSERT_ND, CHECK_ERROR_CODE, core_memory_, foedus::memory::LocalPageResolver::end_, find_or_read_a_snapshot_page(), foedus::storage::Page::get_header(), foedus::storage::VolatilePagePointer::get_offset(), foedus::memory::NumaCoreMemory::grab_free_volatile_page_pointer(), foedus::storage::VolatilePagePointer::is_null(), foedus::kErrorCodeMemoryNoFreePages, foedus::kErrorCodeOk, foedus::storage::kPageSize, local_volatile_page_resolver_, foedus::storage::PageHeader::page_id_, place_a_new_volatile_page(), foedus::memory::LocalPageResolver::resolve_offset_newpage(), foedus::storage::PageHeader::snapshot_, foedus::storage::DualPagePointer::snapshot_pointer_, UNLIKELY, and foedus::storage::VolatilePagePointer::word.

Referenced by follow_page_pointer(), follow_page_pointers_for_write_batch(), and foedus::thread::Thread::install_a_volatile_page().

{
  ASSERT_ND(pointer->snapshot_pointer_ != 0);

  // copy from snapshot version
  storage::Page* snapshot_page;
  CHECK_ERROR_CODE(find_or_read_a_snapshot_page(pointer->snapshot_pointer_, &snapshot_page));
  storage::VolatilePagePointer volatile_pointer = core_memory_->grab_free_volatile_page_pointer();
  const auto offset = volatile_pointer.get_offset();
  if (UNLIKELY(volatile_pointer.is_null())) {
    return kErrorCodeMemoryNoFreePages;
  }
  ASSERT_ND(offset < local_volatile_page_resolver_.end_);
  storage::Page* page = local_volatile_page_resolver_.resolve_offset_newpage(offset);
  std::memcpy(page, snapshot_page, storage::kPageSize);
  // We copied from a snapshot page, so the snapshot flag is on.
  ASSERT_ND(page->get_header().snapshot_);
  page->get_header().snapshot_ = false;  // now it's volatile
  page->get_header().page_id_ = volatile_pointer.word;  // and correct page ID

  *installed_page = place_a_new_volatile_page(offset, pointer);
  return kErrorCodeOk;
}

bool foedus::thread::ThreadPimpl::is_simple_mcs_rw ( ) const
inline

Definition at line 282 of file thread_pimpl.hpp.

References simple_mcs_rw_.

Referenced by cll_giveup_all_locks_after(), cll_release_all_locks(), cll_release_all_locks_after(), cll_try_or_acquire_multiple_locks(), cll_try_or_acquire_single_lock(), run_nested_sysxct(), sysxct_batch_page_locks(), sysxct_batch_record_locks(), sysxct_page_lock(), and sysxct_record_lock().

{ return simple_mcs_rw_; }

bool foedus::thread::ThreadPimpl::is_stop_requested ( ) const

Definition at line 158 of file thread_pimpl.cpp.

References control_block_, foedus::thread::kWaitingForTerminate, foedus::assorted::memory_fence_acquire(), and foedus::thread::ThreadControlBlock::status_.

Referenced by handle_tasks().

{
  assorted::memory_fence_acquire();
  return control_block_->status_ == kWaitingForTerminate;
}

bool foedus::thread::ThreadPimpl::is_volatile_page_retired ( storage::VolatilePagePointer  ptr)

Subroutine of collect_retired_volatile_page() just for assertion.

Definition at line 818 of file thread_pimpl.cpp.

References foedus::storage::Page::get_header(), global_volatile_page_resolver_, foedus::storage::PageVersion::is_retired(), foedus::storage::PageHeader::page_version_, and foedus::memory::GlobalVolatilePageResolver::resolve_offset().

Referenced by collect_retired_volatile_page().

{
  storage::Page* page = global_volatile_page_resolver_.resolve_offset(ptr);
  return page->get_header().page_version_.is_retired();
}

ErrorCode foedus::thread::ThreadPimpl::on_snapshot_cache_miss (storage::SnapshotPagePointer page_id, memory::PagePoolOffset * pool_offset)

Definition at line 741 of file thread_pimpl.cpp.

References core_memory_, foedus::memory::PagePool::get_base(), foedus::memory::NumaCoreMemory::grab_free_snapshot_page(), holder_, foedus::kErrorCodeCacheNoFreePages, foedus::kErrorCodeOk, read_a_snapshot_page(), foedus::memory::NumaCoreMemory::release_free_snapshot_page(), and snapshot_page_pool_.

Referenced by find_or_read_a_snapshot_page(), and find_or_read_snapshot_pages_batch().

{
  // grab a buffer page to read into.
  memory::PagePoolOffset offset = core_memory_->grab_free_snapshot_page();
  if (offset == 0) {
    // TASK(Hideaki) First, we have to make sure this doesn't happen often (cleaner's work).
    // Second, when this happens, we have to do eviction now, but probably after aborting the xct.
    LOG(ERROR) << "Could not grab free snapshot page while cache miss. thread=" << *holder_
      << ", page_id=" << assorted::Hex(page_id);
    return kErrorCodeCacheNoFreePages;
  }

  storage::Page* new_page = snapshot_page_pool_->get_base() + offset;
  ErrorCode read_result = read_a_snapshot_page(page_id, new_page);
  if (read_result != kErrorCodeOk) {
    LOG(ERROR) << "Failed to read a snapshot page. thread=" << *holder_
      << ", page_id=" << assorted::Hex(page_id);
    core_memory_->release_free_snapshot_page(offset);
    return read_result;
  }

  *pool_offset = offset;
  return kErrorCodeOk;
}

storage::Page * foedus::thread::ThreadPimpl::place_a_new_volatile_page (memory::PagePoolOffset new_offset, storage::DualPagePointer * pointer)

Subroutine of install_a_volatile_page() and follow_page_pointer() to atomically place the given new volatile page created by this thread.

Parameters
[in]new_offsetoffset of the new volatile page created by this thread
[in,out]pointerthe address to place a new pointer.
Returns
the volatile page that was actually placed.

Due to concurrent threads, this method might discard the given volatile page and instead adopt a page placed by another thread. In that case, new_offset is released back to the free pool.

Definition at line 317 of file thread_pimpl.cpp.

References ASSERT_ND, core_memory_, foedus::storage::Page::get_header(), global_volatile_page_resolver_, id_, foedus::storage::VolatilePagePointer::is_null(), local_volatile_page_resolver_, numa_node_, foedus::memory::NumaCoreMemory::release_free_volatile_page(), foedus::memory::GlobalVolatilePageResolver::resolve_offset(), foedus::memory::LocalPageResolver::resolve_offset_newpage(), foedus::storage::VolatilePagePointer::set(), foedus::storage::PageHeader::snapshot_, foedus::storage::DualPagePointer::volatile_pointer_, and foedus::storage::VolatilePagePointer::word.

Referenced by follow_page_pointer(), follow_page_pointers_for_read_batch(), follow_page_pointers_for_write_batch(), and install_a_volatile_page().

{
  while (true) {
    storage::VolatilePagePointer cur_pointer = pointer->volatile_pointer_;
    storage::VolatilePagePointer new_pointer;
    new_pointer.set(numa_node_, new_offset);
    // atomically install it.
    if (cur_pointer.is_null() &&
      assorted::raw_atomic_compare_exchange_strong<uint64_t>(
        &(pointer->volatile_pointer_.word),
        &(cur_pointer.word),
        new_pointer.word)) {
      // successfully installed
      return local_volatile_page_resolver_.resolve_offset_newpage(new_offset);
    } else {
      if (!cur_pointer.is_null()) {
        // someone else has installed it!
        VLOG(0) << "Interesting. Lost race to install a volatile page. ver-b. Thread-" << id_
          << ", local offset=" << new_offset << " winning=" << cur_pointer;
        core_memory_->release_free_volatile_page(new_offset);
        storage::Page* placed_page = global_volatile_page_resolver_.resolve_offset(cur_pointer);
        ASSERT_ND(placed_page->get_header().snapshot_ == false);
        return placed_page;
      } else {
        // This is probably a bug, but we might only change mod count for some reason.
        LOG(WARNING) << "Very interesting. Lost race but volatile page not installed. Thread-"
          << id_ << ", local offset=" << new_offset;
        continue;
      }
    }
  }
}

ErrorCode foedus::thread::ThreadPimpl::read_a_snapshot_page (storage::SnapshotPagePointer page_id, storage::Page * buffer)
inline

Read a snapshot page using the thread-local file descriptor set.

Attention
this method always READs from the file, so no caching is done. It is actually used from the caching module when a cache miss happens. To utilize the cache, use find_or_read_a_snapshot_page().

Definition at line 512 of file thread_pimpl.hpp.

References foedus::cache::SnapshotFileSet::read_page(), and snapshot_file_set_.

Referenced by find_or_read_a_snapshot_page(), find_or_read_snapshot_pages_batch(), on_snapshot_cache_miss(), and foedus::thread::Thread::read_a_snapshot_page().

{
  return snapshot_file_set_.read_page(page_id, buffer);
}

ErrorCode foedus::thread::ThreadPimpl::read_snapshot_pages (storage::SnapshotPagePointer page_id_begin, uint32_t page_count, storage::Page * buffer)
inline

Read contiguous pages in one shot.

Other than that, it is the same as read_a_snapshot_page().

Definition at line 517 of file thread_pimpl.hpp.

References foedus::cache::SnapshotFileSet::read_pages(), and snapshot_file_set_.

Referenced by foedus::thread::Thread::read_snapshot_pages().

{
  return snapshot_file_set_.read_pages(page_id_begin, page_count, buffer);
}

ErrorCode foedus::thread::ThreadPimpl::run_nested_sysxct (xct::SysxctFunctor * functor, uint32_t max_retries)

Sysxct-related.

The implementation mostly just forwards to SysxctLockList.

Definition at line 980 of file thread_pimpl.cpp.

References cll_get_max_locked_id(), current_xct_, foedus::xct::Xct::get_sysxct_workspace(), is_simple_mcs_rw(), and foedus::xct::run_nested_sysxct_impl().

Referenced by foedus::thread::Thread::run_nested_sysxct().

{
  xct::SysxctWorkspace* workspace = current_xct_.get_sysxct_workspace();
  xct::UniversalLockId enclosing_max_lock_id = cll_get_max_locked_id();
  ThreadPimplCllReleaseAllFunctor release_functor(this);
  if (is_simple_mcs_rw()) {
    ThreadPimplMcsAdaptor< xct::McsRwSimpleBlock > adaptor(this);
    return xct::run_nested_sysxct_impl(
      functor,
      adaptor,
      max_retries,
      workspace,
      enclosing_max_lock_id,
      release_functor);
  } else {
    ThreadPimplMcsAdaptor< xct::McsRwExtendedBlock > adaptor(this);
    return xct::run_nested_sysxct_impl(
      functor,
      adaptor,
      max_retries,
      workspace,
      enclosing_max_lock_id,
      release_functor);
  }
}

void foedus::thread::ThreadPimpl::set_thread_schedule ( )

initializes the thread's policy/priority

Definition at line 239 of file thread_pimpl.cpp.

References engine_, foedus::Engine::get_options(), id_, foedus::assorted::os_error(), foedus::thread::ThreadOptions::overwrite_thread_schedule_, raw_thread_, raw_thread_set_, SPINLOCK_WHILE, foedus::EngineOptions::thread_, foedus::thread::ThreadOptions::thread_policy_, and foedus::thread::ThreadOptions::thread_priority_.

Referenced by handle_tasks().

{
  // this code totally assumes pthread. maybe ifdef to handle Windows.. later!
  SPINLOCK_WHILE(raw_thread_set_ == false) {
    // as the copy to raw_thread_ might happen after the launched thread getting here,
    // we check it and spin. This is mainly to make valgrind happy.
    continue;
  }
  pthread_t handle = static_cast<pthread_t>(raw_thread_.native_handle());
  int policy;
  sched_param param;
  int ret = ::pthread_getschedparam(handle, &policy, &param);
  if (ret) {
    LOG(FATAL) << "WTF. pthread_getschedparam() failed: error=" << assorted::os_error();
  }
  const ThreadOptions& opt = engine_->get_options().thread_;
  // output the following logs just once.
  if (id_ == 0) {
    LOG(INFO) << "The default thread policy=" << policy << ", priority=" << param.__sched_priority;
    if (opt.overwrite_thread_schedule_) {
      LOG(INFO) << "Overwriting thread policy=" << opt.thread_policy_
        << ", priority=" << opt.thread_priority_;
    }
  }
  if (opt.overwrite_thread_schedule_) {
    policy = opt.thread_policy_;
    param.__sched_priority = opt.thread_priority_;
    int priority_max = ::sched_get_priority_max(policy);
    int priority_min = ::sched_get_priority_min(policy);
    if (opt.thread_priority_ > priority_max) {
      LOG(WARNING) << "Thread priority too large. using max value: "
        << opt.thread_priority_ << "->" << priority_max;
      param.__sched_priority = priority_max;
    }
    if (opt.thread_priority_ < priority_min) {
      LOG(WARNING) << "Thread priority too small. using min value: "
        << opt.thread_priority_ << "->" << priority_min;
      param.__sched_priority = priority_min;
    }
    int ret2 = ::pthread_setschedparam(handle, policy, &param);
    if (ret2 == EPERM) {
      // this is a quite common mis-configuration, so let's output a friendly error message.
      // also, not fatal. keep running, as this only affects performance.
      LOG(WARNING) << "========= ATTENTION: Thread-scheduling Permission Error!!!! ==========\n"
        " pthread_setschedparam() failed due to permission error. This means you have\n"
        " not set appropriate rtprio to limits.conf. You cannot set priority higher than what\n"
        " OS allows. Configure limits.conf (eg. 'kimurhid - rtprio 99') or modify ThreadOptions.\n"
        "============================= ATTENTION ======================";
    } else if (ret2) {
      LOG(FATAL) << "WTF pthread_setschedparam() failed: error=" << assorted::os_error();
    }
  }
}

template<typename FUNC >
void foedus::thread::ThreadPimpl::switch_mcs_impl ( FUNC  func)

MCS locks methods.

These just delegate to xct_mcs_impl. Comments are omitted as they are the same as xct_mcs_impl's.

ErrorCode foedus::thread::ThreadPimpl::sysxct_batch_page_locks (xct::SysxctWorkspace * sysxct_workspace, uint32_t lock_count, storage::Page ** pages)

Definition at line 1049 of file thread_pimpl.cpp.

References ASSERT_ND, is_simple_mcs_rw(), foedus::xct::SysxctWorkspace::lock_list_, and foedus::xct::SysxctWorkspace::running_sysxct_.

Referenced by foedus::thread::Thread::sysxct_batch_page_locks().

{
  ASSERT_ND(sysxct_workspace->running_sysxct_);
  auto& sysxct_lock_list = sysxct_workspace->lock_list_;
  if (is_simple_mcs_rw()) {
    ThreadPimplMcsAdaptor< xct::McsRwSimpleBlock > adaptor(this);
    return sysxct_lock_list.batch_request_page_locks(adaptor, lock_count, pages);
  } else {
    ThreadPimplMcsAdaptor< xct::McsRwExtendedBlock > adaptor(this);
    return sysxct_lock_list.batch_request_page_locks(adaptor, lock_count, pages);
  }
}

ErrorCode foedus::thread::ThreadPimpl::sysxct_batch_record_locks (xct::SysxctWorkspace * sysxct_workspace, storage::VolatilePagePointer page_id, uint32_t lock_count, xct::RwLockableXctId ** locks)

Definition at line 1021 of file thread_pimpl.cpp.

References ASSERT_ND, is_simple_mcs_rw(), foedus::xct::SysxctWorkspace::lock_list_, and foedus::xct::SysxctWorkspace::running_sysxct_.

Referenced by foedus::thread::Thread::sysxct_batch_record_locks().

{
  ASSERT_ND(sysxct_workspace->running_sysxct_);
  auto& sysxct_lock_list = sysxct_workspace->lock_list_;
  if (is_simple_mcs_rw()) {
    ThreadPimplMcsAdaptor< xct::McsRwSimpleBlock > adaptor(this);
    return sysxct_lock_list.batch_request_record_locks(adaptor, page_id, lock_count, locks);
  } else {
    ThreadPimplMcsAdaptor< xct::McsRwExtendedBlock > adaptor(this);
    return sysxct_lock_list.batch_request_record_locks(adaptor, page_id, lock_count, locks);
  }
}

ErrorCode foedus::thread::ThreadPimpl::sysxct_page_lock (xct::SysxctWorkspace * sysxct_workspace, storage::Page * page)

Definition at line 1036 of file thread_pimpl.cpp.

References ASSERT_ND, is_simple_mcs_rw(), foedus::xct::SysxctWorkspace::lock_list_, and foedus::xct::SysxctWorkspace::running_sysxct_.

Referenced by foedus::thread::Thread::sysxct_page_lock().

{
  ASSERT_ND(sysxct_workspace->running_sysxct_);
  auto& sysxct_lock_list = sysxct_workspace->lock_list_;
  if (is_simple_mcs_rw()) {
    ThreadPimplMcsAdaptor< xct::McsRwSimpleBlock > adaptor(this);
    return sysxct_lock_list.request_page_lock(adaptor, page);
  } else {
    ThreadPimplMcsAdaptor< xct::McsRwExtendedBlock > adaptor(this);
    return sysxct_lock_list.request_page_lock(adaptor, page);
  }
}

ErrorCode foedus::thread::ThreadPimpl::sysxct_record_lock (xct::SysxctWorkspace * sysxct_workspace, storage::VolatilePagePointer page_id, xct::RwLockableXctId * lock)

Definition at line 1007 of file thread_pimpl.cpp.

References ASSERT_ND, is_simple_mcs_rw(), foedus::xct::SysxctWorkspace::lock_list_, and foedus::xct::SysxctWorkspace::running_sysxct_.

Referenced by foedus::thread::Thread::sysxct_record_lock().

{
  ASSERT_ND(sysxct_workspace->running_sysxct_);
  auto& sysxct_lock_list = sysxct_workspace->lock_list_;
  if (is_simple_mcs_rw()) {
    ThreadPimplMcsAdaptor< xct::McsRwSimpleBlock > adaptor(this);
    return sysxct_lock_list.request_record_lock(adaptor, page_id, lock);
  } else {
    ThreadPimplMcsAdaptor< xct::McsRwExtendedBlock > adaptor(this);
    return sysxct_lock_list.request_record_lock(adaptor, page_id, lock);
  }
}

ErrorStack foedus::thread::ThreadPimpl::uninitialize_once ( )
final override virtual

Implements foedus::DefaultInitializable.

Definition at line 122 of file thread_pimpl.cpp.

References ASSERT_ND, control_block_, core_memory_, foedus::ErrorStackBatch::emprace_back(), foedus::memory::PagePoolOffsetAndEpochChunk::empty(), engine_, foedus::Engine::get_memory_manager(), foedus::memory::EngineMemory::get_node_memory(), foedus::memory::NumaCoreMemory::get_retired_volatile_pool_chunk(), foedus::Engine::get_soc_count(), foedus::memory::NumaNodeMemoryRef::get_volatile_pool(), id_, foedus::thread::kTerminated, foedus::thread::kWaitingForTerminate, log_buffer_, node_memory_, raw_thread_, foedus::memory::PagePool::release(), foedus::soc::SharedPolling::signal(), foedus::memory::PagePoolOffsetAndEpochChunk::size(), snapshot_cache_hashtable_, snapshot_file_set_, foedus::thread::ThreadControlBlock::status_, SUMMARIZE_ERROR_BATCH, foedus::thread::ThreadControlBlock::uninitialize(), foedus::DefaultInitializable::uninitialize(), and foedus::thread::ThreadControlBlock::wakeup_cond_.

{
  ErrorStackBatch batch;
  {
    {
      control_block_->status_ = kWaitingForTerminate;
      control_block_->wakeup_cond_.signal();
    }
    LOG(INFO) << "Thread-" << id_ << " requested to terminate";
    if (raw_thread_.joinable()) {
      raw_thread_.join();
    }
    ASSERT_ND(control_block_->status_ == kTerminated);
  }
  {
    // release retired volatile pages. we do this in thread module rather than in memory module
    // because this has to happen before NumaNodeMemory of any node is uninitialized.
    for (uint16_t node = 0; node < engine_->get_soc_count(); ++node) {
      memory::PagePoolOffsetAndEpochChunk* chunk
        = core_memory_->get_retired_volatile_pool_chunk(node);
      memory::PagePool* volatile_pool
        = engine_->get_memory_manager()->get_node_memory(node)->get_volatile_pool();
      if (!chunk->empty()) {
        volatile_pool->release(chunk->size(), chunk);
      }
      ASSERT_ND(chunk->empty());
    }
  }
  batch.emprace_back(snapshot_file_set_.uninitialize());
  batch.emprace_back(log_buffer_.uninitialize());
  core_memory_ = nullptr;
  node_memory_ = nullptr;
  snapshot_cache_hashtable_ = nullptr;
  control_block_->uninitialize();
  return SUMMARIZE_ERROR_BATCH(batch);
}

Friends And Related Function Documentation

template<typename RW_BLOCK >
friend class ThreadPimplMcsAdaptor
friend

Definition at line 161 of file thread_pimpl.hpp.

Member Data Documentation

xct::RwLockableXctId* foedus::thread::ThreadPimpl::canonical_address_

Definition at line 392 of file thread_pimpl.hpp.

memory::NumaCoreMemory* foedus::thread::ThreadPimpl::core_memory_

Private memory repository of this thread.
const ThreadGlobalOrdinal foedus::thread::ThreadPimpl::global_ordinal_

globally and contiguously numbered ID of thread

Definition at line 336 of file thread_pimpl.hpp.

Referenced by foedus::thread::Thread::get_thread_global_ordinal().

Thread* const foedus::thread::ThreadPimpl::holder_

The public object that holds this pimpl object.

Definition at line 325 of file thread_pimpl.hpp.

Referenced by follow_page_pointer(), follow_page_pointers_for_read_batch(), follow_page_pointers_for_write_batch(), handle_tasks(), and on_snapshot_cache_miss().

memory::LocalPageResolver foedus::thread::ThreadPimpl::local_volatile_page_resolver_
log::ThreadLogBuffer foedus::thread::ThreadPimpl::log_buffer_

Thread-private log buffer.

Definition at line 361 of file thread_pimpl.hpp.

Referenced by foedus::thread::Thread::get_thread_log_buffer(), initialize_once(), and uninitialize_once().

xct::McsRwExtendedBlock* foedus::thread::ThreadPimpl::mcs_rw_extended_blocks_
xct::McsRwSimpleBlock* foedus::thread::ThreadPimpl::mcs_rw_simple_blocks_
xct::McsWwBlock* foedus::thread::ThreadPimpl::mcs_ww_blocks_

Pre-allocated MCS blocks.

Index 0 is not used, so that successor_block == 0 means null.

Definition at line 387 of file thread_pimpl.hpp.

Referenced by foedus::thread::ThreadPimplMcsAdaptor< RW_BLOCK >::get_ww_my_block(), and initialize_once().

memory::NumaNodeMemory* foedus::thread::ThreadPimpl::node_memory_

Same as above.

Definition at line 347 of file thread_pimpl.hpp.

Referenced by initialize_once(), and uninitialize_once().

std::thread foedus::thread::ThreadPimpl::raw_thread_

Encapsulates raw thread object.

Definition at line 366 of file thread_pimpl.hpp.

Referenced by initialize_once(), set_thread_schedule(), and uninitialize_once().

std::atomic<bool> foedus::thread::ThreadPimpl::raw_thread_set_

Just to make sure raw_thread_ is set.

Otherwise pthread_getschedparam will complain.

Definition at line 368 of file thread_pimpl.hpp.

Referenced by initialize_once(), and set_thread_schedule().

bool foedus::thread::ThreadPimpl::simple_mcs_rw_

shortcut for engine_->get_options().xct_.mcs_implementation_type_ == simple

Definition at line 338 of file thread_pimpl.hpp.

Referenced by initialize_once(), and is_simple_mcs_rw().

cache::CacheHashtable* foedus::thread::ThreadPimpl::snapshot_cache_hashtable_
cache::SnapshotFileSet foedus::thread::ThreadPimpl::snapshot_file_set_

Each thread maintains a private set of snapshot file descriptors.

Definition at line 380 of file thread_pimpl.hpp.

Referenced by initialize_once(), read_a_snapshot_page(), read_snapshot_pages(), and uninitialize_once().

memory::PagePool* foedus::thread::ThreadPimpl::snapshot_page_pool_

shorthand for node_memory_->get_snapshot_pool()

Definition at line 351 of file thread_pimpl.hpp.

Referenced by find_or_read_a_snapshot_page(), find_or_read_snapshot_pages_batch(), initialize_once(), and on_snapshot_cache_miss().

void* foedus::thread::ThreadPimpl::task_input_memory_

Definition at line 383 of file thread_pimpl.hpp.

Referenced by handle_tasks(), and initialize_once().

void* foedus::thread::ThreadPimpl::task_output_memory_

Definition at line 384 of file thread_pimpl.hpp.

Referenced by handle_tasks(), and initialize_once().


The documentation for this class was generated from the following files: