libfoedus-core
FOEDUS Core Library
foedus::thread::ThreadRef Class Reference (final)

A view of Thread object for other SOCs and master engine. More...

Detailed Description

A view of Thread object for other SOCs and master engine.

Definition at line 39 of file thread_ref.hpp.

#include <thread_ref.hpp>

Public Member Functions

 ThreadRef ()
 
 ThreadRef (Engine *engine, ThreadId id)
 
bool try_impersonate (const proc::ProcName &proc_name, const void *task_input, uint64_t task_input_size, ImpersonateSession *session)
 Conditionally try to occupy this thread, or impersonate. More...
 
Engine * get_engine () const
 
ThreadId get_thread_id () const
 
ThreadGroupId get_numa_node () const
 
void * get_task_input_memory () const
 
void * get_task_output_memory () const
 
xct::McsWwBlock * get_mcs_ww_blocks () const
 
xct::McsRwSimpleBlock * get_mcs_rw_simple_blocks () const
 
xct::McsRwExtendedBlock * get_mcs_rw_extended_blocks () const
 
void get_mcs_rw_blocks (xct::McsRwSimpleBlock **out) const
 
void get_mcs_rw_blocks (xct::McsRwExtendedBlock **out) const
 
xct::McsRwAsyncMapping * get_mcs_rw_async_mapping (xct::UniversalLockId lock_id)
 
ThreadControlBlock * get_control_block () const
 
Epoch get_in_commit_epoch () const
 
uint64_t get_snapshot_cache_hits () const
 
uint64_t get_snapshot_cache_misses () const
 
void reset_snapshot_cache_counts () const
 

Friends

std::ostream & operator<< (std::ostream &o, const ThreadRef &v)
 

Constructor & Destructor Documentation

foedus::thread::ThreadRef::ThreadRef ( )

Definition at line 36 of file thread_ref.cpp.

37  : engine_(nullptr),
38  id_(0),
39  control_block_(nullptr),
40  task_input_memory_(nullptr),
41  task_output_memory_(nullptr),
42  mcs_ww_blocks_(nullptr),
43  mcs_rw_simple_blocks_(nullptr),
44  mcs_rw_extended_blocks_(nullptr),
45  mcs_rw_async_mappings_(nullptr) {}
foedus::thread::ThreadRef::ThreadRef ( Engine * engine,
ThreadId  id 
)

Definition at line 47 of file thread_ref.cpp.

References foedus::soc::SocManager::get_shared_memory_repo(), foedus::Engine::get_soc_manager(), foedus::soc::SharedMemoryRepo::get_thread_memory_anchors(), foedus::soc::ThreadMemoryAnchors::mcs_rw_async_mappings_memories_, foedus::soc::ThreadMemoryAnchors::mcs_rw_extended_lock_memories_, foedus::soc::ThreadMemoryAnchors::mcs_rw_simple_lock_memories_, foedus::soc::ThreadMemoryAnchors::mcs_ww_lock_memories_, foedus::soc::ThreadMemoryAnchors::task_input_memory_, foedus::soc::ThreadMemoryAnchors::task_output_memory_, and foedus::soc::ThreadMemoryAnchors::thread_memory_.

47  : engine_(engine), id_(id) {
48  soc::SharedMemoryRepo* memory_repo = engine->get_soc_manager()->get_shared_memory_repo();
49  soc::ThreadMemoryAnchors* anchors = memory_repo->get_thread_memory_anchors(id);
50  control_block_ = anchors->thread_memory_;
51  task_input_memory_ = anchors->task_input_memory_;
52  task_output_memory_ = anchors->task_output_memory_;
53  mcs_ww_blocks_ = anchors->mcs_ww_lock_memories_;
54  mcs_rw_simple_blocks_ = anchors->mcs_rw_simple_lock_memories_;
55  mcs_rw_extended_blocks_ = anchors->mcs_rw_extended_lock_memories_;
56  mcs_rw_async_mappings_ = anchors->mcs_rw_async_mappings_memories_;
57 }

Here is the call graph for this function:

Member Function Documentation

Engine* foedus::thread::ThreadRef::get_engine ( ) const
inline

Definition at line 59 of file thread_ref.hpp.

59 { return engine_; }
Epoch foedus::thread::ThreadRef::get_in_commit_epoch ( ) const
See also
foedus::xct::InCommitEpochGuard

Definition at line 114 of file thread_ref.cpp.

References foedus::thread::ThreadControlBlock::in_commit_epoch_, and foedus::assorted::memory_fence_acquire().

114  {
116  return control_block_->in_commit_epoch_;
117 }
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).

Here is the call graph for this function:

xct::McsRwAsyncMapping * foedus::thread::ThreadRef::get_mcs_rw_async_mapping ( xct::UniversalLockId  lock_id)

Definition at line 149 of file thread_ref.cpp.

References ASSERT_ND, and foedus::thread::ThreadControlBlock::mcs_rw_async_mapping_current_.

Referenced by foedus::thread::ThreadPimplMcsAdaptor< RW_BLOCK >::get_rw_other_async_block().

149  {
150  uint32_t nmappings = control_block_->mcs_rw_async_mapping_current_;
151  for (uint32_t i = 0; i < nmappings; ++i) {
152  if (mcs_rw_async_mappings_[i].lock_id_ == lock_id) {
153  return mcs_rw_async_mappings_ + i;
154  }
155  }
156  ASSERT_ND(false);
157  return nullptr;
158 }
uint32_t mcs_rw_async_mapping_current_
How many async mappings for extended RW lock we have so far.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72

Here is the caller graph for this function:

void foedus::thread::ThreadRef::get_mcs_rw_blocks ( xct::McsRwSimpleBlock **  out) const
inline

Definition at line 68 of file thread_ref.hpp.

Referenced by foedus::thread::ThreadPimplMcsAdaptor< RW_BLOCK >::get_rw_other_block().

68 { *out = mcs_rw_simple_blocks_; }

Here is the caller graph for this function:

void foedus::thread::ThreadRef::get_mcs_rw_blocks ( xct::McsRwExtendedBlock **  out) const
inline

Definition at line 69 of file thread_ref.hpp.

69 { *out = mcs_rw_extended_blocks_; }
xct::McsRwExtendedBlock* foedus::thread::ThreadRef::get_mcs_rw_extended_blocks ( ) const
inline

Definition at line 66 of file thread_ref.hpp.

66 { return mcs_rw_extended_blocks_; }
xct::McsRwSimpleBlock* foedus::thread::ThreadRef::get_mcs_rw_simple_blocks ( ) const
inline

Definition at line 65 of file thread_ref.hpp.

65 { return mcs_rw_simple_blocks_; }
xct::McsWwBlock* foedus::thread::ThreadRef::get_mcs_ww_blocks ( ) const
inline

Definition at line 64 of file thread_ref.hpp.

Referenced by foedus::thread::ThreadPimplMcsAdaptor< RW_BLOCK >::get_ww_other_block().

64 { return mcs_ww_blocks_; }

Here is the caller graph for this function:

ThreadGroupId foedus::thread::ThreadRef::get_numa_node ( ) const
inline

Definition at line 61 of file thread_ref.hpp.

References foedus::thread::decompose_numa_node().

61 { return decompose_numa_node(id_); }
ThreadGroupId decompose_numa_node(ThreadId global_id)
Extracts NUMA node ID from the given globally unique ID of Thread (core).
Definition: thread_id.hpp:131

Here is the call graph for this function:

uint64_t foedus::thread::ThreadRef::get_snapshot_cache_hits ( ) const

Definition at line 119 of file thread_ref.cpp.

References foedus::thread::ThreadControlBlock::stat_snapshot_cache_hits_.

119  {
120  return control_block_->stat_snapshot_cache_hits_;
121 }
uint64_t foedus::thread::ThreadRef::get_snapshot_cache_misses ( ) const

Definition at line 123 of file thread_ref.cpp.

References foedus::thread::ThreadControlBlock::stat_snapshot_cache_misses_.

123  {
124  return control_block_->stat_snapshot_cache_misses_;
125 }
void* foedus::thread::ThreadRef::get_task_input_memory ( ) const
inline

Definition at line 62 of file thread_ref.hpp.

62 { return task_input_memory_; }
void* foedus::thread::ThreadRef::get_task_output_memory ( ) const
inline

Definition at line 63 of file thread_ref.hpp.

Referenced by foedus::thread::ImpersonateSession::get_output(), and foedus::thread::ImpersonateSession::get_raw_output_buffer().

63 { return task_output_memory_; }

Here is the caller graph for this function:

ThreadId foedus::thread::ThreadRef::get_thread_id ( ) const
inline

Definition at line 60 of file thread_ref.hpp.

Referenced by foedus::thread::operator<<().

60 { return id_; }

Here is the caller graph for this function:

void foedus::thread::ThreadRef::reset_snapshot_cache_counts ( ) const
bool foedus::thread::ThreadRef::try_impersonate ( const proc::ProcName & proc_name,
const void *  task_input,
uint64_t  task_input_size,
ImpersonateSession * session 
)

Conditionally try to occupy this thread, or impersonate.

If it fails, it immediately returns.

Parameters
[in] proc_name: the name of the procedure to run on this thread.
[in] task_input: input data of arbitrary format for the procedure.
[in] task_input_size: byte size of the input data to copy into the thread's memory.
[out] session: the session to run on this thread. On success, the session receives a ticket so that the caller can wait for the completion.
Returns
whether successfully impersonated.

Definition at line 59 of file thread_ref.cpp.

References foedus::thread::ThreadControlBlock::current_ticket_, foedus::thread::ThreadControlBlock::input_len_, foedus::thread::ImpersonateSession::is_valid(), foedus::thread::kNotInitialized, foedus::thread::kWaitingForExecution, foedus::thread::kWaitingForTask, foedus::assorted::memory_fence_acquire(), foedus::thread::ThreadControlBlock::proc_name_, foedus::thread::ImpersonateSession::release(), foedus::soc::SharedPolling::signal(), foedus::thread::ThreadControlBlock::status_, foedus::thread::ThreadControlBlock::task_mutex_, foedus::thread::ImpersonateSession::thread_, foedus::thread::ImpersonateSession::ticket_, UNLIKELY, and foedus::thread::ThreadControlBlock::wakeup_cond_.

Referenced by foedus::thread::ThreadPoolPimpl::impersonate(), foedus::thread::ThreadPoolPimpl::impersonate_on_numa_core(), and foedus::thread::ThreadPoolPimpl::impersonate_on_numa_node().

63  {
64  if (session->is_valid()) {
65  LOG(WARNING) << "This session is already attached to some thread. Releasing the current one..";
66  session->release();
67  }
68  if (UNLIKELY(control_block_->status_ == kNotInitialized)) {
69  // The worker thread has not started working.
70  // In this case, wait until it's initialized.
71  while (control_block_->status_ == kNotInitialized) {
72  std::this_thread::sleep_for(std::chrono::milliseconds(1));
74  }
75  }
76  if (control_block_->status_ != kWaitingForTask) {
77  DVLOG(0) << "(fast path) Someone already took Thread-" << id_ << ".";
78  return false;
79  }
80 
81  {
82  // now, check it and grab it with mutex
83  soc::SharedMutexScope scope(&control_block_->task_mutex_);
84  if (control_block_->status_ != kWaitingForTask) {
85  DVLOG(0) << "(slow path) Someone already took Thread-" << id_ << ".";
86  return false;
87  }
88  session->thread_ = this;
89  session->ticket_ = ++control_block_->current_ticket_;
90  control_block_->proc_name_ = proc_name;
91  control_block_->status_ = kWaitingForExecution;
92  control_block_->input_len_ = task_input_size;
93  if (task_input_size > 0) {
94  std::memcpy(task_input_memory_, task_input, task_input_size);
95  }
96  }
97  // waking up doesn't need mutex
98  control_block_->wakeup_cond_.signal();
99  VLOG(0) << "Impersonation succeeded for Thread-" << id_ << ".";
100  return true;
101 }
ThreadTicket current_ticket_
The most recently issued impersonation ticket.
kWaitingForTask
Idle state, receiving a new task.
Definition: thread_id.hpp:207
uint32_t input_len_
Byte size of input given to the procedure.
kWaitingForExecution
A client has set a next task.
Definition: thread_id.hpp:209
soc::SharedMutex task_mutex_
The following variables are protected by this mutex.
soc::SharedPolling wakeup_cond_
The thread sleeps on this conditional when it has no task.
proc::ProcName proc_name_
Name of the procedure to execute next.
ThreadStatus status_
Impersonation status of this thread.
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
void signal()
Signal it to let waiters exit.

Here is the call graph for this function:

Here is the caller graph for this function:

Friends And Related Function Documentation

std::ostream& operator<< ( std::ostream &  o,
const ThreadRef & v 
)
friend

Definition at line 172 of file thread_ref.cpp.

172  {
173  o << "ThreadRef-" << v.get_thread_id() << "[";
174  o << "status=" << (v.get_control_block()->status_);
175  o << "]";
176  return o;
177 }

The documentation for this class was generated from the following files: