libfoedus-core
FOEDUS Core Library
foedus::storage::sequential::SequentialStoragePimpl Class Reference final

Lock-free list of records stored in the volatile part of sequential storage.

Detailed Description

Lock-free list of records stored in the volatile part of sequential storage.

Volatile parts of sequential storages are completely separated from stable snapshot pages and maintained as a lock-free list of SequentialPage.

Appending a record to a page involves one atomic CAS, and appending a new page involves one atomic CAS as well. Because this list is append/scan-only, it is much simpler than a typical lock-free list.

Concurrency Control

In this volatile list, we assume the following things to simplify concurrency control:

  • Each SequentialPage contains records from only one epoch. When the epoch switches, we insert new records into a new page even if the current page is almost vacant.
  • The order of records within a page does not necessarily reflect the serialization order. In terms of serializability, the sequential storage provides set semantics rather than list semantics, although it is loosely ordered.
  • Each thread maintains its own head/tail pages so that threads do not interfere with each other at all. Combined with the assumptions above, this makes appends completely free of blocking and atomic operations, EXCEPT:
  • For scanning cursors (which occur only rarely and are fundamentally slow anyway), we take an exclusive lock when they touch unsafe epochs. Further, the scanning thread must wait until no other thread is still running a transaction that started before the scanning thread took the lock (otherwise serializability is not guaranteed). We will have something like Xct::InCommitEpochGuard for this purpose. As scanning threads are rare, they can wait for a while, so it is acceptable for other threads to complete at least one transaction before they become aware of the lock.
  • However, the above requirement is not mandatory if the scanning threads are running in snapshot mode or dirty-read mode. We just make sure that what is read is within the epoch. Because other threads append records to their own lists in serialization order, this can be trivially achieved.

With these assumptions, sequential storages don't require any locking for serializability. Thus, we separate write-sets of sequential storages from other write-sets in transaction objects.
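The first three assumptions can be illustrated with a minimal sketch. It is not the FOEDUS implementation: `Page`, `ThreadLocalList`, `count_pages`, the tiny page capacity, and the use of `int` payloads are all hypothetical stand-ins chosen to keep the example short. It only shows why a list that is owned by a single thread and keeps one epoch per page can append without locks or atomic operations.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-ins; these are not the FOEDUS types.
using Epoch = uint32_t;

struct Page {
  static constexpr size_t kCapacity = 4;  // tiny on purpose, for illustration
  Epoch epoch = 0;            // epoch shared by every record in this page
  std::vector<int> records;   // payloads, simplified to int
  Page* next = nullptr;       // next page in the same thread's list
};

// One list per worker thread. Only the owner appends, so plain stores suffice.
struct ThreadLocalList {
  std::vector<std::unique_ptr<Page>> owned;  // keeps the pages alive
  Page* head = nullptr;
  Page* tail = nullptr;

  void append(Epoch epoch, int payload) {
    // Open a new page when there is none, when the tail is full, or when the
    // epoch has switched (a page never mixes records from two epochs).
    const bool need_new_page = tail == nullptr
      || tail->records.size() >= Page::kCapacity
      || (!tail->records.empty() && tail->epoch != epoch);
    if (need_new_page) {
      owned.push_back(std::make_unique<Page>());
      Page* page = owned.back().get();
      if (tail == nullptr) {
        head = page;
      } else {
        tail->next = page;
      }
      tail = page;
    }
    if (tail->records.empty()) {
      tail->epoch = epoch;
    }
    assert(tail->epoch == epoch);
    tail->records.push_back(payload);
  }
};

// Counts the pages reachable from the head of the list.
size_t count_pages(const ThreadLocalList& list) {
  size_t pages = 0;
  for (const Page* page = list.head; page != nullptr; page = page->next) {
    ++pages;
  }
  return pages;
}
```

Appending two records in epoch 1 and one record in epoch 2 yields two pages in this sketch, even though the first page still has room.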

Page replacement policy after snapshot
This is an append-optimized storage, so there is no point in keeping volatile pages for future use. We thus drop all volatile pages that have been snapshotted.
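As a rough illustration of this policy, the sketch below drops pages from the front of one thread's list once a snapshot covers their epoch. `Page`, `drop_snapshotted_pages`, and the plain integer epoch comparison are assumptions made for the example; it also assumes pages appear in non-decreasing epoch order within a thread's list, which follows from each thread appending in serialization order. A real implementation would also have to reset the tail pointer when the list becomes empty.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-ins; these are not the FOEDUS types.
using Epoch = uint32_t;

struct Page {
  Epoch epoch;  // the single epoch of all records in this page
  Page* next;   // next page in the same thread's list
};

// Releases every page at the front of the list whose epoch is covered by the
// snapshot. Returns the number of pages released; *head is advanced past them.
size_t drop_snapshotted_pages(Page** head, Epoch snapshot_epoch) {
  assert(head != nullptr);
  size_t dropped = 0;
  while (*head != nullptr && (*head)->epoch <= snapshot_epoch) {
    Page* victim = *head;
    *head = victim->next;
    delete victim;  // the sketch uses new/delete instead of a page pool
    ++dropped;
  }
  return dropped;
}
```

Because a page never mixes epochs, the decision is made per page and no record-level filtering is needed.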
Note
This is a private implementation detail of Sequential Storage; thus the file name ends with _impl. Do not include this header from a client program. A client program never needs to access this internal class.
Todo:
Implement the scanning functionality as above.

Definition at line 139 of file sequential_storage_pimpl.hpp.

#include <sequential_storage_pimpl.hpp>


Classes

struct  PointerPage
 

Public Member Functions

 SequentialStoragePimpl ()=delete
 
 SequentialStoragePimpl (SequentialStorage *storage)
 
 SequentialStoragePimpl (Engine *engine, SequentialStorageControlBlock *control_block)
 
bool exists () const
 
StorageId get_id () const
 
const StorageName & get_name () const
 
ErrorStack create (const SequentialMetadata &metadata)
 
ErrorStack load (const StorageControlBlock &snapshot_block)
 
ErrorStack initialize_head_tail_pages ()
 
ErrorStack drop ()
 
ErrorStack truncate (Epoch new_truncate_epoch, Epoch *commit_epoch)
 
void apply_truncate (const SequentialTruncateLogType &the_log)
 
SequentialPage * get_head (const memory::LocalPageResolver &resolver, thread::ThreadId thread_id) const
 
SequentialPage * get_tail (const memory::LocalPageResolver &resolver, thread::ThreadId thread_id) const
 
memory::PagePoolOffset * get_head_pointer (thread::ThreadId thread_id) const
 
memory::PagePoolOffset * get_tail_pointer (thread::ThreadId thread_id) const
 
void append_record (thread::Thread *context, xct::XctId owner_id, const void *payload, uint16_t payload_count)
 Appends an already-committed record to this volatile list.
 
template<typename HANDLER >
ErrorCode for_every_page (HANDLER handler) const
 Traverse all pages and call back the handler for every page.
 
- Public Member Functions inherited from foedus::Attachable< SequentialStorageControlBlock >
 Attachable ()
 
 Attachable (Engine *engine)
 
 Attachable (Engine *engine, SequentialStorageControlBlock *control_block)
 
 Attachable (SequentialStorageControlBlock *control_block)
 
 Attachable (const Attachable &other)
 
virtual ~Attachable ()
 
Attachable & operator= (const Attachable &other)
 
virtual void attach (SequentialStorageControlBlock *control_block)
 Attaches to the given shared memory.
 
bool is_attached () const
 Returns whether the object has been already attached to some shared memory.
 
SequentialStorageControlBlock * get_control_block () const
 
Engine * get_engine () const
 
void set_engine (Engine *engine)
 

Additional Inherited Members

- Protected Attributes inherited from foedus::Attachable< SequentialStorageControlBlock >
Engine * engine_
 Most attachable object stores an engine pointer (local engine), so we define it here.
 
SequentialStorageControlBlock * control_block_
 The shared data on shared memory that has been initialized in some SOC or master engine.
 

Constructor & Destructor Documentation

foedus::storage::sequential::SequentialStoragePimpl::SequentialStoragePimpl ( )
delete
foedus::storage::sequential::SequentialStoragePimpl::SequentialStoragePimpl ( SequentialStorage * storage)
inline explicit

Definition at line 145 of file sequential_storage_pimpl.hpp.

explicit SequentialStoragePimpl(SequentialStorage* storage)
  : Attachable<SequentialStorageControlBlock>(
    storage->get_engine(),
    storage->get_control_block()) {
}
foedus::storage::sequential::SequentialStoragePimpl::SequentialStoragePimpl ( Engine * engine,
SequentialStorageControlBlock * control_block 
)
inline

Definition at line 150 of file sequential_storage_pimpl.hpp.

SequentialStoragePimpl(Engine* engine, SequentialStorageControlBlock* control_block)
  : Attachable<SequentialStorageControlBlock>(engine, control_block) {
}

Member Function Documentation

void foedus::storage::sequential::SequentialStoragePimpl::append_record ( thread::Thread * context,
xct::XctId  owner_id,
const void *  payload,
uint16_t  payload_count 
)

Appends an already-committed record to this volatile list.

This method is guaranteed to succeed, so it does not return an error code, which is essential because it is used after commit. Strictly speaking, there is one very rare case in which this method might fail: we need a new page and the page pool has no free pages. However, we can trivially avoid this case by checking during pre-commit that the thread-local cache has at least one free page.
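The pre-commit check mentioned above could look roughly like the following sketch. `LocalPageCache`, `PrecommitResult`, and `precommit_check` are hypothetical names introduced only for illustration and are not part of the documented API. The sketch takes the text's "at least one free page" at face value; whether one page is always enough depends on how many new pages a single transaction's appends can require.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical thread-local cache of free page offsets (0 means "no page").
struct LocalPageCache {
  std::vector<uint32_t> free_offsets;

  bool has_free_page() const { return !free_offsets.empty(); }

  // Returns a free offset, or 0 if the cache is empty.
  uint32_t grab() {
    if (free_offsets.empty()) {
      return 0;
    }
    uint32_t offset = free_offsets.back();
    free_offsets.pop_back();
    return offset;
  }
};

enum class PrecommitResult { kOk, kNoFreePages };

// Runs before the commit point, where failing is still allowed. If the
// transaction will append to a sequential storage, refuse to commit unless a
// free page is already reserved locally, so the later append cannot fail.
PrecommitResult precommit_check(const LocalPageCache& cache, bool has_sequential_writes) {
  if (has_sequential_writes && !cache.has_free_page()) {
    return PrecommitResult::kNoFreePages;
  }
  return PrecommitResult::kOk;
}
```

Moving the only failure mode in front of the commit point is what lets the append itself have a `void` return type.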

Definition at line 323 of file sequential_storage_pimpl.cpp.

References foedus::storage::sequential::SequentialPage::append_record_nosync(), ASSERT_ND, foedus::storage::sequential::SequentialPage::can_insert_record(), foedus::xct::XctId::get_epoch(), foedus::storage::sequential::SequentialPage::get_first_record_epoch(), get_head_pointer(), get_id(), foedus::thread::Thread::get_local_volatile_page_resolver(), foedus::thread::Thread::get_numa_node(), foedus::storage::sequential::SequentialPage::get_record_count(), get_tail_pointer(), foedus::thread::Thread::get_thread_id(), foedus::thread::Thread::get_thread_memory(), foedus::memory::NumaCoreMemory::grab_free_volatile_page(), foedus::storage::sequential::SequentialPage::initialize_volatile_page(), foedus::storage::sequential::SequentialPage::next_page(), foedus::memory::LocalPageResolver::resolve_offset(), foedus::memory::LocalPageResolver::resolve_offset_newpage(), foedus::storage::VolatilePagePointer::set(), UNLIKELY, and foedus::storage::DualPagePointer::volatile_pointer_.

Referenced by foedus::storage::sequential::SequentialStorage::apply_append_record().

void SequentialStoragePimpl::append_record(
  thread::Thread* context,
  xct::XctId owner_id,
  const void* payload,
  uint16_t payload_count) {
  thread::ThreadId thread_id = context->get_thread_id();
  thread::ThreadGroupId node = context->get_numa_node();

  // the list is local to this core, so no race possible EXCEPT scanning thread
  // and snapshot thread, but they are read-only or only dropping pages.
  memory::PagePoolOffset* tail_pointer = get_tail_pointer(thread_id);
  ASSERT_ND(tail_pointer);
  SequentialPage* tail = nullptr;
  if (*tail_pointer != 0) {
    tail = reinterpret_cast<SequentialPage*>(
      context->get_local_volatile_page_resolver().resolve_offset(*tail_pointer));
  }
  if (tail == nullptr ||
    !tail->can_insert_record(payload_count) ||
    // note: we make sure no volatile page has records from two epochs.
    // this makes us easy to drop volatile pages after snapshotting.
    (tail->get_record_count() > 0 && tail->get_first_record_epoch() != owner_id.get_epoch())) {
    memory::PagePoolOffset new_page_offset
      = context->get_thread_memory()->grab_free_volatile_page();
    if (UNLIKELY(new_page_offset == 0)) {
      LOG(FATAL) << " Unexpected error. we ran out of free page while inserting to sequential"
        " storage after commit.";
    }
    VolatilePagePointer new_page_pointer;
    new_page_pointer.set(node, new_page_offset);
    SequentialPage* new_page = reinterpret_cast<SequentialPage*>(
      context->get_local_volatile_page_resolver().resolve_offset_newpage(new_page_offset));
    new_page->initialize_volatile_page(get_id(), new_page_pointer);

    if (tail == nullptr) {
      // this is the first access to this head pointer. Let's install the first page.
      ASSERT_ND(*tail_pointer == 0);
      memory::PagePoolOffset* head_pointer = get_head_pointer(thread_id);
      ASSERT_ND(*head_pointer == 0);
      *head_pointer = new_page_offset;
      *tail_pointer = new_page_offset;
    } else {
      ASSERT_ND(*get_head_pointer(thread_id) != 0);
      *tail_pointer = new_page_offset;
      tail->next_page().volatile_pointer_ = new_page_pointer;
    }
    tail = new_page;
  }

  ASSERT_ND(tail &&
    tail->can_insert_record(payload_count) &&
    (tail->get_record_count() == 0 || tail->get_first_record_epoch() == owner_id.get_epoch()));
  tail->append_record_nosync(owner_id, payload_count, payload);
}

void foedus::storage::sequential::SequentialStoragePimpl::apply_truncate ( const SequentialTruncateLogType & the_log)

Definition at line 313 of file sequential_storage_pimpl.cpp.

References ASSERT_ND, foedus::Attachable< SequentialStorageControlBlock >::control_block_, get_name(), foedus::log::BaseLogType::header_, foedus::storage::sequential::SequentialTruncateLogType::new_truncate_epoch_, foedus::Epoch::value(), and foedus::log::LogHeader::xct_id_.

Referenced by foedus::storage::sequential::SequentialStorage::apply_truncate().

void SequentialStoragePimpl::apply_truncate(const SequentialTruncateLogType& the_log) {
  // this method is called only during restart, so no race.
  ASSERT_ND(control_block_->exists());
  control_block_->cur_truncate_epoch_tid_.xct_id_ = the_log.header_.xct_id_;
  control_block_->cur_truncate_epoch_.store(the_log.new_truncate_epoch_.value());
  control_block_->meta_.truncate_epoch_ = the_log.new_truncate_epoch_.value();
  LOG(INFO) << "Applied redo-log of truncation on sequential storage- " << get_name()
    << " epoch=" << control_block_->cur_truncate_epoch_;
}

ErrorStack foedus::storage::sequential::SequentialStoragePimpl::create ( const SequentialMetadata & metadata)

Definition at line 147 of file sequential_storage_pimpl.cpp.

References CHECK_ERROR, foedus::Attachable< SequentialStorageControlBlock >::control_block_, foedus::Attachable< SequentialStorageControlBlock >::engine_, ERROR_STACK, exists(), foedus::Engine::get_earliest_epoch(), get_name(), initialize_head_tail_pages(), foedus::kErrorCodeStrAlreadyExists, foedus::storage::kExists, foedus::kRetOk, and foedus::Epoch::value().

Referenced by foedus::storage::sequential::SequentialStorage::create().

ErrorStack SequentialStoragePimpl::create(const SequentialMetadata& metadata) {
  if (exists()) {
    LOG(ERROR) << "This sequential-storage already exists: " << get_name();
    return ERROR_STACK(kErrorCodeStrAlreadyExists);
  }

  control_block_->meta_ = metadata;
  const Epoch initial_truncate_epoch = engine_->get_earliest_epoch();
  control_block_->meta_.truncate_epoch_ = initial_truncate_epoch.value();
  control_block_->cur_truncate_epoch_.store(initial_truncate_epoch.value());
  control_block_->cur_truncate_epoch_tid_.reset();
  control_block_->cur_truncate_epoch_tid_.xct_id_.set_epoch(initial_truncate_epoch);
  control_block_->cur_truncate_epoch_tid_.xct_id_.set_ordinal(1);

  CHECK_ERROR(initialize_head_tail_pages());
  control_block_->status_ = kExists;
  LOG(INFO) << "Newly created a sequential-storage " << get_name();
  return kRetOk;
}

ErrorStack foedus::storage::sequential::SequentialStoragePimpl::drop ( )

Definition at line 86 of file sequential_storage_pimpl.cpp.

References ASSERT_ND, foedus::thread::compose_thread_id(), foedus::Attachable< SequentialStorageControlBlock >::control_block_, foedus::Attachable< SequentialStorageControlBlock >::engine_, foedus::memory::PagePoolOffsetChunk::full(), get_head(), foedus::Engine::get_memory_manager(), get_name(), foedus::memory::EngineMemory::get_node_memory(), foedus::Engine::get_options(), foedus::memory::PagePool::get_resolver(), foedus::memory::NumaNodeMemoryRef::get_volatile_pool(), foedus::thread::ThreadOptions::group_count_, foedus::kRetOk, foedus::memory::PagePoolOffsetChunk::push_back(), foedus::memory::PagePool::release(), foedus::memory::PagePool::release_one(), foedus::memory::LocalPageResolver::resolve_offset(), foedus::memory::PagePoolOffsetChunk::size(), foedus::EngineOptions::thread_, foedus::thread::ThreadOptions::thread_count_per_group_, and foedus::storage::VolatilePagePointer::word.

Referenced by foedus::storage::sequential::SequentialStorage::drop().

ErrorStack SequentialStoragePimpl::drop() {
  LOG(INFO) << "Uninitializing an sequential-storage " << get_name();
  // release all pages in this list.
  uint16_t nodes = engine_->get_options().thread_.group_count_;
  uint16_t threads_per_node = engine_->get_options().thread_.thread_count_per_group_;
  for (uint16_t node = 0; node < nodes; ++node) {
    // we are sure these pages are from only one NUMA node, so we can easily batch-return.
    memory::NumaNodeMemoryRef* memory = engine_->get_memory_manager()->get_node_memory(node);
    memory::PagePool* pool = memory->get_volatile_pool();
    const memory::LocalPageResolver& resolver = pool->get_resolver();
    memory::PagePoolOffsetChunk chunk;
    for (uint16_t local_ordinal = 0; local_ordinal < threads_per_node; ++local_ordinal) {
      thread::ThreadId thread_id = thread::compose_thread_id(node, local_ordinal);
      for (SequentialPage* page = get_head(resolver, thread_id); page;) {
        ASSERT_ND(page->header().page_id_);
        VolatilePagePointer cur_pointer;
        cur_pointer.word = page->header().page_id_;
        ASSERT_ND(page == reinterpret_cast<SequentialPage*>(resolver.resolve_offset(
          cur_pointer.get_offset())));
        ASSERT_ND(node == cur_pointer.get_numa_node());
        VolatilePagePointer next_pointer = page->next_page().volatile_pointer_;
        if (chunk.full()) {
          pool->release(chunk.size(), &chunk);
        }
        ASSERT_ND(!chunk.full());
        chunk.push_back(cur_pointer.get_offset());

        if (!next_pointer.is_null()) {
          ASSERT_ND(node == next_pointer.get_numa_node());
          page = reinterpret_cast<SequentialPage*>(resolver.resolve_offset(
            next_pointer.get_offset()));
        } else {
          page = nullptr;
        }
      }
    }
    if (chunk.size() > 0) {
      pool->release(chunk.size(), &chunk);
    }
    ASSERT_ND(chunk.size() == 0);
  }

  // release pointer pages
  for (uint16_t p = 0; p * 4 < nodes; ++p) {
    uint16_t node = p * 4;
    memory::NumaNodeMemoryRef* memory = engine_->get_memory_manager()->get_node_memory(node);
    memory::PagePool* pool = memory->get_volatile_pool();
    if (!control_block_->head_pointer_pages_[p].is_null()) {
      ASSERT_ND(control_block_->head_pointer_pages_[p].get_numa_node() == node);
      pool->release_one(control_block_->head_pointer_pages_[p].get_offset());
    }
    if (!control_block_->tail_pointer_pages_[p].is_null()) {
      ASSERT_ND(control_block_->tail_pointer_pages_[p].get_numa_node() == node);
      pool->release_one(control_block_->tail_pointer_pages_[p].get_offset());
    }
  }
  std::memset(control_block_->head_pointer_pages_, 0, sizeof(control_block_->head_pointer_pages_));
  std::memset(control_block_->tail_pointer_pages_, 0, sizeof(control_block_->tail_pointer_pages_));
  return kRetOk;
}
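The batching pattern in the listing above (collect offsets in a fixed-capacity chunk, flush when it is full, flush the remainder at the end) can be isolated in a small sketch. `OffsetChunk`, `Pool`, and `release_all` are simplified stand-ins, not the real `memory::PagePoolOffsetChunk` or `memory::PagePool`; in particular, the assumption that `release()` empties the returned entries out of the chunk is inferred from the `ASSERT_ND(chunk.size() == 0)` in the listing.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Simplified stand-in for a fixed-capacity batch of page offsets.
struct OffsetChunk {
  static constexpr size_t kCapacity = 3;  // tiny on purpose
  uint32_t offsets[kCapacity];
  size_t count = 0;

  bool full() const { return count == kCapacity; }
  size_t size() const { return count; }
  void push_back(uint32_t offset) {
    assert(!full());
    offsets[count++] = offset;
  }
};

// Simplified stand-in for a page pool that accepts pages back in batches.
struct Pool {
  std::vector<uint32_t> free_list;
  size_t release_calls = 0;

  // Moves `desired` offsets from the chunk back into the pool.
  void release(size_t desired, OffsetChunk* chunk) {
    assert(desired <= chunk->size());
    for (size_t i = 0; i < desired; ++i) {
      free_list.push_back(chunk->offsets[--chunk->count]);
    }
    ++release_calls;
  }
};

// Returns all given pages to the pool, at most kCapacity pages per call.
void release_all(const std::vector<uint32_t>& page_offsets, Pool* pool) {
  OffsetChunk chunk;
  for (uint32_t offset : page_offsets) {
    if (chunk.full()) {
      pool->release(chunk.size(), &chunk);  // flush a full batch
    }
    chunk.push_back(offset);
  }
  if (chunk.size() > 0) {
    pool->release(chunk.size(), &chunk);  // flush the remainder
  }
  assert(chunk.size() == 0);
}
```

Batching amortizes whatever synchronization the pool needs per `release()` call over many pages, which is presumably why the listing creates one chunk per NUMA node rather than returning pages one by one.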

bool foedus::storage::sequential::SequentialStoragePimpl::exists ( ) const
inline

Definition at line 154 of file sequential_storage_pimpl.hpp.

References foedus::Attachable< SequentialStorageControlBlock >::control_block_.

Referenced by create().

bool exists() const { return control_block_->exists(); }

template<typename HANDLER >
ErrorCode foedus::storage::sequential::SequentialStoragePimpl::for_every_page ( HANDLER  handler) const
inline

Traverse all pages and call back the handler for every page.

Handler must look like "ErrorCode func(SequentialPage* page) { ... } ".
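A handler of that shape is typically passed as a lambda. The sketch below mimics the traversal over stand-in types so that it compiles on its own: `for_every_page_sketch`, `count_records`, the two-field `SequentialPage`, the list of per-thread heads, and the `kErrorCodeStop` value are all hypothetical and only model the contract (call the handler once per page, stop at the first non-OK code).

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Stand-ins for illustration; the real types live in libfoedus-core.
enum ErrorCode { kErrorCodeOk = 0, kErrorCodeStop = 1 };
struct SequentialPage {
  uint16_t record_count;
  SequentialPage* next;
};

// Visits every page of every per-thread list and forwards the first error.
template <typename HANDLER>
ErrorCode for_every_page_sketch(const std::vector<SequentialPage*>& heads, HANDLER handler) {
  for (SequentialPage* head : heads) {
    for (SequentialPage* page = head; page != nullptr; page = page->next) {
      ErrorCode code = handler(page);
      if (code != kErrorCodeOk) {
        return code;
      }
    }
  }
  return kErrorCodeOk;
}

// Example handler: a capturing lambda that sums the record counts of all pages.
uint64_t count_records(const std::vector<SequentialPage*>& heads) {
  uint64_t total = 0;
  for_every_page_sketch(heads, [&total](SequentialPage* page) {
    total += page->record_count;
    return kErrorCodeOk;
  });
  return total;
}
```

Taking the handler as a template parameter lets the compiler inline the lambda into the traversal loop, at the cost of keeping the function body in the header.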

Definition at line 196 of file sequential_storage_pimpl.hpp.

References ASSERT_ND, CHECK_ERROR_CODE, foedus::thread::compose_thread_id(), foedus::Attachable< SequentialStorageControlBlock >::engine_, get_head(), foedus::Engine::get_memory_manager(), foedus::memory::EngineMemory::get_node_memory(), foedus::storage::VolatilePagePointer::get_offset(), foedus::Engine::get_options(), foedus::memory::PagePool::get_resolver(), foedus::memory::NumaNodeMemoryRef::get_volatile_pool(), foedus::thread::ThreadOptions::group_count_, foedus::kErrorCodeOk, foedus::memory::LocalPageResolver::resolve_offset(), foedus::EngineOptions::thread_, and foedus::thread::ThreadOptions::thread_count_per_group_.

Referenced by foedus::storage::sequential::operator<<().

template <typename HANDLER>
ErrorCode for_every_page(HANDLER handler) const {
  uint16_t nodes = engine_->get_options().thread_.group_count_;
  uint16_t threads_per_node = engine_->get_options().thread_.thread_count_per_group_;
  for (uint16_t node = 0; node < nodes; ++node) {
    const memory::LocalPageResolver& resolver
      = engine_->get_memory_manager()->get_node_memory(node)->get_volatile_pool()->get_resolver();
    for (uint16_t local_ordinal = 0; local_ordinal < threads_per_node; ++local_ordinal) {
      thread::ThreadId thread_id = thread::compose_thread_id(node, local_ordinal);
      for (SequentialPage* page = get_head(resolver, thread_id); page;) {
        ASSERT_ND(page->header().page_id_);
        CHECK_ERROR_CODE(handler(page));

        VolatilePagePointer next_pointer = page->next_page().volatile_pointer_;
        memory::PagePoolOffset offset = next_pointer.get_offset();
        if (offset != 0) {
          page = reinterpret_cast<SequentialPage*>(resolver.resolve_offset(offset));
        } else {
          page = nullptr;
        }
      }
    }
  }
  return kErrorCodeOk;
}

SequentialPage * foedus::storage::sequential::SequentialStoragePimpl::get_head ( const memory::LocalPageResolver & resolver,
thread::ThreadId  thread_id 
) const

Definition at line 407 of file sequential_storage_pimpl.cpp.

References get_head_pointer(), and foedus::memory::LocalPageResolver::resolve_offset().

Referenced by drop(), and for_every_page().

SequentialPage* SequentialStoragePimpl::get_head(
  const memory::LocalPageResolver& resolver,
  thread::ThreadId thread_id) const {
  memory::PagePoolOffset offset = *get_head_pointer(thread_id);
  if (offset == 0) {
    return nullptr;
  }
  return reinterpret_cast<SequentialPage*>(resolver.resolve_offset(offset));
}
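The listing treats offset 0 as "no page" and otherwise asks the resolver to turn a compact 32-bit offset into a pointer. One plausible way such a resolver can work is base-plus-offset arithmetic, sketched below. `LocalResolverSketch`, `head_or_null`, and the 4096-byte page size are assumptions for illustration; the real `memory::LocalPageResolver` may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using PagePoolOffset = uint32_t;          // compact page address within one pool
constexpr size_t kPageSizeSketch = 4096;  // assumed page size for this sketch

// Hypothetical resolver: a pool is one contiguous region, and an offset is a
// page index into it. Offset 0 is reserved to mean "null".
struct LocalResolverSketch {
  char* base;          // start of the pool's memory
  PagePoolOffset end;  // one past the last valid offset

  void* resolve_offset(PagePoolOffset offset) const {
    assert(offset != 0 && offset < end);
    return base + static_cast<size_t>(offset) * kPageSizeSketch;
  }
};

// Mirrors the null check of the listing above on the stand-in resolver.
void* head_or_null(const LocalResolverSketch& resolver, PagePoolOffset head_offset) {
  if (head_offset == 0) {
    return nullptr;
  }
  return resolver.resolve_offset(head_offset);
}
```

Storing a 4-byte offset instead of an 8-byte pointer halves the size of each head/tail slot and keeps the value meaningful to any process that maps the same pool.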

memory::PagePoolOffset * foedus::storage::sequential::SequentialStoragePimpl::get_head_pointer ( thread::ThreadId  thread_id) const

Definition at line 378 of file sequential_storage_pimpl.cpp.

References ASSERT_ND, foedus::Attachable< SequentialStorageControlBlock >::control_block_, foedus::thread::decompose_numa_local_ordinal(), foedus::thread::decompose_numa_node(), foedus::Attachable< SequentialStorageControlBlock >::engine_, foedus::memory::EngineMemory::get_global_volatile_page_resolver(), foedus::Engine::get_memory_manager(), foedus::Engine::get_options(), foedus::storage::sequential::get_pointer_page_and_index(), foedus::thread::ThreadOptions::group_count_, foedus::storage::sequential::SequentialStoragePimpl::PointerPage::pointers_, foedus::memory::GlobalVolatilePageResolver::resolve_offset_newpage(), foedus::EngineOptions::thread_, and foedus::thread::ThreadOptions::thread_count_per_group_.

Referenced by append_record(), and get_head().

memory::PagePoolOffset* SequentialStoragePimpl::get_head_pointer(thread::ThreadId thread_id) const {
  uint16_t page;
  uint16_t index;
  get_pointer_page_and_index(thread_id, &page, &index);
  const memory::GlobalVolatilePageResolver& resolver
    = engine_->get_memory_manager()->get_global_volatile_page_resolver();
  PointerPage* head_page
    = reinterpret_cast<PointerPage*>(resolver.resolve_offset_newpage(
      control_block_->head_pointer_pages_[page]));
  return head_page->pointers_ + index;
}
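How a thread ID might map to a pointer page and a slot within it can be sketched as follows. Everything here is an assumption rather than the documented behavior of `get_pointer_page_and_index()`: the sketch supposes 1024 four-byte slots per pointer page and a thread ID whose high 8 bits are the NUMA node and low 8 bits the ordinal within the node. Under those assumptions one pointer page covers four nodes, which would be consistent with the `p * 4 < nodes` loop in `drop()`.

```cpp
#include <cassert>
#include <cstdint>

// Assumed: a 4 KiB pointer page holding 4-byte offsets, i.e. 1024 slots.
constexpr uint16_t kPointersPerPageSketch = 1024;

// Assumed ID layout: high 8 bits = NUMA node, low 8 bits = ordinal in the node.
constexpr uint16_t compose_thread_id_sketch(uint8_t node, uint8_t local_ordinal) {
  return static_cast<uint16_t>((node << 8) | local_ordinal);
}

// Splits a thread ID into (pointer page, slot index within that page).
void get_pointer_page_and_index_sketch(uint16_t thread_id, uint16_t* page, uint16_t* index) {
  assert(page != nullptr && index != nullptr);
  *page = thread_id / kPointersPerPageSketch;
  *index = thread_id % kPointersPerPageSketch;
}
```

With this layout, node 5 / ordinal 3 has thread ID 1283 and lands on pointer page 1 at index 259, while node 3 / ordinal 255 (ID 1023) is the last slot of pointer page 0.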

StorageId foedus::storage::sequential::SequentialStoragePimpl::get_id ( ) const
inline

Definition at line 155 of file sequential_storage_pimpl.hpp.

References foedus::Attachable< SequentialStorageControlBlock >::control_block_.

Referenced by append_record(), and truncate().

StorageId get_id() const { return control_block_->meta_.id_; }

const StorageName& foedus::storage::sequential::SequentialStoragePimpl::get_name ( ) const
inline

Definition at line 156 of file sequential_storage_pimpl.hpp.

References foedus::Attachable< SequentialStorageControlBlock >::control_block_.

Referenced by apply_truncate(), create(), drop(), load(), and truncate().

const StorageName& get_name() const { return control_block_->meta_.name_; }

SequentialPage * foedus::storage::sequential::SequentialStoragePimpl::get_tail ( const memory::LocalPageResolver & resolver,
thread::ThreadId  thread_id 
) const

Definition at line 417 of file sequential_storage_pimpl.cpp.

References get_tail_pointer(), and foedus::memory::LocalPageResolver::resolve_offset().

SequentialPage* SequentialStoragePimpl::get_tail(
  const memory::LocalPageResolver& resolver,
  thread::ThreadId thread_id) const {
  memory::PagePoolOffset offset = *get_tail_pointer(thread_id);
  if (offset == 0) {
    return nullptr;
  }
  return reinterpret_cast<SequentialPage*>(resolver.resolve_offset(offset));
}

memory::PagePoolOffset * foedus::storage::sequential::SequentialStoragePimpl::get_tail_pointer ( thread::ThreadId  thread_id) const

Definition at line 392 of file sequential_storage_pimpl.cpp.

References ASSERT_ND, foedus::Attachable< SequentialStorageControlBlock >::control_block_, foedus::thread::decompose_numa_local_ordinal(), foedus::thread::decompose_numa_node(), foedus::Attachable< SequentialStorageControlBlock >::engine_, foedus::memory::EngineMemory::get_global_volatile_page_resolver(), foedus::Engine::get_memory_manager(), foedus::Engine::get_options(), foedus::storage::sequential::get_pointer_page_and_index(), foedus::thread::ThreadOptions::group_count_, foedus::storage::sequential::SequentialStoragePimpl::PointerPage::pointers_, foedus::memory::GlobalVolatilePageResolver::resolve_offset_newpage(), foedus::EngineOptions::thread_, and foedus::thread::ThreadOptions::thread_count_per_group_.

Referenced by append_record(), and get_tail().

memory::PagePoolOffset* SequentialStoragePimpl::get_tail_pointer(thread::ThreadId thread_id) const {
  uint16_t page;
  uint16_t index;
  get_pointer_page_and_index(thread_id, &page, &index);
  const memory::GlobalVolatilePageResolver& resolver
    = engine_->get_memory_manager()->get_global_volatile_page_resolver();
  PointerPage* tail_page
    = reinterpret_cast<PointerPage*>(resolver.resolve_offset_newpage(
      control_block_->tail_pointer_pages_[page]));
  return tail_page->pointers_ + index;
}
const GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert volatile page ID to page pointer.
ThreadLocalOrdinal decompose_numa_local_ordinal(ThreadId global_id)
Extracts local ordinal from the given globally unique ID of Thread (core).
Definition: thread_id.hpp:139
Engine * engine_
Most attachable object stores an engine pointer (local engine), so we define it here.
Definition: attachable.hpp:107
const EngineOptions & get_options() const
Definition: engine.cpp:39
ThreadLocalOrdinal thread_count_per_group_
Number of Thread in each ThreadGroup.
void get_pointer_page_and_index(uint16_t thread_id, uint16_t *page, uint16_t *index)
Calculate the page/index of the thread-private head/tail pointer.
SequentialStorageControlBlock * control_block_
The shared data on shared memory that has been initialized in some SOC or master engine.
Definition: attachable.hpp:111
ThreadGroupId decompose_numa_node(ThreadId global_id)
Extracts NUMA node ID from the given globally unique ID of Thread (core).
Definition: thread_id.hpp:131
uint16_t group_count_
Number of ThreadGroup in the engine.
thread::ThreadOptions thread_
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
Definition: engine.cpp:50

ErrorStack foedus::storage::sequential::SequentialStoragePimpl::initialize_head_tail_pages()

Definition at line 188 of file sequential_storage_pimpl.cpp.

References foedus::Attachable< SequentialStorageControlBlock >::control_block_, foedus::Attachable< SequentialStorageControlBlock >::engine_, foedus::Engine::get_memory_manager(), foedus::memory::EngineMemory::get_node_memory(), foedus::Engine::get_options(), foedus::memory::PagePool::get_resolver(), foedus::memory::NumaNodeMemoryRef::get_volatile_pool(), foedus::memory::PagePool::grab_one(), foedus::thread::ThreadOptions::group_count_, foedus::storage::kPageSize, foedus::kRetOk, foedus::memory::LocalPageResolver::resolve_offset_newpage(), foedus::EngineOptions::thread_, and WRAP_ERROR_CODE.

Referenced by create(), and load().

ErrorStack SequentialStoragePimpl::initialize_head_tail_pages() {
  std::memset(control_block_->head_pointer_pages_, 0, sizeof(control_block_->head_pointer_pages_));
  std::memset(control_block_->tail_pointer_pages_, 0, sizeof(control_block_->tail_pointer_pages_));
  // we pre-allocate pointer pages for all required nodes.
  // 2^10 pointers (threads) per page : 4 nodes per page

  uint32_t nodes = engine_->get_options().thread_.group_count_;
  for (uint16_t p = 0; p * 4 < nodes; ++p) {
    uint16_t node = p * 4;
    memory::NumaNodeMemoryRef* memory = engine_->get_memory_manager()->get_node_memory(node);
    memory::PagePool* pool = memory->get_volatile_pool();
    memory::PagePoolOffset head_offset, tail_offset;
    // minor todo: gracefully fail in case of out of memory
    WRAP_ERROR_CODE(pool->grab_one(&head_offset));
    WRAP_ERROR_CODE(pool->grab_one(&tail_offset));
    control_block_->head_pointer_pages_[p].set(node, head_offset);
    control_block_->tail_pointer_pages_[p].set(node, tail_offset);
    void* head_page = pool->get_resolver().resolve_offset_newpage(head_offset);
    void* tail_page = pool->get_resolver().resolve_offset_newpage(tail_offset);
    std::memset(head_page, 0, kPageSize);
    std::memset(tail_page, 0, kPageSize);
  }
  return kRetOk;
}
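The loop condition `p * 4 < nodes` allocates one head page and one tail page for every group of four NUMA nodes, and takes both from the first node of the group. As a rough illustration, the sketch below reproduces only that iteration pattern and returns the node each pointer page would be taken from. The function name sketch_pointer_page_nodes and the 256-node bound are assumptions made for this example, not part of the library.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Returns, for each pointer page p, the NUMA node it would be allocated from.
// Mirrors the loop "for (p = 0; p * 4 < nodes; ++p) { node = p * 4; ... }".
std::vector<uint16_t> sketch_pointer_page_nodes(uint32_t nodes) {
  assert(nodes <= 256U);  // assumed upper bound on the node count, for this sketch only
  std::vector<uint16_t> owners;
  for (uint16_t p = 0; p * 4U < nodes; ++p) {
    owners.push_back(static_cast<uint16_t>(p * 4U));
  }
  return owners;
}
```

For example, a 5-node engine would get two pointer pages in this sketch, taken from nodes 0 and 4, that is, ceil(nodes / 4) pages in general.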
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
Definition: memory_id.hpp:44
NumaNodeMemoryRef * get_node_memory(foedus::thread::ThreadGroupId group) const
const ErrorStack kRetOk
Normal return value for no-error case.
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
Definition: storage_id.hpp:45

ErrorStack foedus::storage::sequential::SequentialStoragePimpl::load(const StorageControlBlock& snapshot_block)

Definition at line 166 of file sequential_storage_pimpl.cpp.

References ASSERT_ND, CHECK_ERROR, foedus::Attachable< SequentialStorageControlBlock >::control_block_, foedus::Attachable< SequentialStorageControlBlock >::engine_, foedus::Engine::get_earliest_epoch(), get_name(), initialize_head_tail_pages(), foedus::Epoch::is_valid(), foedus::storage::kExists, foedus::kRetOk, foedus::storage::StorageControlBlock::meta_, and foedus::Epoch::value().

Referenced by foedus::storage::sequential::SequentialStorage::load().

ErrorStack SequentialStoragePimpl::load(const StorageControlBlock& snapshot_block) {
  // for sequential storage, whether the snapshot root pointer is null or not doesn't matter.
  // essentially load==create, except that it just sets the snapshot root pointer.
  control_block_->meta_ = static_cast<const SequentialMetadata&>(snapshot_block.meta_);
  Epoch initial_truncate_epoch(control_block_->meta_.truncate_epoch_);
  if (!initial_truncate_epoch.is_valid()) {
    initial_truncate_epoch = engine_->get_earliest_epoch();
  }
  ASSERT_ND(initial_truncate_epoch.is_valid());
  control_block_->meta_.truncate_epoch_ = initial_truncate_epoch.value();
  control_block_->cur_truncate_epoch_.store(initial_truncate_epoch.value());
  control_block_->cur_truncate_epoch_tid_.reset();
  control_block_->cur_truncate_epoch_tid_.xct_id_.set_epoch(initial_truncate_epoch);
  control_block_->cur_truncate_epoch_tid_.xct_id_.set_ordinal(1);

  CHECK_ERROR(initialize_head_tail_pages());
  control_block_->root_page_pointer_.snapshot_pointer_
    = control_block_->meta_.root_snapshot_page_id_;
  control_block_->status_ = kExists;
  LOG(INFO) << "Loaded a sequential-storage " << get_name();
  return kRetOk;
}
kExists
The storage has been created and is ready for use.
Definition: storage_id.hpp:158
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
Epoch get_earliest_epoch() const
Returns the Earliest-Epoch, the minimum epoch that is valid within this engine.

ErrorStack foedus::storage::sequential::SequentialStoragePimpl::truncate(Epoch new_truncate_epoch, Epoch* commit_epoch)

Definition at line 236 of file sequential_storage_pimpl.cpp.

References foedus::log::MetaLogBuffer::commit(), foedus::Attachable< SequentialStorageControlBlock >::control_block_, foedus::Attachable< SequentialStorageControlBlock >::engine_, ERROR_STACK, foedus::Engine::get_current_global_epoch(), get_id(), foedus::Engine::get_log_manager(), foedus::log::LogManager::get_meta_buffer(), get_name(), foedus::log::BaseLogType::header_, foedus::Epoch::is_valid(), foedus::kErrorCodeInvalidParameter, foedus::kRetOk, foedus::log::LogHeader::log_length_, foedus::log::LogHeader::log_type_code_, foedus::assorted::memory_fence_acq_rel(), foedus::assorted::memory_fence_release(), foedus::storage::sequential::SequentialTruncateLogType::new_truncate_epoch_, foedus::xct::XctId::set(), foedus::log::LogHeader::storage_id_, and foedus::Epoch::value().

Referenced by foedus::storage::sequential::SequentialStorage::truncate().

ErrorStack SequentialStoragePimpl::truncate(Epoch new_truncate_epoch, Epoch* commit_epoch) {
  LOG(INFO) << "Truncating " << get_name() << " upto Epoch " << new_truncate_epoch
    << ". old value=" << control_block_->cur_truncate_epoch_;
  if (!new_truncate_epoch.is_valid()) {
    LOG(ERROR) << "truncate() was called with an invalid epoch";
    return ERROR_STACK(kErrorCodeInvalidParameter);
  } else if (new_truncate_epoch < engine_->get_earliest_epoch()) {
    LOG(ERROR) << "too-old epoch for this system. " << new_truncate_epoch;
    return ERROR_STACK(kErrorCodeInvalidParameter);
  }

  // will check this again after locking.
  if (Epoch(control_block_->cur_truncate_epoch_) >= new_truncate_epoch) {
    LOG(INFO) << "Already truncated up to " << Epoch(control_block_->cur_truncate_epoch_)
      << ". Requested = " << new_truncate_epoch;
    return kRetOk;
  }

  if (new_truncate_epoch > engine_->get_current_global_epoch()) {
    LOG(WARNING) << "Ohh? we don't prohibit it, but are you sure? Truncating up to a future"
      << " epoch-" << new_truncate_epoch << ". cur_global=" << engine_->get_current_global_epoch();
  }


  // We lock it first so that there are no concurrent truncate.
  {
    // TODO(Hideaki) Ownerless-lock here
    // xct::McsOwnerlessLockScope scope(&control_block_->cur_truncate_epoch_tid_);

    if (Epoch(control_block_->cur_truncate_epoch_) >= new_truncate_epoch) {
      LOG(INFO) << "Already truncated up to " << Epoch(control_block_->cur_truncate_epoch_)
        << ". Requested = " << new_truncate_epoch;
      return kRetOk;
    }

    // First, let scanner know that something is happening.
    // 1. Scanners that didn't observe this and commit before us: fine.
    // 2. Scanners that didn't observe this and commit after us:
    //   will see this flag and abort in precommit, fine.
    // 3. Scanners that observe this:
    //   spin until we are done, fine (see optimistic_read_truncate_epoch()).
    // NOTE: Below, we must NOT have any error-return path. Otherwise being_written state is left.
    control_block_->cur_truncate_epoch_tid_.xct_id_.set_being_written();
    assorted::memory_fence_acq_rel();

    // Log this operation as a metadata operation. We get a commit_epoch here.
    {
      char log_buffer[sizeof(SequentialTruncateLogType)];
      std::memset(log_buffer, 0, sizeof(log_buffer));
      SequentialTruncateLogType* the_log = reinterpret_cast<SequentialTruncateLogType*>(log_buffer);
      the_log->header_.storage_id_ = get_id();
      the_log->header_.log_type_code_ = log::get_log_code<SequentialTruncateLogType>();
      the_log->header_.log_length_ = sizeof(SequentialTruncateLogType);
      the_log->new_truncate_epoch_ = new_truncate_epoch;
      engine_->get_log_manager()->get_meta_buffer()->commit(the_log, commit_epoch);
    }

    // Then, apply it. This also clears the being_written flag
    {
      xct::XctId xct_id;
      xct_id.set(commit_epoch->value(), 1);  // no dependency, so minimal ordinal is always correct
      control_block_->cur_truncate_epoch_.store(new_truncate_epoch.value());  // atomic!
      control_block_->cur_truncate_epoch_tid_.xct_id_ = xct_id;
      assorted::memory_fence_release();

      // Also set to the metadata to make this permanent.
      // The metadata will be written out in next snapshot.
      // Until that, REDO-operation below will re-apply that after crash.
      control_block_->meta_.truncate_epoch_ = new_truncate_epoch.value();
      assorted::memory_fence_release();
    }
  }

  LOG(INFO) << "Truncated";
  return kRetOk;
}
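The comments in the listing above describe three scanner cases around the being_written flag. The sketch below illustrates that flag-then-publish pattern with plain std::atomic operations, assuming a single truncating thread at a time (the source notes that the lock for this is still a TODO). It is not FOEDUS code: SketchTruncateState, SketchObservation, the bit layout of tid, and the three sketch_ functions are all hypothetical, and default sequentially consistent atomics are used for simplicity instead of the explicit fences in the original.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

constexpr uint64_t kSketchBeingWritten = 1ULL;  // assumed: bit 0 of tid is the flag

// Hypothetical stand-in for the truncate epoch and its TID word.
struct SketchTruncateState {
  std::atomic<uint32_t> epoch{0};
  std::atomic<uint64_t> tid{0};  // bit 0: being-written flag, upper bits: version
};

// What a scanner remembers from its optimistic read.
struct SketchObservation {
  uint64_t tid;
  uint32_t epoch;
};

// Writer: raise the flag, publish the new epoch, then install a new version
// with the flag cleared. There is no early return between raise and clear.
inline void sketch_truncate(SketchTruncateState* state, uint32_t new_epoch) {
  const uint64_t old_tid = state->tid.load();
  assert((old_tid & kSketchBeingWritten) == 0);  // single writer assumed
  state->tid.store(old_tid | kSketchBeingWritten);
  state->epoch.store(new_epoch);
  state->tid.store(old_tid + 2);  // next version, flag bit stays clear
}

// Scanner, case 3: spin while the flag is set, then take a consistent pair.
inline SketchObservation sketch_optimistic_read(const SketchTruncateState& state) {
  for (;;) {
    const uint64_t before = state.tid.load();
    if (before & kSketchBeingWritten) {
      continue;  // a truncate is in progress
    }
    const uint32_t epoch = state.epoch.load();
    if (state.tid.load() == before) {
      return SketchObservation{before, epoch};
    }
  }
}

// Scanner, case 2: at precommit, abort if the TID changed since the read.
inline bool sketch_verify_at_precommit(const SketchTruncateState& state,
                                       const SketchObservation& seen) {
  return state.tid.load() == seen.tid;
}
```

A scanner that read before a truncate and verifies after it gets false from sketch_verify_at_precommit() and would abort, which corresponds to case 2 in the comments. This is also why the original insists on having no error-return path once the flag is set: a flag left raised would make every later scanner spin forever.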
MetaLogBuffer * get_meta_buffer()
Definition: log_manager.cpp:61
kErrorCodeInvalidParameter
0x0002 : "GENERAL: Invalid parameter given".
Definition: error_code.hpp:106
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
log::LogManager * get_log_manager() const
See Log Manager.
Definition: engine.cpp:49
Epoch get_current_global_epoch() const
Returns the current global epoch, the epoch a newly started transaction will be in.
void commit(BaseLogType *metalog, Epoch *commit_epoch)
Synchronously writes out the given log to metadata log file.
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
void memory_fence_acq_rel()
Equivalent to std::atomic_thread_fence(std::memory_order_acq_rel).


The documentation for this class was generated from the following files: