libfoedus-core
FOEDUS Core Library
thread.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #include "foedus/thread/thread.hpp"
19 
20 #include <glog/logging.h>
21 
22 #include <ostream>
23 
24 #include "foedus/engine.hpp"
29 
30 namespace foedus {
31 namespace thread {
33  Engine* engine,
34  ThreadId id,
35  ThreadGlobalOrdinal global_ordinal)
36  : pimpl_(nullptr) {
37  lock_rnd_.set_current_seed(global_ordinal);
38  pimpl_ = new ThreadPimpl(engine, this, id, global_ordinal);
39 }
41  delete pimpl_;
42  pimpl_ = nullptr;
43 }
44 
46  CHECK_ERROR(pimpl_->initialize());
47  return kRetOk;
48 }
49 bool Thread::is_initialized() const { return pimpl_->is_initialized(); }
51 
52 Engine* Thread::get_engine() const { return pimpl_->engine_; }
53 ThreadId Thread::get_thread_id() const { return pimpl_->id_; }
56 
59  return pimpl_->core_memory_->get_node_memory();
60 }
61 
64 }
65 
68 }
69 
73 }
74 
76 bool Thread::is_running_xct() const { return pimpl_->current_xct_.is_active(); }
77 
79 
81  return pimpl_->local_volatile_page_resolver_;
82 }
83 
86  storage::Page* buffer) {
87  return pimpl_->read_a_snapshot_page(page_id, buffer);
88 }
90  storage::SnapshotPagePointer page_id_begin,
91  uint32_t page_count,
92  storage::Page* buffer) {
93  return pimpl_->read_snapshot_pages(page_id_begin, page_count, buffer);
94 }
97  storage::Page** out) {
98  return pimpl_->find_or_read_a_snapshot_page(page_id, out);
99 }
101  uint16_t batch_size,
102  const storage::SnapshotPagePointer* page_ids,
103  storage::Page** out) {
104  return pimpl_->find_or_read_snapshot_pages_batch(batch_size, page_ids, out);
105 }
106 
108  storage::DualPagePointer* pointer,
109  storage::Page** installed_page) {
110  return pimpl_->install_a_volatile_page(pointer, installed_page);
111 }
112 
114  pimpl_->collect_retired_volatile_page(ptr);
115 }
116 
117 
118 std::ostream& operator<<(std::ostream& o, const Thread& v) {
119  o << "Thread-" << v.get_thread_global_ordinal() << "(id=" << v.get_thread_id() << ") [";
120  o << "status=" << (v.pimpl_->control_block_->status_);
121  o << "]";
122  return o;
123 }
124 
126  return pimpl_->global_volatile_page_resolver_;
127 }
128 
131 }
134 }
137 }
140 }
141 
142 bool Thread::is_hot_page(const storage::Page* page) const {
143  const uint16_t threshold = pimpl_->current_xct_.get_hot_threshold_for_this_xct();
144  return page->get_header().hotness_.value_ >= threshold;
145 }
146 
148  if (count_) {
149  release();
150  }
151  memory::NumaCoreMemory* memory = context_->get_thread_memory();
152  for (uint32_t i = 0; i < count; ++i) {
153  offsets_[i] = memory->grab_free_volatile_page();
154  if (offsets_[i] == 0) {
155  for (uint32_t j = 0; j < i; ++j) {
156  memory->release_free_volatile_page(offsets_[j]);
157  }
159  }
160  }
161  count_ = count;
162  return kErrorCodeOk;
163 }
164 
166  memory::NumaCoreMemory* memory = context_->get_thread_memory();
167  for (uint32_t i = 0; i < count_; ++i) {
168  if (offsets_[i] != 0) {
169  memory->release_free_volatile_page(offsets_[i]);
170  }
171  }
172  count_ = 0;
173 }
174 
175 } // namespace thread
176 } // namespace foedus
void collect_retired_volatile_page(storage::VolatilePagePointer ptr)
Keeps the specified volatile page as retired as of the current epoch.
const memory::GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert page ID to page pointer.
Definition: thread.cpp:125
xct::Xct & get_current_xct()
Returns the transaction that is currently running on this thread.
Definition: thread.cpp:75
storage::Page * resolve_offset(uint8_t numa_node, PagePoolOffset offset) const __attribute__((always_inline))
Resolves offset plus NUMA node ID to storage::Page*.
ErrorCode find_or_read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page **out)
Find the given page in snapshot cache, reading it if not found.
Definition: thread.cpp:95
ErrorStack initialize() override
Acquires resources in this object, usually called right after constructor.
Definition: thread.cpp:45
ThreadGlobalOrdinal get_thread_global_ordinal() const
Definition: thread.cpp:54
ErrorCode install_a_volatile_page(storage::DualPagePointer *pointer, storage::Page **installed_page)
Installs a volatile page to the given dual pointer as a copy of the snapshot page.
ErrorCode install_a_volatile_page(storage::DualPagePointer *pointer, storage::Page **installed_page)
Installs a volatile page to the given dual pointer as a copy of the snapshot page.
Definition: thread.cpp:107
void release_free_volatile_page(PagePoolOffset offset)
Returns one free volatile page to local page pool.
Represents a pointer to another page (usually a child page).
Definition: storage_id.hpp:271
ErrorCode find_or_read_snapshot_pages_batch(uint16_t batch_size, const storage::SnapshotPagePointer *page_ids, storage::Page **out)
Batched version of find_or_read_a_snapshot_page().
Definition: thread.cpp:100
memory::NumaCoreMemory * get_thread_memory() const
Returns the private memory repository of this thread.
Definition: thread.cpp:57
PagePoolOffset grab_free_volatile_page()
Acquires one free volatile page from local page pool.
uint64_t get_snapshot_cache_misses() const
[statistics] count of cache misses in snapshot caches
Definition: thread.cpp:66
A thread-local log buffer.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
Represents one thread running on one NUMA core.
Definition: thread.hpp:48
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
Definition: memory_id.hpp:44
memory::GlobalVolatilePageResolver global_volatile_page_resolver_
Page resolver to convert all page ID to page pointer.
void collect_retired_volatile_page(storage::VolatilePagePointer ptr)
Keeps the specified volatile page as retired as of the current epoch.
Definition: thread.cpp:113
bool is_initialized() const override
Returns whether the object has been already initialized or not.
Definition: thread.cpp:49
memory::NumaNodeMemory * get_node_memory() const
Returns the node-shared memory repository of the NUMA node this thread belongs to.
Definition: thread.cpp:58
ThreadId get_thread_id() const
Definition: thread.cpp:53
Represents a pointer to a volatile page with modification count for preventing ABA.
Definition: storage_id.hpp:194
uint8_t value_
Logarithmic counter of aborts.
Represents a user transaction.
Definition: xct.hpp:58
storage::Page * resolve_offset_newpage(PagePoolOffset offset) const __attribute__((always_inline))
As the name suggests, this version is for new pages, which don't have sanity checks.
ErrorCode read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page *buffer) __attribute__((always_inline))
Read a snapshot page using the thread-local file descriptor set.
const ThreadGlobalOrdinal global_ordinal_
globally and contiguously numbered ID of thread
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
Represents a time epoch.
Definition: epoch.hpp:61
bool is_active() const
Returns whether the object is an active transaction.
Definition: xct.hpp:121
Engine * get_engine() const
Definition: thread.cpp:52
storage::Page * resolve_newpage(storage::VolatilePagePointer ptr) const
Shorthand for get_global_volatile_page_resolver.resolve_offset_newpage()
Definition: thread.cpp:132
void set_current_seed(uint64_t seed)
Epoch * get_in_commit_epoch_address()
Currently we don't have sysxct_release_locks() etc.
Definition: thread.cpp:55
Repository of memories dynamically acquired within one CPU core (thread).
Pimpl object of Thread.
ErrorCode grab(uint32_t count)
If this thread doesn't have enough free pages, no page is obtained, returning kErrorCodeMemoryNoFreePages.
Definition: thread.cpp:147
storage::Page * resolve(storage::VolatilePagePointer ptr) const
Shorthand for get_global_volatile_page_resolver.resolve_offset()
Definition: thread.cpp:129
log::ThreadLogBuffer & get_thread_log_buffer()
Returns the private log buffer for this thread.
Definition: thread.cpp:78
0 means no-error.
Definition: error_code.hpp:87
ErrorCode find_or_read_snapshot_pages_batch(uint16_t batch_size, const storage::SnapshotPagePointer *page_ids, storage::Page **out)
Batched version of find_or_read_a_snapshot_page().
bool is_running_xct() const
Returns if this thread is running an active transaction.
Definition: thread.cpp:76
Engine *const engine_
MCS locks methods.
NumaNodeMemory * get_node_memory() const
Returns the parent memory repository.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Definition: storage_id.hpp:79
uint16_t get_hot_threshold_for_this_xct() const
Definition: xct.hpp:128
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
Repository of memories dynamically acquired and shared within one NUMA node (socket).
log::ThreadLogBuffer log_buffer_
Thread-private log buffer.
Just a marker to denote that the memory region represents a data page.
Definition: page.hpp:334
void reset_snapshot_cache_counts() const
[statistics] resets the above two
Definition: thread.cpp:70
memory::LocalPageResolver local_volatile_page_resolver_
Page resolver to convert only local page ID to page pointer.
bool is_hot_page(const storage::Page *page) const
Definition: thread.cpp:142
const memory::LocalPageResolver & get_local_volatile_page_resolver() const
Returns page resolver to convert only local page ID to page pointer.
Definition: thread.cpp:80
0x0301 : "MEMORY : Not enough free volatile pages. Check the config of MemoryOptions" ...
Definition: error_code.hpp:142
uint64_t get_snapshot_cache_hits() const
[statistics] count of cache hits in snapshot caches
Definition: thread.cpp:62
storage::Page * resolve_offset(PagePoolOffset offset) const __attribute__((always_inline))
Resolves offset in this pool to storage::Page*.
storage::Page * resolve_offset_newpage(uint8_t numa_node, PagePoolOffset offset) const __attribute__((always_inline))
As the name suggests, this version is for new pages, which don't have sanity checks.
const ThreadId id_
Unique ID of this thread.
xct::Xct current_xct_
Current transaction this thread is conveying.
uint16_t ThreadGlobalOrdinal
Typedef for a globally and contiguously numbered ID of thread.
Definition: thread_id.hpp:98
ErrorCode find_or_read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page **out)
Find the given page in snapshot cache, reading it if not found.
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
Definition: thread_id.hpp:80
ThreadStatus status_
Impersonation status of this thread.
const ErrorStack kRetOk
Normal return value for no-error case.
Resolves an offset in local (same NUMA node) page pool to a pointer and vice versa.
std::ostream & operator<<(std::ostream &o, const ImpersonateSession &v)
Resolves an offset in a volatile page pool to an actual pointer and vice versa.
PageHeader & get_header()
At least the basic header exists in all pages.
Definition: page.hpp:336
ErrorStack uninitialize() override
An idempotent method to release all resources of this object, if any.
Definition: thread.cpp:50
assorted::ProbCounter hotness_
Loosely maintained statistics on data temperature.
Definition: page.hpp:268
memory::NumaCoreMemory * core_memory_
Private memory repository of this thread.
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
ErrorCode read_snapshot_pages(storage::SnapshotPagePointer page_id_begin, uint32_t page_count, storage::Page *buffer)
Read contiguous pages in one shot.
Definition: thread.cpp:89
bool is_initialized() const override final
Returns whether the object has been already initialized or not.
ErrorCode read_snapshot_pages(storage::SnapshotPagePointer page_id_begin, uint32_t page_count, storage::Page *buffer) __attribute__((always_inline))
Read contiguous pages in one shot.
ErrorCode read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page *buffer)
Read a snapshot page using the thread-local file descriptor set.
Definition: thread.cpp:84
ThreadControlBlock * control_block_