libfoedus-core
FOEDUS Core Library
foedus::storage::hash::HashComposeContext Class Reference

HashComposer's compose() implementation separated from the class itself. More...

Detailed Description

HashComposer's compose() implementation separated from the class itself.

It's a complicated method, so worth being its own class. This defines all the variables maintained during one compose() call.

Definition at line 142 of file hash_composer_impl.hpp.

#include <hash_composer_impl.hpp>

Public Member Functions

 HashComposeContext (Engine *engine, snapshot::MergeSort *merge_sort, snapshot::SnapshotWriter *snapshot_writer, cache::SnapshotFileSet *previous_snapshot_files, Page *root_info_page)
 HashComposeContext methods. More...
 
ErrorStack execute ()
 

Constructor & Destructor Documentation

foedus::storage::hash::HashComposeContext::HashComposeContext ( Engine engine,
snapshot::MergeSort merge_sort,
snapshot::SnapshotWriter snapshot_writer,
cache::SnapshotFileSet previous_snapshot_files,
Page root_info_page 
)

HashComposeContext methods.

Definition at line 436 of file hash_composer_impl.cpp.

References foedus::memory::AlignedMemory::alloc(), foedus::memory::AlignedMemory::get_block(), foedus::snapshot::SnapshotWriter::get_intermediate_base(), foedus::snapshot::SnapshotWriter::get_intermediate_size(), foedus::snapshot::SnapshotWriter::get_page_base(), foedus::snapshot::SnapshotWriter::get_page_size(), foedus::storage::hash::kHashMaxLevels, foedus::memory::AlignedMemory::kNumaAllocOnnode, and foedus::storage::kPageSize.

442  : engine_(engine),
443  merge_sort_(merge_sort),
444  system_initial_epoch_(engine->get_savepoint_manager()->get_initial_durable_epoch()),
445  storage_id_(merge_sort_->get_storage_id()),
446  snapshot_id_(snapshot_writer->get_snapshot_id()),
447  storage_(engine, storage_id_),
448  snapshot_writer_(snapshot_writer),
449  previous_snapshot_files_(previous_snapshot_files),
450  root_info_page_(reinterpret_cast<HashRootInfoPage*>(root_info_page)),
451  partitionable_(engine_->get_soc_count() > 1U),
452  levels_(storage_.get_levels()),
453  bin_bits_(storage_.get_bin_bits()),
454  bin_shifts_(storage_.get_bin_shifts()),
455  root_children_(storage_.get_root_children()),
456  numa_node_(snapshot_writer->get_numa_node()),
457  total_bin_count_(storage_.get_bin_count()),
458  previous_root_page_pointer_(storage_.get_metadata()->root_snapshot_page_id_),
459  volatile_resolver_(engine->get_memory_manager()->get_global_volatile_page_resolver()) {
460  cur_path_memory_.alloc(
462  kPageSize,
464  numa_node_);
465  cur_path_ = reinterpret_cast<HashIntermediatePage*>(cur_path_memory_.get_block());
466  cur_path_lowest_level_ = levels_;
467  cur_path_valid_range_ = HashBinRange(0, 0);
468 
469  cur_bin_ = kCurBinNotOpened;
470  cur_intermediate_tail_ = nullptr;
471 
472  data_page_io_memory_.alloc(
473  kPageSize,
474  kPageSize,
476  numa_node_);
477 
478  allocated_pages_ = 0;
479  allocated_intermediates_ = 0;
480  page_base_ = reinterpret_cast<HashDataPage*>(snapshot_writer_->get_page_base());
481  max_pages_ = snapshot_writer_->get_page_size();
482  intermediate_base_
483  = reinterpret_cast<HashComposedBinsPage*>(snapshot_writer_->get_intermediate_base());
484  max_intermediates_ = snapshot_writer_->get_intermediate_size();
485 }
numa_alloc_onnode() and numa_free().
memory::PagePoolOffset get_page_size() const __attribute__((always_inline))
void alloc(uint64_t size, uint64_t alignment, AllocType alloc_type, int numa_node) noexcept
Allocate a memory, releasing the current memory if exists.
const Metadata * get_metadata() const
Returns the metadata of this storage.
Definition: storage.hpp:162
soc::SocId get_soc_count() const
Shorthand for get_options().thread_.group_count_.
Definition: engine.cpp:74
const uint8_t kHashMaxLevels
Max level of intermediate pages.
Definition: hash_id.hpp:104
memory::PagePoolOffset get_intermediate_size() const __attribute__((always_inline))
storage::Page * get_page_base() __attribute__((always_inline))
void * get_block() const
Returns the memory block.
storage::Page * get_intermediate_base() __attribute__((always_inline))
storage::StorageId get_storage_id() const __attribute__((always_inline))
Definition: merge_sort.hpp:367
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
Definition: storage_id.hpp:45
SnapshotPagePointer root_snapshot_page_id_
Pointer to a snapshotted page this storage is rooted at.
Definition: metadata.hpp:112

Here is the call graph for this function:

Member Function Documentation

ErrorStack foedus::storage::hash::HashComposeContext::execute ( )

Definition at line 487 of file hash_composer_impl.cpp.

References ASSERT_ND, CHECK_ERROR, foedus::storage::hash::HashTmpBin::clean(), foedus::storage::hash::HashTmpBin::create_memory(), foedus::storage::hash::HashStorage::get_bin_count(), foedus::snapshot::MergeSort::get_current_count(), foedus::snapshot::MergeSort::SortEntry::get_key(), foedus::snapshot::MergeSort::get_sort_entries(), foedus::storage::hash::HashIntermediatePage::header(), foedus::snapshot::MergeSort::is_ended_all(), foedus::storage::kPageSize, foedus::kRetOk, LIKELY, foedus::snapshot::MergeSort::next_batch(), foedus::storage::PageHeader::storage_id_, UNLIKELY, and WRAP_ERROR_CODE.

487  {
488  // Initializations
489  std::memset(root_info_page_, 0, kPageSize);
490  root_info_page_->header().storage_id_ = storage_id_;
491  CHECK_ERROR(init_intermediates());
492  CHECK_ERROR(init_cur_path());
493  WRAP_ERROR_CODE(cur_bin_table_.create_memory(numa_node_)); // TASK(Hideaki) reuse memory
494  cur_bin_table_.clean();
495  VLOG(0) << "HashComposer-" << storage_id_ << " initialization done. processing...";
496 
497  bool processed_any = false;
498  cur_bin_ = kCurBinNotOpened;
499  while (true) {
500  CHECK_ERROR(merge_sort_->next_batch());
501  uint64_t count = merge_sort_->get_current_count();
502  if (count == 0 && merge_sort_->is_ended_all()) {
503  break;
504  }
505  processed_any = true;
506  const snapshot::MergeSort::SortEntry* sort_entries = merge_sort_->get_sort_entries();
507  uint64_t cur = 0;
508  while (cur < count) {
509  HashBin head_bin = sort_entries[cur].get_key();
510  ASSERT_ND(head_bin < storage_.get_bin_count());
511  if (cur_bin_ != head_bin) {
512  // now we have to finalize the previous bin and switch to a new bin!
513  ASSERT_ND(cur_bin_ == kCurBinNotOpened || cur_bin_ < head_bin); // sorted by bins
514  CHECK_ERROR(close_cur_bin());
515  ASSERT_ND(cur_bin_ == kCurBinNotOpened);
516  CHECK_ERROR(open_cur_bin(head_bin));
517  ASSERT_ND(cur_bin_ == head_bin);
518  }
519 
520  // grab a range of logs that are in the same bin.
521  uint64_t next;
522  for (next = cur + 1U; LIKELY(next < count); ++next) {
523  // this check uses sort_entries which are nicely contiguous.
524  HashBin bin = sort_entries[next].get_key();
525  ASSERT_ND(bin >= head_bin);
526  if (UNLIKELY(bin != head_bin)) {
527  break;
528  }
529  }
530 
531  WRAP_ERROR_CODE(apply_batch(cur, next));
532  cur = next;
533  }
534  ASSERT_ND(cur == count);
535  }
536 
537  if (!processed_any) {
538  LOG(ERROR) << "wtf? no logs? storage-" << storage_id_;
539  }
540 
541  CHECK_ERROR(finalize());
542  return kRetOk;
543 }
const SortEntry * get_sort_entries() const __attribute__((always_inline))
Definition: merge_sort.hpp:378
ErrorCode create_memory(uint16_t numa_node, uint64_t initial_size=kDefaultInitialSize)
Allocates the memory to use by this object.
Definition: hash_tmpbin.cpp:36
void clean()
Removes all tuple data for the current bin.
Definition: hash_tmpbin.cpp:74
StorageId storage_id_
ID of the storage this page belongs to.
Definition: page.hpp:196
ErrorStack next_batch()
Executes merge-sort on several thousands of logs and provides the result as a batch.
Definition: merge_sort.cpp:143
#define LIKELY(x)
Hints that x is highly likely true.
Definition: compiler.hpp:103
MergedPosition get_current_count() const __attribute__((always_inline))
Definition: merge_sort.hpp:369
uint64_t HashBin
Represents a bin of a hash value.
Definition: hash_id.hpp:142
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
const ErrorStack kRetOk
Normal return value for no-error case.
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
Definition: storage_id.hpp:45

Here is the call graph for this function:


The documentation for this class was generated from the following files: