libfoedus-core
FOEDUS Core Library
thread_ref.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <ostream>
23 
24 #include "foedus/engine.hpp"
32 
33 namespace foedus {
34 namespace thread {
35 
37  : engine_(nullptr),
38  id_(0),
39  control_block_(nullptr),
40  task_input_memory_(nullptr),
41  task_output_memory_(nullptr),
42  mcs_ww_blocks_(nullptr),
43  mcs_rw_simple_blocks_(nullptr),
44  mcs_rw_extended_blocks_(nullptr),
45  mcs_rw_async_mappings_(nullptr) {}
46 
47 ThreadRef::ThreadRef(Engine* engine, ThreadId id) : engine_(engine), id_(id) {
49  soc::ThreadMemoryAnchors* anchors = memory_repo->get_thread_memory_anchors(id);
50  control_block_ = anchors->thread_memory_;
51  task_input_memory_ = anchors->task_input_memory_;
52  task_output_memory_ = anchors->task_output_memory_;
53  mcs_ww_blocks_ = anchors->mcs_ww_lock_memories_;
54  mcs_rw_simple_blocks_ = anchors->mcs_rw_simple_lock_memories_;
55  mcs_rw_extended_blocks_ = anchors->mcs_rw_extended_lock_memories_;
56  mcs_rw_async_mappings_ = anchors->mcs_rw_async_mappings_memories_;
57 }
58 
60  const proc::ProcName& proc_name,
61  const void* task_input,
62  uint64_t task_input_size,
63  ImpersonateSession *session) {
64  if (session->is_valid()) {
65  LOG(WARNING) << "This session is already attached to some thread. Releasing the current one..";
66  session->release();
67  }
68  if (UNLIKELY(control_block_->status_ == kNotInitialized)) {
69  // The worker thread has not started working.
70  // In this case, wait until it's initialized.
71  while (control_block_->status_ == kNotInitialized) {
72  std::this_thread::sleep_for(std::chrono::milliseconds(1));
74  }
75  }
76  if (control_block_->status_ != kWaitingForTask) {
77  DVLOG(0) << "(fast path) Someone already took Thread-" << id_ << ".";
78  return false;
79  }
80 
81  {
82  // now, check it and grab it with mutex
83  soc::SharedMutexScope scope(&control_block_->task_mutex_);
84  if (control_block_->status_ != kWaitingForTask) {
85  DVLOG(0) << "(slow path) Someone already took Thread-" << id_ << ".";
86  return false;
87  }
88  session->thread_ = this;
89  session->ticket_ = ++control_block_->current_ticket_;
90  control_block_->proc_name_ = proc_name;
91  control_block_->status_ = kWaitingForExecution;
92  control_block_->input_len_ = task_input_size;
93  if (task_input_size > 0) {
94  std::memcpy(task_input_memory_, task_input, task_input_size);
95  }
96  }
97  // waking up doesn't need mutex
98  control_block_->wakeup_cond_.signal();
99  VLOG(0) << "Impersonation succeeded for Thread-" << id_ << ".";
100  return true;
101 }
102 
103 ThreadGroupRef::ThreadGroupRef() : engine_(nullptr), group_id_(0) {
104 }
105 
107  : engine_(engine), group_id_(group_id) {
108  uint16_t count = engine->get_options().thread_.thread_count_per_group_;
109  for (uint16_t i = 0; i < count; ++i) {
110  threads_.emplace_back(ThreadRef(engine, compose_thread_id(group_id, i)));
111  }
112 }
113 
116  return control_block_->in_commit_epoch_;
117 }
118 
120  return control_block_->stat_snapshot_cache_hits_;
121 }
122 
124  return control_block_->stat_snapshot_cache_misses_;
125 }
126 
128  control_block_->stat_snapshot_cache_hits_ = 0;
129  control_block_->stat_snapshot_cache_misses_ = 0;
130 }
131 
134  Epoch ret = INVALID_EPOCH;
135  for (const auto& t : threads_) {
136  Epoch in_commit_epoch = t.get_control_block()->in_commit_epoch_;
137  if (in_commit_epoch.is_valid()) {
138  if (!ret.is_valid()) {
139  ret = in_commit_epoch;
140  } else {
141  ret.store_min(in_commit_epoch);
142  }
143  }
144  }
145 
146  return ret;
147 }
148 
150  uint32_t nmappings = control_block_->mcs_rw_async_mapping_current_;
151  for (uint32_t i = 0; i < nmappings; ++i) {
152  if (mcs_rw_async_mappings_[i].lock_id_ == lock_id) {
153  return mcs_rw_async_mappings_ + i;
154  }
155  }
156  ASSERT_ND(false);
157  return nullptr;
158 }
159 
160 std::ostream& operator<<(std::ostream& o, const ThreadGroupRef& v) {
161  o << "<ThreadGroupRef>";
162  o << "<group_id_>" << static_cast<int>(v.get_group_id()) << "</group_id_>";
163  o << "<threads_>";
164  for (const ThreadRef& child_thread : v.threads_) {
165  o << child_thread;
166  }
167  o << "</threads_>";
168  o << "</ThreadGroup>";
169  return o;
170 }
171 
172 std::ostream& operator<<(std::ostream& o, const ThreadRef& v) {
173  o << "ThreadRef-" << v.get_thread_id() << "[";
174  o << "status=" << (v.get_control_block()->status_);
175  o << "]";
176  return o;
177 }
178 
179 
180 } // namespace thread
181 } // namespace foedus
xct::McsRwAsyncMapping * mcs_rw_async_mappings_memories_
void release()
Releases all resources and ownerships this session has acquired.
uint64_t get_snapshot_cache_hits() const
Definition: thread_ref.cpp:119
ThreadTicket current_ticket_
The most recently issued impersonation ticket.
A view of Thread group object for other SOCs and master engine.
Definition: thread_ref.hpp:106
Idle state, receiving a new task.
Definition: thread_id.hpp:207
uint32_t input_len_
Byte size of input given to the procedure.
thread::ThreadRef * thread_
The impersonated thread.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
xct::McsRwExtendedBlock * mcs_rw_extended_lock_memories_
Typedefs of ID types used in thread package.
xct::McsRwAsyncMapping * get_mcs_rw_async_mapping(xct::UniversalLockId lock_id)
Definition: thread_ref.cpp:149
A client has set a next task.
Definition: thread_id.hpp:209
soc::SharedMutex task_mutex_
The following variables are protected by this mutex.
Represents a time epoch.
Definition: epoch.hpp:61
thread::ThreadTicket ticket_
The ticket issued as of impersonation.
Part of NodeMemoryAnchors for each thread.
void * task_input_memory_
Input buffer for an impersonated task.
A user session running on an impersonated thread.
const EngineOptions & get_options() const
Definition: engine.cpp:39
ThreadLocalOrdinal thread_count_per_group_
Number of Threads in each ThreadGroup.
uintptr_t UniversalLockId
Universally ordered identifier of each lock.
Definition: xct_id.hpp:134
soc::SharedPolling wakeup_cond_
The thread sleeps on this conditional when it has no task.
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
xct::McsWwBlock * mcs_ww_lock_memories_
Pre-allocated MCS block for each thread.
bool is_valid() const
Returns if the impersonation succeeded.
uint32_t mcs_rw_async_mapping_current_
How many async mappings for extended RW lock we have so far.
A view of Thread object for other SOCs and master engine.
Definition: thread_ref.hpp:39
thread::ThreadControlBlock * thread_memory_
Status and synchronization mechanism for impersonation of this thread.
ThreadControlBlock * get_control_block() const
Definition: thread_ref.hpp:71
Auto-lock scope object for SharedMutex.
xct::McsRwSimpleBlock * mcs_rw_simple_lock_memories_
proc::ProcName proc_name_
Name of the procedure to execute next.
Repository of all shared memory in one FOEDUS instance.
ThreadId compose_thread_id(ThreadGroupId node, ThreadLocalOrdinal local_core)
Returns a globally unique ID of Thread (core) for the given node and ordinal in the node...
Definition: thread_id.hpp:123
bool is_valid() const
Definition: epoch.hpp:96
bool try_impersonate(const proc::ProcName &proc_name, const void *task_input, uint64_t task_input_size, ImpersonateSession *session)
Conditionally try to occupy this thread, or impersonate.
Definition: thread_ref.cpp:59
thread::ThreadOptions thread_
const Epoch INVALID_EPOCH
A constant epoch object that represents an invalid epoch.
Definition: epoch.hpp:204
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
Definition: thread_id.hpp:80
ThreadGroupId get_group_id() const
Definition: thread_ref.hpp:112
ThreadStatus status_
Impersonation status of this thread.
soc::SocManager * get_soc_manager() const
See SOC and IPC.
Definition: engine.cpp:59
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
std::ostream & operator<<(std::ostream &o, const ImpersonateSession &v)
Epoch get_min_in_commit_epoch() const
Returns the oldest in-commit epoch of the threads in this group.
Definition: thread_ref.cpp:132
Epoch get_in_commit_epoch() const
Definition: thread_ref.cpp:114
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
ThreadMemoryAnchors * get_thread_memory_anchors(thread::ThreadId thread_id)
uint64_t get_snapshot_cache_misses() const
Definition: thread_ref.cpp:123
uint8_t ThreadGroupId
Typedef for an ID of ThreadGroup (NUMA node).
Definition: thread_id.hpp:38
ThreadId get_thread_id() const
Definition: thread_ref.hpp:60
void signal()
Signal it to let waiters exit.
void reset_snapshot_cache_counts() const
Definition: thread_ref.cpp:127
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
Definition: soc_manager.cpp:38
void store_min(const Epoch &other)
Kind of std::min(this, other).
Definition: epoch.hpp:153
void * task_output_memory_
Output buffer for an impersonated task.