libfoedus-core
FOEDUS Core Library
foedus::storage::array::ArrayComposeContext Class Reference

ArrayComposer's compose() implementation separated from the class itself.

Detailed Description

ArrayComposer's compose() implementation separated from the class itself.

compose() is complicated enough to warrant its own class. This class holds all the variables maintained during one compose() call.

Definition at line 109 of file array_composer_impl.hpp.

#include <array_composer_impl.hpp>

Public Member Functions

 ArrayComposeContext (Engine *engine, snapshot::MergeSort *merge_sort, snapshot::SnapshotWriter *snapshot_writer, cache::SnapshotFileSet *previous_snapshot_files, Page *root_info_page)
 ArrayComposeContext methods.
 
ErrorStack execute ()
 

Constructor & Destructor Documentation

foedus::storage::array::ArrayComposeContext::ArrayComposeContext ( Engine * engine,
snapshot::MergeSort * merge_sort,
snapshot::SnapshotWriter * snapshot_writer,
cache::SnapshotFileSet * previous_snapshot_files,
Page * root_info_page
)

ArrayComposeContext methods.

Definition at line 191 of file array_composer_impl.cpp.

References foedus::storage::array::ArrayPartitionerData::array_levels_, foedus::storage::array::ArrayPartitionerData::array_size_, ASSERT_ND, foedus::storage::array::ArrayStorage::get_array_size(), foedus::snapshot::SnapshotWriter::get_intermediate_base(), foedus::snapshot::SnapshotWriter::get_intermediate_size(), foedus::storage::PartitionerMetadata::get_metadata(), foedus::snapshot::SnapshotWriter::get_page_base(), foedus::snapshot::SnapshotWriter::get_page_size(), foedus::storage::array::LookupRouteFinder::get_records_in_leaf(), foedus::storage::array::kInteriorFanout, foedus::storage::PartitionerMetadata::locate_data(), and foedus::storage::PartitionerMetadata::valid_.

ArrayComposeContext::ArrayComposeContext(
  Engine* engine,
  snapshot::MergeSort* merge_sort,
  snapshot::SnapshotWriter* snapshot_writer,
  cache::SnapshotFileSet* previous_snapshot_files,
  Page* root_info_page)
  : engine_(engine),
    merge_sort_(merge_sort),
    system_initial_epoch_(engine->get_savepoint_manager()->get_initial_durable_epoch()),
    storage_id_(merge_sort_->get_storage_id()),
    snapshot_id_(snapshot_writer->get_snapshot_id()),
    storage_(engine, storage_id_),
    snapshot_writer_(snapshot_writer),
    previous_snapshot_files_(previous_snapshot_files),
    root_info_page_(reinterpret_cast<ArrayRootInfoPage*>(root_info_page)),
    payload_size_(storage_.get_payload_size()),
    levels_(storage_.get_levels()),
    previous_root_page_pointer_(storage_.get_metadata()->root_snapshot_page_id_) {
  LookupRouteFinder route_finder(levels_, payload_size_);
  offset_intervals_[0] = route_finder.get_records_in_leaf();
  for (uint8_t level = 1; level < levels_; ++level) {
    offset_intervals_[level] = offset_intervals_[level - 1] * kInteriorFanout;
  }
  std::memset(cur_path_, 0, sizeof(cur_path_));

  allocated_pages_ = 0;
  allocated_intermediates_ = 0;
  page_base_ = reinterpret_cast<ArrayPage*>(snapshot_writer_->get_page_base());
  max_pages_ = snapshot_writer_->get_page_size();
  intermediate_base_ = reinterpret_cast<ArrayPage*>(snapshot_writer_->get_intermediate_base());
  max_intermediates_ = snapshot_writer_->get_intermediate_size();

  PartitionerMetadata* metadata = PartitionerMetadata::get_metadata(engine_, storage_id_);
  ASSERT_ND(metadata->valid_);  // otherwise composer invoked without partitioner. maybe testcase?
  partitioning_data_ = reinterpret_cast<ArrayPartitionerData*>(metadata->locate_data(engine_));
  ASSERT_ND(levels_ == partitioning_data_->array_levels_);
  ASSERT_ND(storage_.get_array_size() == partitioning_data_->array_size_);
}

Symbols referenced by the constructor:

ArrayOffset array_size_
  Size of the entire array.
uint16_t get_payload_size() const
  Returns byte size of one record in this array storage without internal overheads. ...
memory::PagePoolOffset get_page_size() const __attribute__((always_inline))
const Metadata * get_metadata() const
  Returns the metadata of this storage.
  Definition: storage.hpp:162
ArrayOffset get_array_size() const
  Returns the size of this array.
memory::PagePoolOffset get_intermediate_size() const __attribute__((always_inline))
storage::Page * get_page_base() __attribute__((always_inline))
uint8_t get_levels() const
  Returns the number of levels.
storage::Page * get_intermediate_base() __attribute__((always_inline))
const uint16_t kInteriorFanout
  Max number of entries in an interior page of array storage.
  Definition: array_id.hpp:110
#define ASSERT_ND(x)
  A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
  Definition: assert_nd.hpp:72
storage::StorageId get_storage_id() const __attribute__((always_inline))
  Definition: merge_sort.hpp:367
static PartitionerMetadata * get_metadata(Engine *engine, StorageId id)
  Returns the shared memory for the given storage ID.
  Definition: partitioner.cpp:38
SnapshotPagePointer root_snapshot_page_id_
  Pointer to a snapshotted page this storage is rooted at.
  Definition: metadata.hpp:112
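
The constructor fills offset_intervals_ so that entry 0 is the number of records in a leaf page and each higher level multiplies the previous entry by kInteriorFanout. The following is a minimal standalone sketch of that computation, not FOEDUS code. The function name compute_offset_intervals and its fanout parameter are hypothetical; the fanout is passed in because the actual value of kInteriorFanout is not shown on this page.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical helper (not a FOEDUS API): returns, for each level, how many
// array offsets a single page at that level covers.
// 'fanout' stands in for kInteriorFanout; its real value is not assumed here.
std::vector<std::uint64_t> compute_offset_intervals(
    std::uint8_t levels, std::uint64_t records_in_leaf, std::uint64_t fanout) {
  std::vector<std::uint64_t> intervals(levels);
  if (levels == 0) {
    return intervals;  // nothing to compute for an empty hierarchy
  }
  intervals[0] = records_in_leaf;  // level 0 is the leaf level
  for (std::uint8_t level = 1; level < levels; ++level) {
    // One page at this level points to 'fanout' pages of the level below.
    intervals[level] = intervals[level - 1] * fanout;
  }
  return intervals;
}
```

For example, with 3 levels, 100 records per leaf, and an illustrative fanout of 10, a leaf covers 100 offsets, a level-1 page covers 1000, and a level-2 page covers 10000.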

Member Function Documentation

ErrorStack foedus::storage::array::ArrayComposeContext::execute ( )

Definition at line 230 of file array_composer_impl.cpp.

References ASSERT_ND, foedus::storage::array::ArrayRange::begin_, foedus::storage::array::ArrayPartitionerData::bucket_owners_, CHECK_ERROR, foedus::storage::array::ArrayRange::contains(), foedus::storage::array::ArrayRange::end_, foedus::storage::array::ArrayPage::get_array_range(), foedus::snapshot::MergeSort::get_current_count(), foedus::snapshot::MergeSort::SortEntry::get_key(), foedus::snapshot::SnapshotWriter::get_numa_node(), foedus::snapshot::MergeSort::get_sort_entries(), foedus::storage::array::ArrayRootInfoPage::header_, foedus::snapshot::MergeSort::is_ended_all(), foedus::storage::array::kInteriorFanout, foedus::storage::kPageSize, foedus::kRetOk, LIKELY, foedus::snapshot::MergeSort::next_batch(), foedus::storage::array::ArrayCommonUpdateLogType::offset_, foedus::storage::array::ArrayPartitionerData::partitionable_, foedus::storage::array::ArrayRootInfoPage::pointers_, foedus::snapshot::MergeSort::resolve_sort_position(), foedus::storage::PageHeader::storage_id_, UNLIKELY, and WRAP_ERROR_CODE.

ErrorStack ArrayComposeContext::execute() {
  std::memset(root_info_page_, 0, kPageSize);
  root_info_page_->header_.storage_id_ = storage_id_;

  if (levels_ <= 1U) {
    // this storage has only one page. This is very special and trivial.
    // we process this case separately.
    return execute_single_level_array();
  }

  // This implementation is batch-based to make it significantly more efficient.
  // We receive a fully-sorted/integrated stream of logs from merge_sort_, and merge them with
  // previous snapshot on page-basis. Most pages have many logs, so this achieves a tight loop
  // without expensive cost to switch pages.
  bool processed_any = false;
  while (true) {
    CHECK_ERROR(merge_sort_->next_batch());
    uint64_t count = merge_sort_->get_current_count();
    if (count == 0 && merge_sort_->is_ended_all()) {
      break;
    }
    const snapshot::MergeSort::SortEntry* sort_entries = merge_sort_->get_sort_entries();
    if (!processed_any) {
      // this is the first log. let's initialize cur_path to this log.
      processed_any = true;
      ArrayOffset initial_offset = sort_entries[0].get_key();

      CHECK_ERROR(initialize(initial_offset));
    }

    uint64_t cur = 0;
    while (cur < count) {
      const ArrayCommonUpdateLogType* head = reinterpret_cast<const ArrayCommonUpdateLogType*>(
        merge_sort_->resolve_sort_position(cur));
      ArrayOffset head_offset = head->offset_;
      ASSERT_ND(head_offset == sort_entries[cur].get_key());
      // switch to a page containing this offset
      WRAP_ERROR_CODE(update_cur_path(head_offset));
      ArrayRange page_range = cur_path_[0]->get_array_range();
      ASSERT_ND(page_range.contains(head_offset));

      // grab a range of logs that are in the same page.
      uint64_t next;
      for (next = cur + 1U; LIKELY(next < count); ++next) {
        // this check uses sort_entries which are nicely contiguous.
        ArrayOffset offset = sort_entries[next].get_key();
        ASSERT_ND(offset >= page_range.begin_);
        if (UNLIKELY(offset >= page_range.end_)) {
          break;
        }
      }

      apply_batch(cur, next);
      cur = next;
    }
    ASSERT_ND(cur == count);
  }

  if (processed_any) {
    CHECK_ERROR(finalize());
  } else {
    LOG(ERROR) << "wtf? no logs? storage-" << storage_id_;
  }

#ifndef NDEBUG
  uint16_t children = get_root_children();
  PartitionId partition = snapshot_writer_->get_numa_node();
  ASSERT_ND(partitioning_data_);
  for (uint16_t i = 0; i < children; ++i) {
    if (!partitioning_data_->partitionable_ || partitioning_data_->bucket_owners_[i] == partition) {
      ASSERT_ND(root_info_page_->pointers_[i] != 0);
    } else {
      ASSERT_ND((!is_initial_snapshot() && root_info_page_->pointers_[i] != 0)
        || (is_initial_snapshot() && root_info_page_->pointers_[i] == 0));
    }
  }
  for (uint16_t i = children; i < kInteriorFanout; ++i) {
    ASSERT_ND(root_info_page_->pointers_[i] == 0);
  }
#endif  // NDEBUG

  return kRetOk;
}
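
The inner loop of execute() takes the head log of a sorted batch, finds the leaf page that contains its offset, and then scans forward over the contiguous sort keys until one falls outside that page's range. The sketch below isolates only that grouping step. It is not FOEDUS code: group_by_leaf_page is a hypothetical name, and the sketch assumes every leaf page covers a fixed, aligned range of records_in_leaf offsets, whereas the real code reads the range from the page via get_array_range().

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical helper (not a FOEDUS API): splits sorted keys into half-open
// index ranges [cur, next) such that all keys in one range belong to the same
// leaf page. Assumes records_in_leaf > 0 and that leaf page p covers offsets
// [p * records_in_leaf, (p + 1) * records_in_leaf).
std::vector<std::pair<std::size_t, std::size_t>> group_by_leaf_page(
    const std::vector<std::uint64_t>& sorted_keys, std::uint64_t records_in_leaf) {
  std::vector<std::pair<std::size_t, std::size_t>> batches;
  const std::size_t count = sorted_keys.size();
  std::size_t cur = 0;
  while (cur < count) {
    // Range of the (assumed) leaf page that contains the head key.
    const std::uint64_t begin = sorted_keys[cur] / records_in_leaf * records_in_leaf;
    const std::uint64_t end = begin + records_in_leaf;
    // Scan forward while the keys stay inside the same page.
    std::size_t next = cur + 1;
    while (next < count && sorted_keys[next] < end) {
      ++next;
    }
    batches.emplace_back(cur, next);  // corresponds to apply_batch(cur, next)
    cur = next;
  }
  return batches;
}
```

Because the keys are sorted, each page is visited once per batch, which is what lets the real loop avoid repeated page switches.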

Symbols referenced by execute():

thread::ThreadGroupId PartitionId
  As partition=NUMA node, this is just a synonym of foedus::thread::ThreadGroupId.
  Definition: storage_id.hpp:65
const SortEntry * get_sort_entries() const __attribute__((always_inline))
  Definition: merge_sort.hpp:378
bool partitionable_
  If false, every record goes to node-0.
StorageId storage_id_
  ID of the storage this page belongs to.
  Definition: page.hpp:196
uint64_t ArrayOffset
  The only key type in array storage.
  Definition: array_id.hpp:48
PartitionId bucket_owners_[kInteriorFanout]
  Partition of each bucket.
const log::RecordLogType * resolve_sort_position(uint32_t sort_pos) const __attribute__((always_inline))
  Definition: merge_sort.hpp:384
ErrorStack next_batch()
  Executes merge-sort on several thousands of logs and provides the result as a batch.
  Definition: merge_sort.cpp:143
#define LIKELY(x)
  Hints that x is highly likely true.
  Definition: compiler.hpp:103
SnapshotPagePointer pointers_[kInteriorFanout]
  Pointers to direct children of root.
const ArrayRange & get_array_range() const
MergedPosition get_current_count() const __attribute__((always_inline))
  Definition: merge_sort.hpp:369
#define CHECK_ERROR(x)
  This macro calls x and checks its returned value.
const ErrorStack kRetOk
  Normal return value for no-error case.
#define UNLIKELY(x)
  Hints that x is highly likely false.
  Definition: compiler.hpp:104
const uint16_t kInteriorFanout
  Max number of entries in an interior page of array storage.
  Definition: array_id.hpp:110
#define ASSERT_ND(x)
  A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
  Definition: assert_nd.hpp:72
#define WRAP_ERROR_CODE(x)
  Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
const uint16_t kPageSize
  A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
  Definition: storage_id.hpp:45
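
The debug-only block at the end of execute() asserts a per-slot condition on the root info page. Slots owned by this partition (or all slots when the storage is not partitionable) must hold a non-zero pointer. Slots owned by other partitions must be zero in the initial snapshot and non-zero otherwise. Slots past the number of root children must be zero. The sketch below restates those assertions as a standalone predicate. It is an illustration only: root_pointers_consistent and all of its parameters are hypothetical names, and plain vectors stand in for the FOEDUS page structures.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical restatement (not a FOEDUS API) of the debug-only check.
// 'pointers' has one slot per possible root child; 'bucket_owners' gives the
// owning partition of each of the first 'children' slots.
bool root_pointers_consistent(
    const std::vector<std::uint64_t>& pointers,
    const std::vector<std::uint16_t>& bucket_owners,
    std::uint16_t children,
    bool partitionable,
    std::uint16_t my_partition,
    bool initial_snapshot) {
  if (children > pointers.size() || children > bucket_owners.size()) {
    return false;  // malformed input for this sketch
  }
  for (std::uint16_t i = 0; i < children; ++i) {
    const bool mine = !partitionable || bucket_owners[i] == my_partition;
    if (mine) {
      // This composer is responsible for the slot, so it must be set.
      if (pointers[i] == 0) return false;
    } else {
      // Another partition's slot: zero in the initial snapshot, set otherwise.
      const bool is_set = pointers[i] != 0;
      if (is_set == initial_snapshot) return false;
    }
  }
  // Slots beyond the actual number of root children must stay empty.
  for (std::size_t i = children; i < pointers.size(); ++i) {
    if (pointers[i] != 0) return false;
  }
  return true;
}
```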


The documentation for this class was generated from the following files: