libfoedus-core
FOEDUS Core Library
thread_pool_pimpl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <ostream>
23 
24 #include "foedus/assert_nd.hpp"
25 #include "foedus/engine.hpp"
31 #include "foedus/thread/thread.hpp"
37 
38 namespace foedus {
39 namespace thread {
43  }
44  ASSERT_ND(groups_.empty());
45  const ThreadOptions &options = engine_->get_options().thread_;
46  for (ThreadGroupId group_id = 0; group_id < options.group_count_; ++group_id) {
47  groups_.emplace_back(ThreadGroupRef(engine_, group_id));
48  }
49 
50  if (!engine_->is_master()) {
51  // initialize local thread group object
52  soc::SocId node = engine_->get_soc_id();
53  local_group_ = new ThreadGroup(engine_, node);
55  }
56  return kRetOk;
57 }
58 
60  ErrorStackBatch batch;
63  }
64 
65  groups_.clear();
66  if (local_group_) {
69  delete local_group_;
70  local_group_ = nullptr;
71  }
72  return SUMMARIZE_ERROR_BATCH(batch);
73 }
76 }
77 
78 
80  const proc::ProcName& proc_name,
81  const void* task_input,
82  uint64_t task_input_size,
83  ImpersonateSession *session) {
84  uint16_t thread_per_group = engine_->get_options().thread_.thread_count_per_group_;
85  for (ThreadGroupRef& group : groups_) {
86  for (size_t j = 0; j < thread_per_group; ++j) {
87  ThreadRef* thread = group.get_thread(j);
88  if (thread->try_impersonate(proc_name, task_input, task_input_size, session)) {
89  return true;
90  }
91  }
92  }
93  return false;
94 }
96  ThreadGroupId node,
97  const proc::ProcName& proc_name,
98  const void* task_input,
99  uint64_t task_input_size,
100  ImpersonateSession *session) {
101  uint16_t thread_per_group = engine_->get_options().thread_.thread_count_per_group_;
102  ThreadGroupRef& group = groups_[node];
103  for (size_t j = 0; j < thread_per_group; ++j) {
104  ThreadRef* thread = group.get_thread(j);
105  if (thread->try_impersonate(proc_name, task_input, task_input_size, session)) {
106  return true;
107  }
108  }
109  return false;
110 }
112  ThreadId core,
113  const proc::ProcName& proc_name,
114  const void* task_input,
115  uint64_t task_input_size,
116  ImpersonateSession *session) {
117  ThreadRef* thread = get_thread(core);
118  return thread->try_impersonate(proc_name, task_input, task_input_size, session);
119 }
120 
121 std::ostream& operator<<(std::ostream& o, const ThreadPoolPimpl& v) {
122  o << "<ThreadPool>";
123  o << "<groups>";
124  for (const ThreadGroupRef& group : v.groups_) {
125  o << group;
126  }
127  o << "</groups>";
128  o << "</ThreadPool>";
129  return o;
130 }
131 
132 } // namespace thread
133 } // namespace foedus
Set of options about threads and thread-groups.
ThreadGroup * local_group_
Thread group of the local SOC engine.
ThreadRef * get_thread(ThreadLocalOrdinal ordinal)
Returns Thread object for the given ordinal in this group.
Definition: thread_ref.hpp:115
Pimpl object of ThreadPool.
void emprace_back(ErrorStack &&error_stack)
If the given ErrorStack is an error, this method adds it to the end of this batch.
A view of Thread group object for other SOCs and master engine.
Definition: thread_ref.hpp:106
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
Typedefs of ID types used in thread package.
ThreadLocalOrdinal decompose_numa_local_ordinal(ThreadId global_id)
Extracts local ordinal from the given globally unique ID of Thread (core).
Definition: thread_id.hpp:139
0x0005 : "GENERAL: A dependent module is not initialized yet. This implies a wrong initialization ord...
Definition: error_code.hpp:109
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
ThreadGroupRef * get_group(ThreadGroupId numa_node)
bool impersonate(const proc::ProcName &proc_name, const void *task_input, uint64_t task_input_size, ImpersonateSession *session)
Definitions of IDs in this package and a few related constant values.
A user session running on an impersonated thread.
const EngineOptions & get_options() const
Definition: engine.cpp:39
ThreadLocalOrdinal thread_count_per_group_
Number of Threads in each ThreadGroup.
bool is_master() const
Returns if this engine object is a master instance.
Definition: engine.cpp:68
Represents a group of pre-allocated threads running in one NUMA node.
Batches zero or more ErrorStack objects to represent in one ErrorStack.
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
A view of Thread object for other SOCs and master engine.
Definition: thread_ref.hpp:39
ThreadRef * get_thread(ThreadId id)
ThreadGroupId decompose_numa_node(ThreadId global_id)
Extracts NUMA node ID from the given globally unique ID of Thread (core).
Definition: thread_id.hpp:131
ErrorStack initialize_once() override
std::vector< ThreadGroupRef > groups_
List of all thread groups, one for each NUMA node in this engine.
bool impersonate_on_numa_node(ThreadGroupId node, const proc::ProcName &proc_name, const void *task_input, uint64_t task_input_size, ImpersonateSession *session)
#define SUMMARIZE_ERROR_BATCH(x)
This macro calls ErrorStackBatch::summarize() with automatically provided parameters.
uint16_t SocId
Represents an ID of an SOC, or NUMA node.
Definition: soc_id.hpp:41
bool try_impersonate(const proc::ProcName &proc_name, const void *task_input, uint64_t task_input_size, ImpersonateSession *session)
Conditionally try to occupy this thread, or impersonate.
Definition: thread_ref.cpp:59
thread::ThreadOptions thread_
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
Definition: thread_id.hpp:80
const ErrorStack kRetOk
Normal return value for no-error case.
bool impersonate_on_numa_core(ThreadId core, const proc::ProcName &proc_name, const void *task_input, uint64_t task_input_size, ImpersonateSession *session)
ErrorStack uninitialize_once() override
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
std::ostream & operator<<(std::ostream &o, const ImpersonateSession &v)
soc::SocId get_soc_id() const
If this is a child instance, returns its SOC ID (NUMA node).
Definition: engine.cpp:73
0x0006 : "GENERAL: A dependent module is already uninitialized. This implies a wrong uninitialization...
Definition: error_code.hpp:110
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
uint8_t ThreadGroupId
Typedef for an ID of ThreadGroup (NUMA node).
Definition: thread_id.hpp:38
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
Definition: engine.cpp:50
bool is_initialized() const override final
Returns whether the object has been already initialized or not.