libfoedus-core
FOEDUS Core Library
foedus::storage::sequential::SequentialComposer Class Referencefinal

Composer for an sequential storage. More...

Detailed Description

Composer for an sequential storage.

Like partitioner, this does a quite simple stuff. We don't need to do any merge-sort as there is no order. We just sequentially add them all.

Page allcation in compose()
This composer sequentially writes out data pages until the main buffer in snapshot_writer_ becomes full. Whenever it does, it writes out all the pages and treat the first page as one head page. So, this compose() can output more than one head pages. By doing this, we don't have to worry about any of the intermediate pages and pointer installations. Sooooo simple. The limit is of course 500 pointers (4kb), but surely it will fit. If it doesn't, we must consider allowing variable-sized root info page.
Note
This is a private implementation-details of Sequential Storage, thus file name ends with _impl. Do not include this header from a client program. There is no case client program needs to access this internal class.

Definition at line 60 of file sequential_composer_impl.hpp.

#include <sequential_composer_impl.hpp>

Classes

struct  RootInfoPage
 Output of one compose() call, which are then combined in construct_root(). More...
 

Public Member Functions

 SequentialComposer (Composer *parent)
 
std::string to_string () const
 
ErrorStack compose (const Composer::ComposeArguments &args)
 
ErrorStack construct_root (const Composer::ConstructRootArguments &args)
 
Composer::DropResult drop_volatiles (const Composer::DropVolatilesArguments &args)
 

Constructor & Destructor Documentation

foedus::storage::sequential::SequentialComposer::SequentialComposer ( Composer parent)
explicit

Definition at line 45 of file sequential_composer_impl.cpp.

46  : engine_(parent->get_engine()), storage_id_(parent->get_storage_id()) {
47 }

Member Function Documentation

ErrorStack foedus::storage::sequential::SequentialComposer::compose ( const Composer::ComposeArguments args)

Definition at line 154 of file sequential_composer_impl.cpp.

References foedus::storage::sequential::SequentialPage::append_record_nosync(), ASSERT_ND, foedus::storage::sequential::SequentialPage::can_insert_record(), CHECK_ERROR, foedus::storage::sequential::StreamStatus::cur_owner_id_, foedus::storage::sequential::StreamStatus::cur_payload_, foedus::debugging::StopWatch::elapsed_ms(), foedus::storage::sequential::StreamStatus::ended_, foedus::storage::extract_numa_node_from_snapshot_pointer(), foedus::storage::extract_snapshot_id_from_snapshot_pointer(), foedus::storage::sequential::HeadPagePointer::from_epoch_, foedus::storage::sequential::StreamStatus::get_entry(), foedus::xct::XctId::get_epoch(), foedus::snapshot::SnapshotWriter::get_numa_node(), foedus::snapshot::SnapshotWriter::get_page_base(), foedus::snapshot::SnapshotWriter::get_page_size(), foedus::snapshot::SnapshotWriter::get_snapshot_id(), foedus::storage::sequential::SequentialPage::header(), foedus::storage::sequential::SequentialComposer::RootInfoPage::header_, foedus::log::BaseLogType::header_, foedus::storage::sequential::StreamStatus::init(), foedus::storage::sequential::SequentialPage::initialize_snapshot_page(), foedus::kRetOk, foedus::storage::Composer::ComposeArguments::log_streams_, foedus::storage::Composer::ComposeArguments::log_streams_count_, foedus::storage::sequential::StreamStatus::next(), foedus::storage::sequential::SequentialPage::next_page(), foedus::Epoch::one_more(), foedus::storage::sequential::HeadPagePointer::page_count_, foedus::storage::sequential::HeadPagePointer::page_id_, foedus::storage::PageHeader::page_id_, foedus::storage::sequential::SequentialAppendLogType::payload_count_, foedus::storage::sequential::SequentialComposer::RootInfoPage::pointer_, foedus::storage::Composer::ComposeArguments::root_info_page_, foedus::storage::DualPagePointer::snapshot_pointer_, foedus::storage::Composer::ComposeArguments::snapshot_writer_, foedus::fs::status(), foedus::debugging::StopWatch::stop(), foedus::storage::PageHeader::storage_id_, foedus::Epoch::store_max(), foedus::Epoch::store_min(), foedus::storage::sequential::HeadPagePointer::to_epoch_, to_string(), WRAP_ERROR_CODE, and foedus::log::LogHeader::xct_id_.

Referenced by foedus::storage::Composer::compose().

154  {
155  debugging::StopWatch stop_watch;
156 
157  // Everytime it's full, we write out all pages. much simpler than other storage types.
158  // No intermediate pages to track any information.
159  snapshot::SnapshotWriter* snapshot_writer = args.snapshot_writer_;
160  SequentialPage* base = reinterpret_cast<SequentialPage*>(snapshot_writer->get_page_base());
161 
162  SequentialPage* cur_page = compose_new_head(snapshot_writer);
163  SnapshotPagePointer head_page_id = cur_page->header().page_id_;
164  uint32_t allocated_pages = 1;
165  uint64_t total_pages = 0;
166  const uint32_t max_pages = snapshot_writer->get_page_size();
167  VLOG(0) << to_string() << " composing with " << args.log_streams_count_ << " streams.";
168  Epoch min_epoch;
169  Epoch max_epoch;
170  for (uint32_t i = 0; i < args.log_streams_count_; ++i) {
171  StreamStatus status;
172  WRAP_ERROR_CODE(status.init(args.log_streams_[i]));
173  while (!status.ended_) {
174  const SequentialAppendLogType* entry = status.get_entry();
175  Epoch epoch = entry->header_.xct_id_.get_epoch();
176  min_epoch.store_min(epoch);
177  max_epoch.store_max(epoch);
178 
179  // need to allocate a new page?
180  if (!cur_page->can_insert_record(entry->payload_count_)) {
181  // need to flush the buffer?
182  if (allocated_pages >= max_pages) {
183  // dump everything and allocate a new head page
184  CHECK_ERROR(dump_pages(snapshot_writer, false, allocated_pages, &total_pages));
185  cur_page = compose_new_head(snapshot_writer);
186  allocated_pages = 1;
187  } else {
188  // sequential storage is a bit special. As every page is written-once, we need only
189  // snapshot pointer. No dual page pointers.
190  SequentialPage* next_page = base + allocated_pages;
191  ++allocated_pages;
192  next_page->initialize_snapshot_page(storage_id_, cur_page->header().page_id_ + 1ULL);
193  cur_page->next_page().snapshot_pointer_ = next_page->header().page_id_;
194  cur_page = next_page;
195  ASSERT_ND(extract_numa_node_from_snapshot_pointer(cur_page->header().page_id_)
196  == snapshot_writer->get_numa_node());
197  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(cur_page->header().page_id_)
198  == snapshot_writer->get_snapshot_id());
199  }
200  }
201 
202  ASSERT_ND(cur_page->can_insert_record(entry->payload_count_));
203  cur_page->append_record_nosync(
204  status.cur_owner_id_,
205  entry->payload_count_,
206  status.cur_payload_);
207 
208  // then, read next
209  WRAP_ERROR_CODE(status.next());
210  }
211  }
212  // dump everything
213  CHECK_ERROR(dump_pages(snapshot_writer, true, allocated_pages, &total_pages));
214 
215  // this compose() emits just one pointer to the head page.
216  RootInfoPage* root_info_page_casted = reinterpret_cast<RootInfoPage*>(args.root_info_page_);
217  root_info_page_casted->header_.storage_id_ = storage_id_;
218  root_info_page_casted->pointer_.page_id_ = head_page_id;
219  root_info_page_casted->pointer_.from_epoch_ = min_epoch;
220  root_info_page_casted->pointer_.to_epoch_ = max_epoch.one_more(); // to make it exclusive
221  root_info_page_casted->pointer_.page_count_ = total_pages;
222 
223  stop_watch.stop();
224  LOG(INFO) << to_string() << " compose() done in " << stop_watch.elapsed_ms() << "ms. # pages="
225  << total_pages << ", head_page=" << assorted::Hex(head_page_id, 16)
226  << ", min_epoch=" << min_epoch << ", max_epoch=" << max_epoch;
227  return kRetOk;
228 }
FileStatus status(const Path &p)
Returns the status of the file.
Definition: filesystem.cpp:45
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Definition: storage_id.hpp:79
uint16_t extract_snapshot_id_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:98
uint8_t extract_numa_node_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:95
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
const ErrorStack kRetOk
Normal return value for no-error case.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.

Here is the call graph for this function:

Here is the caller graph for this function:

ErrorStack foedus::storage::sequential::SequentialComposer::construct_root ( const Composer::ConstructRootArguments args)

Definition at line 230 of file sequential_composer_impl.cpp.

References ASSERT_ND, foedus::snapshot::SnapshotWriter::dump_pages(), foedus::debugging::StopWatch::elapsed_us(), foedus::storage::extract_numa_node_from_snapshot_pointer(), foedus::storage::extract_snapshot_id_from_snapshot_pointer(), foedus::memory::AlignedMemory::get_block(), foedus::Attachable< CONTROL_BLOCK >::get_control_block(), foedus::storage::Storage< CONTROL_BLOCK >::get_metadata(), foedus::storage::sequential::SequentialRootPage::get_next_page(), foedus::snapshot::SnapshotWriter::get_next_page_id(), foedus::snapshot::SnapshotWriter::get_numa_node(), foedus::snapshot::SnapshotWriter::get_page_base(), foedus::storage::sequential::SequentialRootPage::get_pointer_count(), foedus::storage::sequential::SequentialRootPage::get_pointers(), foedus::snapshot::SnapshotWriter::get_snapshot_id(), foedus::storage::Composer::ConstructRootArguments::gleaner_resource_, foedus::storage::sequential::SequentialRootPage::header(), foedus::storage::sequential::SequentialRootPage::initialize_snapshot_page(), foedus::kRetOk, foedus::storage::sequential::kRootPageMaxHeadPointers, foedus::storage::Composer::ConstructRootArguments::new_root_page_pointer_, foedus::storage::sequential::HeadPagePointer::page_id_, foedus::storage::PageHeader::page_id_, foedus::storage::sequential::SequentialComposer::RootInfoPage::pointer_, foedus::storage::Composer::ConstructRootArguments::previous_snapshot_files_, foedus::cache::SnapshotFileSet::read_page(), foedus::storage::Composer::ConstructRootArguments::root_info_pages_, foedus::storage::Composer::ConstructRootArguments::root_info_pages_count_, foedus::storage::Metadata::root_snapshot_page_id_, foedus::storage::sequential::SequentialRootPage::set_next_page(), foedus::storage::sequential::SequentialRootPage::set_pointers(), foedus::storage::Composer::ConstructRootArguments::snapshot_writer_, foedus::debugging::StopWatch::stop(), foedus::storage::PageHeader::storage_id_, to_string(), foedus::snapshot::LogGleanerResource::work_memory_, and WRAP_ERROR_CODE.

Referenced by foedus::storage::Composer::construct_root().

230  {
231  debugging::StopWatch stop_watch;
232 
233  snapshot::SnapshotWriter* snapshot_writer = args.snapshot_writer_;
234  std::vector<HeadPagePointer> all_head_pages;
235  SequentialStorage storage(engine_, storage_id_);
236  SnapshotPagePointer previous_root_page_pointer = storage.get_metadata()->root_snapshot_page_id_;
237  for (SnapshotPagePointer page_id = previous_root_page_pointer; page_id != 0;) {
238  // if there already is a root page, read them all.
239  // we have to anyway re-write all of them, at least the next pointer.
240  SequentialRootPage* root_page = reinterpret_cast<SequentialRootPage*>(
241  args.gleaner_resource_->work_memory_.get_block());
242  WRAP_ERROR_CODE(args.previous_snapshot_files_->read_page(page_id, root_page));
243  ASSERT_ND(root_page->header().storage_id_ == storage_id_);
244  ASSERT_ND(root_page->header().page_id_ == page_id);
245  for (uint16_t i = 0; i < root_page->get_pointer_count(); ++i) {
246  all_head_pages.push_back(root_page->get_pointers()[i]);
247  }
248  page_id = root_page->get_next_page();
249  }
250 
251  // each root_info_page contains just one pointer to the head page.
252  for (uint32_t i = 0; i < args.root_info_pages_count_; ++i) {
253  const RootInfoPage* info_page = reinterpret_cast<const RootInfoPage*>(args.root_info_pages_[i]);
254  ASSERT_ND(info_page->pointer_.page_id_ > 0);
255  all_head_pages.push_back(info_page->pointer_);
256  }
257  VLOG(0) << to_string() << " construct_root() total head page pointers=" << all_head_pages.size();
258 
259  // now simply write out root pages that contain these pointers.
260  SequentialRootPage* base = reinterpret_cast<SequentialRootPage*>(
261  snapshot_writer->get_page_base());
262  SequentialRootPage* cur_page = base;
263  uint32_t allocated_pages = 1;
264  SnapshotPagePointer root_of_root_page_id = snapshot_writer->get_next_page_id();
265  cur_page->initialize_snapshot_page(storage_id_, root_of_root_page_id);
266  for (uint32_t written_pointers = 0; written_pointers < all_head_pages.size();) {
267  uint16_t count_in_this_page = std::min<uint64_t>(
268  all_head_pages.size() - written_pointers,
270  cur_page->set_pointers(&all_head_pages[written_pointers], count_in_this_page);
271  written_pointers += count_in_this_page;
272  if (written_pointers < all_head_pages.size()) {
273  // we need next page in root page.
274  SequentialRootPage* new_page = cur_page + 1;
275  new_page->initialize_snapshot_page(storage_id_, cur_page->header().page_id_ + 1);
276  ASSERT_ND(extract_numa_node_from_snapshot_pointer(new_page->header().page_id_)
277  == snapshot_writer->get_numa_node());
278  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(new_page->header().page_id_)
279  == snapshot_writer->get_snapshot_id());
280  cur_page->set_next_page(new_page->header().page_id_);
281  cur_page = new_page;
282  ++allocated_pages;
283  } else {
284  ASSERT_ND(written_pointers == all_head_pages.size());
285  }
286  }
287 
288  // write out the new root pages
289  WRAP_ERROR_CODE(snapshot_writer->dump_pages(0, allocated_pages));
290  ASSERT_ND(snapshot_writer->get_next_page_id() == root_of_root_page_id + allocated_pages);
291  *args.new_root_page_pointer_ = root_of_root_page_id;
292 
293  // In sequential, there is only one snapshot pointer to install, the root page.
294  storage.get_control_block()->root_page_pointer_.snapshot_pointer_ = root_of_root_page_id;
295  storage.get_control_block()->meta_.root_snapshot_page_id_ = root_of_root_page_id;
296 
297  stop_watch.stop();
298  VLOG(0) << to_string() << " construct_root() done in " << stop_watch.elapsed_us() << "us."
299  << " total head page pointers=" << all_head_pages.size()
300  << ". new root head page=" << assorted::Hex(*args.new_root_page_pointer_)
301  << ". root_page_count=" << allocated_pages;
302  return kRetOk;
303 }
const uint16_t kRootPageMaxHeadPointers
Maximum number of head pointers in one root page.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Definition: storage_id.hpp:79
uint16_t extract_snapshot_id_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:98
uint8_t extract_numa_node_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:95
const ErrorStack kRetOk
Normal return value for no-error case.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.

Here is the call graph for this function:

Here is the caller graph for this function:

Composer::DropResult foedus::storage::sequential::SequentialComposer::drop_volatiles ( const Composer::DropVolatilesArguments args)

Definition at line 309 of file sequential_composer_impl.cpp.

References ASSERT_ND, foedus::storage::combine_volatile_page_pointer(), foedus::thread::compose_thread_id(), foedus::storage::Composer::DropVolatilesArguments::drop(), foedus::Attachable< CONTROL_BLOCK >::get_control_block(), foedus::storage::sequential::SequentialPage::get_first_record_epoch(), foedus::Engine::get_memory_manager(), foedus::memory::EngineMemory::get_node_memory(), foedus::storage::VolatilePagePointer::get_numa_node(), foedus::storage::VolatilePagePointer::get_offset(), foedus::Engine::get_options(), foedus::storage::sequential::SequentialPage::get_record_count(), foedus::memory::PagePool::get_resolver(), foedus::memory::NumaNodeMemoryRef::get_volatile_pool(), foedus::thread::ThreadOptions::group_count_, foedus::storage::Composer::DropVolatilesArguments::my_partition_, foedus::storage::sequential::SequentialPage::next_page(), foedus::storage::Composer::DropVolatilesArguments::partitioned_drop_, foedus::memory::LocalPageResolver::resolve_offset(), foedus::storage::Composer::DropVolatilesArguments::snapshot_, foedus::EngineOptions::thread_, foedus::thread::ThreadOptions::thread_count_per_group_, foedus::snapshot::Snapshot::valid_until_epoch_, and foedus::storage::DualPagePointer::volatile_pointer_.

Referenced by foedus::storage::Composer::drop_volatiles().

310  {
311  // In sequential, no need to determine what volatile pages to keep.
312  SequentialStorage storage(engine_, storage_id_);
313  SequentialStoragePimpl pimpl(engine_, storage.get_control_block());
314  uint16_t nodes = engine_->get_options().thread_.group_count_;
315  uint16_t threads_per_node = engine_->get_options().thread_.thread_count_per_group_;
316  for (uint16_t node = 0; node < nodes; ++node) {
317  if (args.partitioned_drop_ && args.my_partition_ != node) {
318  continue;
319  }
320  const memory::LocalPageResolver& resolver
322  for (uint16_t local_ordinal = 0; local_ordinal < threads_per_node; ++local_ordinal) {
323  thread::ThreadId thread_id = thread::compose_thread_id(node, local_ordinal);
324  memory::PagePoolOffset* head_ptr = pimpl.get_head_pointer(thread_id);
325  memory::PagePoolOffset* tail_ptr = pimpl.get_tail_pointer(thread_id);
326  memory::PagePoolOffset tail_offset = *tail_ptr;
327  if ((*head_ptr) == 0) {
328  ASSERT_ND(tail_offset == 0);
329  VLOG(0) << "No volatile pages for thread-" << thread_id << " in sequential-" << storage_id_;
330  continue;
331  }
332 
333  ASSERT_ND(tail_offset != 0);
334  while (true) {
335  memory::PagePoolOffset offset = *head_ptr;
336  ASSERT_ND(offset != 0);
337 
338  // if the page is newer than the snapshot, keep them.
339  // all volatile pages/records are appended in epoch order, so no need to check further.
340  SequentialPage* head = reinterpret_cast<SequentialPage*>(resolver.resolve_offset(offset));
341  ASSERT_ND(head->get_record_count() > 0);
342  if (head->get_record_count() > 0
343  && head->get_first_record_epoch() > args.snapshot_.valid_until_epoch_) {
344  VLOG(0) << "Thread-" << thread_id << " in sequential-" << storage_id_ << " keeps volatile"
345  << " pages at and after epoch-" << head->get_first_record_epoch();
346  break;
347  }
348 
349  // okay, drop this
350  memory::PagePoolOffset next = head->next_page().volatile_pointer_.get_offset();
351  ASSERT_ND(next != offset);
352  ASSERT_ND(next == 0 || head->next_page().volatile_pointer_.get_numa_node() == node);
353  args.drop(engine_, combine_volatile_page_pointer(node, offset));
354  if (next == 0) {
355  // it was the tail
356  ASSERT_ND(tail_offset == offset);
357  VLOG(0) << "Thread-" << thread_id << " in sequential-" << storage_id_ << " dropped all"
358  << " volatile pages";
359  *head_ptr = 0;
360  *tail_ptr = 0;
361  break;
362  } else {
363  // move head
364  *head_ptr = next;
365  DVLOG(2) << "Thread-" << thread_id << " in sequential-" << storage_id_ << " dropped a"
366  << " page.";
367  }
368  }
369  }
370  }
371  return Composer::DropResult(args); // always everything dropped
372 }
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
Definition: memory_id.hpp:44
VolatilePagePointer combine_volatile_page_pointer(uint8_t numa_node, memory::PagePoolOffset offset)
Definition: storage_id.hpp:235
const EngineOptions & get_options() const
Definition: engine.cpp:39
ThreadLocalOrdinal thread_count_per_group_
Number of Thread in each ThreadGroup.
NumaNodeMemoryRef * get_node_memory(foedus::thread::ThreadGroupId group) const
uint16_t group_count_
Number of ThreadGroup in the engine.
ThreadId compose_thread_id(ThreadGroupId node, ThreadLocalOrdinal local_core)
Returns a globally unique ID of Thread (core) for the given node and ordinal in the node...
Definition: thread_id.hpp:123
thread::ThreadOptions thread_
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
Definition: thread_id.hpp:80
const LocalPageResolver & get_resolver() const
Gives an object to resolve an offset in this page pool (thus local) to an actual pointer and vice ver...
Definition: page_pool.cpp:146
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
Definition: engine.cpp:50

Here is the call graph for this function:

Here is the caller graph for this function:

std::string foedus::storage::sequential::SequentialComposer::to_string ( ) const

Definition at line 305 of file sequential_composer_impl.cpp.

Referenced by compose(), and construct_root().

305  {
306  return std::string("SequentialComposer-") + std::to_string(storage_id_);
307 }

Here is the caller graph for this function:


The documentation for this class was generated from the following files: