libfoedus-core
FOEDUS Core Library
sequential_storage_pimpl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/storage/sequential/sequential_storage_pimpl.hpp"

#include <glog/logging.h>

#include <cstring>
#include <string>
#include <vector>

#include "foedus/engine.hpp"
#include "foedus/assorted/atomic_fences.hpp"
#include "foedus/epoch.hpp"
#include "foedus/log/log_manager.hpp"
#include "foedus/log/log_type.hpp"
#include "foedus/log/thread_log_buffer.hpp"
#include "foedus/memory/engine_memory.hpp"
#include "foedus/memory/numa_core_memory.hpp"
#include "foedus/memory/numa_node_memory.hpp"
#include "foedus/memory/page_pool.hpp"
#include "foedus/storage/sequential/sequential_id.hpp"
#include "foedus/storage/sequential/sequential_log_types.hpp"
#include "foedus/storage/sequential/sequential_metadata.hpp"
#include "foedus/storage/sequential/sequential_page.hpp"
#include "foedus/storage/sequential/sequential_storage.hpp"
#include "foedus/thread/thread.hpp"
#include "foedus/xct/xct.hpp"

namespace foedus {
namespace storage {
namespace sequential {

// Defines SequentialStorage methods so that we can inline implementation calls.

ErrorCode SequentialStorage::append_record(
  thread::Thread* context,
  const void *payload,
  uint16_t payload_count) {
  if (payload_count >= kMaxPayload) {
    return kErrorCodeStrTooLongPayload;
  }

  // Sequential storage doesn't need to check its current state for appends.
  // We are sure we can append anyway, so we just create a log record.
  uint16_t log_length = SequentialAppendLogType::calculate_log_length(payload_count);
  SequentialAppendLogType* log_entry = reinterpret_cast<SequentialAppendLogType*>(
    context->get_thread_log_buffer().reserve_new_log(log_length));
  log_entry->populate(get_id(), payload, payload_count);

  // Also, we don't have to take a lock during commit because our SequentialVolatileList is
  // lock-free. So we maintain a special lock-free write-set for sequential storage.
  return context->get_current_xct().add_to_lock_free_write_set(get_id(), log_entry);
}
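
// Usage sketch (not part of the original file): how client code might call
// append_record() inside a transaction. "engine", "context", and the storage
// name "myseq" are illustrative assumptions; the calls follow the usual FOEDUS
// client API (StorageManager / XctManager).
//
//   sequential::SequentialStorage seq =
//     engine->get_storage_manager()->get_sequential("myseq");
//   xct::XctManager* xct_manager = engine->get_xct_manager();
//   WRAP_ERROR_CODE(xct_manager->begin_xct(context, xct::kSerializable));
//   const char payload[8] = "hello";
//   WRAP_ERROR_CODE(seq.append_record(context, payload, sizeof(payload)));
//   Epoch commit_epoch;
//   WRAP_ERROR_CODE(xct_manager->precommit_xct(context, &commit_epoch));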

void SequentialStorage::apply_append_record(
  thread::Thread* context,
  const SequentialAppendLogType* log_entry) {
  SequentialStoragePimpl(this).append_record(
    context,
    log_entry->header_.xct_id_,
    log_entry->payload_,
    log_entry->payload_count_);
}
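
// Context note (inferred from the code above, not an additional API): this apply
// method runs as part of commit, after the transaction's XctId is finalized,
// which is why the volatile-list insert receives log_entry->header_.xct_id_ as
// the owner_id of the appended record.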

ErrorStack SequentialStoragePimpl::drop() {
  LOG(INFO) << "Uninitializing a sequential-storage " << get_name();
  // Release all pages in this list.
  uint16_t nodes = engine_->get_options().thread_.group_count_;
  uint16_t threads_per_node = engine_->get_options().thread_.thread_count_per_group_;
  for (uint16_t node = 0; node < nodes; ++node) {
    // We are sure these pages are from only one NUMA node, so we can easily batch-return.
    memory::NumaNodeMemoryRef* memory = engine_->get_memory_manager()->get_node_memory(node);
    memory::PagePool* pool = memory->get_volatile_pool();
    const memory::LocalPageResolver& resolver = pool->get_resolver();
    memory::PagePoolOffsetChunk chunk;
    for (uint16_t local_ordinal = 0; local_ordinal < threads_per_node; ++local_ordinal) {
      thread::ThreadId thread_id = thread::compose_thread_id(node, local_ordinal);
      for (SequentialPage* page = get_head(resolver, thread_id); page;) {
        ASSERT_ND(page->header().page_id_);
        VolatilePagePointer cur_pointer;
        cur_pointer.word = page->header().page_id_;
        ASSERT_ND(page == reinterpret_cast<SequentialPage*>(resolver.resolve_offset(
          cur_pointer.get_offset())));
        ASSERT_ND(node == cur_pointer.get_numa_node());
        VolatilePagePointer next_pointer = page->next_page().volatile_pointer_;
        if (chunk.full()) {
          pool->release(chunk.size(), &chunk);
        }
        ASSERT_ND(!chunk.full());
        chunk.push_back(cur_pointer.get_offset());

        if (!next_pointer.is_null()) {
          ASSERT_ND(node == next_pointer.get_numa_node());
          page = reinterpret_cast<SequentialPage*>(resolver.resolve_offset(
            next_pointer.get_offset()));
        } else {
          page = nullptr;
        }
      }
    }
    if (chunk.size() > 0) {
      pool->release(chunk.size(), &chunk);
    }
    ASSERT_ND(chunk.size() == 0);
  }

  // Release pointer pages.
  for (uint16_t p = 0; p * 4 < nodes; ++p) {
    uint16_t node = p * 4;
    memory::NumaNodeMemoryRef* memory = engine_->get_memory_manager()->get_node_memory(node);
    memory::PagePool* pool = memory->get_volatile_pool();
    if (!control_block_->head_pointer_pages_[p].is_null()) {
      ASSERT_ND(control_block_->head_pointer_pages_[p].get_numa_node() == node);
      pool->release_one(control_block_->head_pointer_pages_[p].get_offset());
    }
    if (!control_block_->tail_pointer_pages_[p].is_null()) {
      ASSERT_ND(control_block_->tail_pointer_pages_[p].get_numa_node() == node);
      pool->release_one(control_block_->tail_pointer_pages_[p].get_offset());
    }
  }
  std::memset(control_block_->head_pointer_pages_, 0, sizeof(control_block_->head_pointer_pages_));
  std::memset(control_block_->tail_pointer_pages_, 0, sizeof(control_block_->tail_pointer_pages_));
  return kRetOk;
}
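
// The drop() loop above amortizes page-pool synchronization by returning page
// offsets in chunks rather than one by one. The pattern in isolation (a sketch;
// "offsets" stands for any batch of offsets owned by one NUMA node's pool):
//
//   memory::PagePoolOffsetChunk chunk;
//   for (memory::PagePoolOffset offset : offsets) {
//     if (chunk.full()) {
//       pool->release(chunk.size(), &chunk);  // one pool call per full chunk
//     }
//     chunk.push_back(offset);
//   }
//   if (chunk.size() > 0) {
//     pool->release(chunk.size(), &chunk);  // flush the remainder
//   }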

ErrorStack SequentialStoragePimpl::create(const SequentialMetadata& metadata) {
  if (exists()) {
    LOG(ERROR) << "This sequential-storage already exists: " << get_name();
    return ERROR_STACK(kErrorCodeStrAlreadyExists);
  }

  control_block_->meta_ = metadata;
  const Epoch initial_truncate_epoch = engine_->get_earliest_epoch();
  control_block_->meta_.truncate_epoch_ = initial_truncate_epoch.value();
  control_block_->cur_truncate_epoch_.store(initial_truncate_epoch.value());
  control_block_->cur_truncate_epoch_tid_.reset();
  control_block_->cur_truncate_epoch_tid_.xct_id_.set_epoch(initial_truncate_epoch);
  control_block_->cur_truncate_epoch_tid_.xct_id_.set_ordinal(1);

  CHECK_ERROR(initialize_head_tail_pages());
  control_block_->status_ = kExists;
  LOG(INFO) << "Newly created a sequential-storage " << get_name();
  return kRetOk;
}
ErrorStack SequentialStoragePimpl::load(const StorageControlBlock& snapshot_block) {
  // For sequential storage, whether the snapshot root pointer is null or not doesn't matter.
  // Essentially load==create, except that load also sets the snapshot root pointer.
  control_block_->meta_ = static_cast<const SequentialMetadata&>(snapshot_block.meta_);
  Epoch initial_truncate_epoch(control_block_->meta_.truncate_epoch_);
  if (!initial_truncate_epoch.is_valid()) {
    initial_truncate_epoch = engine_->get_earliest_epoch();
  }
  ASSERT_ND(initial_truncate_epoch.is_valid());
  control_block_->meta_.truncate_epoch_ = initial_truncate_epoch.value();
  control_block_->cur_truncate_epoch_.store(initial_truncate_epoch.value());
  control_block_->cur_truncate_epoch_tid_.reset();
  control_block_->cur_truncate_epoch_tid_.xct_id_.set_epoch(initial_truncate_epoch);
  control_block_->cur_truncate_epoch_tid_.xct_id_.set_ordinal(1);

  CHECK_ERROR(initialize_head_tail_pages());
  control_block_->root_page_pointer_.snapshot_pointer_
    = control_block_->meta_.root_snapshot_page_id_;
  control_block_->status_ = kExists;
  LOG(INFO) << "Loaded a sequential-storage " << get_name();
  return kRetOk;
}
ErrorStack SequentialStoragePimpl::initialize_head_tail_pages() {
  std::memset(control_block_->head_pointer_pages_, 0, sizeof(control_block_->head_pointer_pages_));
  std::memset(control_block_->tail_pointer_pages_, 0, sizeof(control_block_->tail_pointer_pages_));
  // We pre-allocate pointer pages for all required nodes:
  // 2^10 pointers (threads) per page, thus 4 nodes per page.
  uint32_t nodes = engine_->get_options().thread_.group_count_;
  for (uint16_t p = 0; p * 4 < nodes; ++p) {
    uint16_t node = p * 4;
    memory::NumaNodeMemoryRef* memory = engine_->get_memory_manager()->get_node_memory(node);
    memory::PagePool* pool = memory->get_volatile_pool();
    memory::PagePoolOffset head_offset, tail_offset;
    // minor todo: gracefully fail in case of out-of-memory
    WRAP_ERROR_CODE(pool->grab_one(&head_offset));
    WRAP_ERROR_CODE(pool->grab_one(&tail_offset));
    control_block_->head_pointer_pages_[p].set(node, head_offset);
    control_block_->tail_pointer_pages_[p].set(node, tail_offset);
    void* head_page = pool->get_resolver().resolve_offset_newpage(head_offset);
    void* tail_page = pool->get_resolver().resolve_offset_newpage(tail_offset);
    std::memset(head_page, 0, kPageSize);
    std::memset(tail_page, 0, kPageSize);
  }
  return kRetOk;
}
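
// For illustration (get_pointer_page_and_index() itself is declared in the
// header): a PagePoolOffset is 4 bytes, so one page holds 2^10 = 1024 pointers.
// A ThreadId is composed as (node << 8) | ordinal and each node has at most 256
// threads, so 4 consecutive nodes share one pointer page. A plausible sketch:
//
//   *page  = thread_id / 1024;  // == node / 4
//   *index = thread_id % 1024;  // == (node % 4) * 256 + ordinal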

ErrorCode SequentialStorageControlBlock::optimistic_read_truncate_epoch(
  thread::Thread* context,
  Epoch* out) const {
  xct::Xct& cur_xct = context->get_current_xct();
  if (!cur_xct.is_active()) {
    return kErrorCodeXctNoXct;
  }

  auto* address = &cur_truncate_epoch_tid_;
  xct::XctId observed = address->xct_id_;
  while (UNLIKELY(observed.is_being_written())) {
    assorted::memory_fence_acquire();
    observed = address->xct_id_;
  }

  *out = Epoch(cur_truncate_epoch_.load());  // atomic!
  CHECK_ERROR_CODE(cur_xct.add_to_lock_free_read_set(
    meta_.id_,
    observed,
    const_cast< xct::RwLockableXctId* >(address)));  // const_cast because it receives a non-const pointer
  return kErrorCodeOk;
}
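
// Usage sketch (hypothetical): a scanning transaction reads the truncate epoch
// optimistically before consuming records. If a concurrent truncate() commits,
// the lock-free read-set entry added above fails verification at the scanner's
// precommit, so the scanner aborts instead of observing half-truncated state.
//
//   Epoch truncate_epoch;
//   CHECK_ERROR_CODE(
//     control_block->optimistic_read_truncate_epoch(context, &truncate_epoch));
//   // ... skip all records whose epoch is below truncate_epoch ...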

ErrorStack SequentialStoragePimpl::truncate(Epoch new_truncate_epoch, Epoch* commit_epoch) {
  LOG(INFO) << "Truncating " << get_name() << " up to Epoch " << new_truncate_epoch
    << ". old value=" << control_block_->cur_truncate_epoch_;
  if (!new_truncate_epoch.is_valid()) {
    LOG(ERROR) << "truncate() was called with an invalid epoch";
    return ERROR_STACK(kErrorCodeInvalidParameter);
  } else if (new_truncate_epoch < engine_->get_earliest_epoch()) {
    LOG(ERROR) << "Too-old epoch for this system. " << new_truncate_epoch;
    return ERROR_STACK(kErrorCodeInvalidParameter);
  }

  // Will check this again after locking.
  if (Epoch(control_block_->cur_truncate_epoch_) >= new_truncate_epoch) {
    LOG(INFO) << "Already truncated up to " << Epoch(control_block_->cur_truncate_epoch_)
      << ". Requested = " << new_truncate_epoch;
    return kRetOk;
  }

  if (new_truncate_epoch > engine_->get_current_global_epoch()) {
    LOG(WARNING) << "We don't prohibit it, but are you sure? Truncating up to a future"
      << " epoch-" << new_truncate_epoch << ". cur_global=" << engine_->get_current_global_epoch();
  }

  // We lock it first so that there is no concurrent truncate.
  {
    // TODO(Hideaki) Ownerless-lock here
    // xct::McsOwnerlessLockScope scope(&control_block_->cur_truncate_epoch_tid_);

    if (Epoch(control_block_->cur_truncate_epoch_) >= new_truncate_epoch) {
      LOG(INFO) << "Already truncated up to " << Epoch(control_block_->cur_truncate_epoch_)
        << ". Requested = " << new_truncate_epoch;
      return kRetOk;
    }

    // First, let scanners know that something is happening.
    // 1. Scanners that didn't observe this and commit before us: fine.
    // 2. Scanners that didn't observe this and commit after us:
    //   will see this flag and abort in precommit, fine.
    // 3. Scanners that observe this:
    //   spin until we are done, fine (see optimistic_read_truncate_epoch()).
    // NOTE: Below, we must NOT have any error-return path. Otherwise the
    // being_written state is left behind.
    control_block_->cur_truncate_epoch_tid_.xct_id_.set_being_written();
    assorted::memory_fence_acq_rel();

    // Log this operation as a metadata operation. We get a commit_epoch here.
    {
      char log_buffer[sizeof(SequentialTruncateLogType)];
      std::memset(log_buffer, 0, sizeof(log_buffer));
      SequentialTruncateLogType* the_log = reinterpret_cast<SequentialTruncateLogType*>(log_buffer);
      the_log->header_.storage_id_ = get_id();
      the_log->header_.log_type_code_ = log::get_log_code<SequentialTruncateLogType>();
      the_log->header_.log_length_ = sizeof(SequentialTruncateLogType);
      the_log->new_truncate_epoch_ = new_truncate_epoch;
      engine_->get_log_manager()->get_meta_buffer()->commit(the_log, commit_epoch);
    }

    // Then, apply it. This also clears the being_written flag.
    {
      xct::XctId xct_id;
      xct_id.set(commit_epoch->value(), 1);  // no dependency, so the minimal ordinal is always correct
      control_block_->cur_truncate_epoch_.store(new_truncate_epoch.value());  // atomic!
      control_block_->cur_truncate_epoch_tid_.xct_id_ = xct_id;
      assorted::memory_fence_release();

      // Also set it to the metadata to make this permanent.
      // The metadata will be written out in the next snapshot.
      // Until then, the REDO operation below will re-apply it after a crash.
      control_block_->meta_.truncate_epoch_ = new_truncate_epoch.value();
      assorted::memory_fence_release();
    }
  }

  LOG(INFO) << "Truncated";
  return kRetOk;
}
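
// To summarize the synchronization above (a reading of this file's code, not an
// additional API): truncate() and optimistic_read_truncate_epoch() form a
// seqlock-like pair.
//   writer: set_being_written() -> acq_rel fence -> write metadata log
//           -> store new epoch and TID -> release fence
//   reader: spin while is_being_written() -> acquire fence -> re-read TID
//           -> read epoch -> verify the observed TID at precommit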

void SequentialStoragePimpl::apply_truncate(const SequentialTruncateLogType& the_log) {
  // This method is called only during restart, so no race.
  ASSERT_ND(control_block_->exists());
  control_block_->cur_truncate_epoch_tid_.xct_id_ = the_log.header_.xct_id_;
  control_block_->cur_truncate_epoch_.store(the_log.new_truncate_epoch_.value());
  control_block_->meta_.truncate_epoch_ = the_log.new_truncate_epoch_.value();
  LOG(INFO) << "Applied redo-log of truncation on sequential-storage " << get_name()
    << " epoch=" << control_block_->cur_truncate_epoch_;
}

void SequentialStoragePimpl::append_record(
  thread::Thread* context,
  xct::XctId owner_id,
  const void* payload,
  uint16_t payload_count) {
  thread::ThreadId thread_id = context->get_thread_id();
  thread::ThreadGroupId node = context->get_numa_node();

  // The list is local to this core, so no race is possible EXCEPT the scanning thread
  // and the snapshot thread, but those are read-only or only dropping pages.
  memory::PagePoolOffset* tail_pointer = get_tail_pointer(thread_id);
  ASSERT_ND(tail_pointer);
  SequentialPage* tail = nullptr;
  if (*tail_pointer != 0) {
    tail = reinterpret_cast<SequentialPage*>(
      context->get_local_volatile_page_resolver().resolve_offset(*tail_pointer));
  }
  if (tail == nullptr ||
    !tail->can_insert_record(payload_count) ||
    // note: we make sure no volatile page has records from two epochs.
    // this makes it easy to drop volatile pages after snapshotting.
    (tail->get_record_count() > 0 && tail->get_first_record_epoch() != owner_id.get_epoch())) {
    memory::PagePoolOffset new_page_offset
      = context->get_thread_memory()->grab_free_volatile_page();
    if (UNLIKELY(new_page_offset == 0)) {
      LOG(FATAL) << "Unexpected error. We ran out of free pages while inserting to a sequential"
        " storage after commit.";
    }
    VolatilePagePointer new_page_pointer;
    new_page_pointer.set(node, new_page_offset);
    SequentialPage* new_page = reinterpret_cast<SequentialPage*>(
      context->get_local_volatile_page_resolver().resolve_offset_newpage(new_page_offset));
    new_page->initialize_volatile_page(get_id(), new_page_pointer);

    if (tail == nullptr) {
      // This is the first access to this head pointer. Let's install the first page.
      ASSERT_ND(*tail_pointer == 0);
      memory::PagePoolOffset* head_pointer = get_head_pointer(thread_id);
      ASSERT_ND(*head_pointer == 0);
      *head_pointer = new_page_offset;
      *tail_pointer = new_page_offset;
    } else {
      ASSERT_ND(*get_head_pointer(thread_id) != 0);
      *tail_pointer = new_page_offset;
      tail->next_page().volatile_pointer_ = new_page_pointer;
    }
    tail = new_page;
  }

  ASSERT_ND(tail &&
    tail->can_insert_record(payload_count) &&
    (tail->get_record_count() == 0 || tail->get_first_record_epoch() == owner_id.get_epoch()));
  tail->append_record_nosync(owner_id, payload_count, payload);
}
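
// Note on the one-epoch-per-page invariant enforced above: because a volatile
// page never mixes records from two epochs, later cleanup can decide page by
// page whether a snapshot fully covers a page (by checking only
// get_first_record_epoch()) and drop it without examining individual records.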

memory::PagePoolOffset* SequentialStoragePimpl::get_head_pointer(thread::ThreadId thread_id) const {
  ASSERT_ND(thread::decompose_numa_node(thread_id) < engine_->get_options().thread_.group_count_);
  ASSERT_ND(thread::decompose_numa_local_ordinal(thread_id)
    < engine_->get_options().thread_.thread_count_per_group_);
  uint16_t page;
  uint16_t index;
  get_pointer_page_and_index(thread_id, &page, &index);
  const memory::GlobalVolatilePageResolver& resolver
    = engine_->get_memory_manager()->get_global_volatile_page_resolver();
  PointerPage* head_page
    = reinterpret_cast<PointerPage*>(resolver.resolve_offset_newpage(
      control_block_->head_pointer_pages_[page]));
  return head_page->pointers_ + index;
}

memory::PagePoolOffset* SequentialStoragePimpl::get_tail_pointer(thread::ThreadId thread_id) const {
  ASSERT_ND(thread::decompose_numa_node(thread_id) < engine_->get_options().thread_.group_count_);
  ASSERT_ND(thread::decompose_numa_local_ordinal(thread_id)
    < engine_->get_options().thread_.thread_count_per_group_);
  uint16_t page;
  uint16_t index;
  get_pointer_page_and_index(thread_id, &page, &index);
  const memory::GlobalVolatilePageResolver& resolver
    = engine_->get_memory_manager()->get_global_volatile_page_resolver();
  PointerPage* tail_page
    = reinterpret_cast<PointerPage*>(resolver.resolve_offset_newpage(
      control_block_->tail_pointer_pages_[page]));
  return tail_page->pointers_ + index;
}

SequentialPage* SequentialStoragePimpl::get_head(
  const memory::LocalPageResolver& resolver,
  thread::ThreadId thread_id) const {
  memory::PagePoolOffset offset = *get_head_pointer(thread_id);
  if (offset == 0) {
    return nullptr;
  }
  return reinterpret_cast<SequentialPage*>(resolver.resolve_offset(offset));
}

SequentialPage* SequentialStoragePimpl::get_tail(
  const memory::LocalPageResolver& resolver,
  thread::ThreadId thread_id) const {
  memory::PagePoolOffset offset = *get_tail_pointer(thread_id);
  if (offset == 0) {
    return nullptr;
  }
  return reinterpret_cast<SequentialPage*>(resolver.resolve_offset(offset));
}
426 
427 } // namespace sequential
428 } // namespace storage
429 } // namespace foedus