libfoedus-core
FOEDUS Core Library
foedus::snapshot::MergeSort Class Reference (final)

Detailed Description

Receives an arbitrary number of sorted buffers and emits one fully sorted stream of logs.

Where this is used, and why
During a snapshot, the log reducer merges the current in-memory buffer with zero or more sorted runs that were dumped to disk. We initially used a tuple-based algorithm that selected, on each iteration, the stream with the smallest key, but its overhead and code complexity were undesirable. Sorting in batches is much faster than even a tournament tree, and it also separates the sorting logic from the composer. Hence this class.
Batching, Batching, Batching
All frequently used code paths must be batched, at least in the 1000s. The exception is code that is invoked only occasionally, e.g., once per block. Some logic must be customized for each storage type (e.g., handling of keys). Such customization is also done at batch granularity; there is no per-log switch.
Algorithm at a glance
  • a) Pick a chunk (1000s of logs) from the input whose last key (called the chunk-last-key) is the smallest among all inputs.
  • b) Repeat a) until some input must move on to its next window or we have a large number of logs.
  • c) The smallest chunk-last-key among all inputs is the batch-threshold. All tuples in all inputs' chunks whose key is strictly smaller than the threshold are sorted.
  • d) The composer consumes the sorted inputs.
  • e) Advance each input's current position, move their windows if needed (steps a) to d) never move windows), and repeat from a) until all inputs are done.
  • Special condition: the chunk-last-key survives to the next iteration, except when the chunk is the last chunk of the last window. In that case, the batch-threshold is chosen ignoring that input. Its logs are merged if key < threshold, and the input is marked as ended once all of its logs are merged. If all inputs are in their last chunk (last iteration), we include all remaining logs from all inputs.
  • Another special condition: this class does nothing when input_count is 1, skipping all the overhead. This should happen often if reducers have large buffers.
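
The steps above can be sketched in a much simplified form. This is not the FOEDUS implementation: it works on plain sorted vectors of 64-bit keys, ignores windows entirely, and every name in it (SketchInput, sketch_next_batch, sketch_merge_all) is hypothetical. It only illustrates how picking chunks and cutting at a batch-threshold yields a globally sorted stream, batch by batch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for one sorted input; not the FOEDUS InputStatus.
struct SketchInput {
  std::vector<uint64_t> keys;  // already sorted
  std::size_t cur = 0;         // next key not yet emitted
  std::size_t chunk_end = 0;   // one past the last key picked so far

  // As in initialize_once(), the initial chunk holds a single log.
  explicit SketchInput(std::vector<uint64_t> k)
      : keys(std::move(k)), chunk_end(keys.empty() ? 0 : 1) {}
  bool fully_picked() const { return chunk_end == keys.size(); }
  bool ended() const { return cur == keys.size(); }
};

// Produces one sorted batch. It may be empty while chunks are still being picked.
std::vector<uint64_t> sketch_next_batch(
    std::vector<SketchInput>& inputs, std::size_t chunk, std::size_t max_picks) {
  assert(chunk > 0 && max_picks > 0);
  // a) + b): repeatedly extend the input whose chunk-last-key is the smallest.
  for (std::size_t pick = 0; pick < max_picks; ++pick) {
    SketchInput* min_input = nullptr;
    for (SketchInput& in : inputs) {
      if (in.fully_picked()) continue;  // already in its last chunk
      if (min_input == nullptr ||
          in.keys[in.chunk_end - 1] < min_input->keys[min_input->chunk_end - 1]) {
        min_input = &in;
      }
    }
    if (min_input == nullptr) break;  // every input is in its last chunk
    min_input->chunk_end = std::min(min_input->chunk_end + chunk, min_input->keys.size());
  }
  // c) batch-threshold: smallest chunk-last-key, ignoring inputs in their last chunk.
  bool has_threshold = false;
  uint64_t threshold = 0;
  for (const SketchInput& in : inputs) {
    if (in.fully_picked()) continue;
    const uint64_t last = in.keys[in.chunk_end - 1];
    if (!has_threshold || last < threshold) {
      threshold = last;
      has_threshold = true;
    }
  }
  // Collect keys strictly below the threshold, then sort them as one batch.
  std::vector<uint64_t> batch;
  for (SketchInput& in : inputs) {
    auto begin = in.keys.begin() + static_cast<std::ptrdiff_t>(in.cur);
    auto end = in.keys.begin() + static_cast<std::ptrdiff_t>(in.chunk_end);
    // Without a threshold (last iteration), everything that remains is included.
    auto stop = has_threshold ? std::lower_bound(begin, end, threshold) : end;
    batch.insert(batch.end(), begin, stop);
    in.cur = static_cast<std::size_t>(stop - in.keys.begin());  // e) advance
  }
  std::sort(batch.begin(), batch.end());
  return batch;
}

// d) stand-in for the composer: concatenates the batches in the order produced.
std::vector<uint64_t> sketch_merge_all(
    const std::vector<std::vector<uint64_t>>& sorted_inputs,
    std::size_t chunk, std::size_t max_picks) {
  std::vector<SketchInput> inputs;
  for (const auto& keys : sorted_inputs) inputs.emplace_back(keys);
  std::vector<uint64_t> out;
  auto all_ended = [&inputs] {
    return std::all_of(inputs.begin(), inputs.end(),
                       [](const SketchInput& in) { return in.ended(); });
  };
  while (!all_ended()) {
    const std::vector<uint64_t> batch = sketch_next_batch(inputs, chunk, max_picks);
    out.insert(out.end(), batch.begin(), batch.end());
  }
  return out;
}
```

Because every key left behind is at least the threshold, concatenating the batches in order gives a fully sorted stream without any per-tuple comparison across inputs.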
Modularity
This class has no dependency on other modules. It receives buffers of logs, and that's it. We must keep it that way for easier testing and tuning.

Definition at line 77 of file merge_sort.hpp.

#include <merge_sort.hpp>


Classes

struct  AdjustComparatorMasstree
 Used in batch_sort_adjust_sort if the storage is a masstree storage.

struct  GroupifyResult
 Represents a group of consecutive logs in the current batch.

struct  InputStatus
 Current status of each input.

struct  PositionEntry
 Provides additional information for each entry we are sorting.

struct  SortEntry
 Entries we actually sort.
 

Public Types

enum  Constants {
  kMaxMergedPosition = 1 << 23, kLogChunk = 1 << 10, kDefaultChunkBatch = 1 << 8, kMaxLevels = 32,
  kWindowChunkReserveBytes = 1 << 16
}
 
typedef uint32_t MergedPosition
 Position in MergeSort's buffer.

typedef uint16_t InputIndex
 Index in input streams.
 

Public Member Functions

 MergeSort (storage::StorageId id, storage::StorageType type, Epoch base_epoch, SortedBuffer *const *inputs, uint16_t inputs_count, uint16_t max_original_pages, memory::AlignedMemory *const work_memory, uint16_t chunk_batch_size=kDefaultChunkBatch)
 
ErrorStack next_batch ()
 Executes merge-sort on several thousands of logs and provides the result as a batch.

uint32_t fetch_logs (uint32_t sort_pos, uint32_t count, log::RecordLogType const **out) const
 To reduce L1 cache miss stalls, we prefetch some number of position entries and the log entries they point to in parallel.

GroupifyResult groupify (uint32_t begin, uint32_t limit=0) const __attribute__((always_inline))
 Find a group of consecutive logs from the given position that have either a common log type or a common key.

void assert_sorted ()
 For debug/test only.

ErrorStack initialize_once () override

ErrorStack uninitialize_once () override

bool is_no_merging () const __attribute__((always_inline))

bool is_ended_all () const

storage::StorageId get_storage_id () const __attribute__((always_inline))

Epoch get_base_epoch () const __attribute__((always_inline))

MergedPosition get_current_count () const __attribute__((always_inline))

InputIndex get_inputs_count () const __attribute__((always_inline))

const PositionEntry * get_position_entries () const __attribute__((always_inline))

log::LogCode get_log_type_from_sort_position (uint32_t sort_pos) const __attribute__((always_inline))

const SortEntry * get_sort_entries () const __attribute__((always_inline))

const log::RecordLogType * resolve_merged_position (MergedPosition pos) const __attribute__((always_inline))

const log::RecordLogType * resolve_sort_position (uint32_t sort_pos) const __attribute__((always_inline))

void change_log_type_at (uint32_t sort_pos, log::LogCode before, log::LogCode after)
 Used to switch between two compatible log types.

storage::Page * get_original_pages () const __attribute__((always_inline))

bool are_all_single_layer_logs () const __attribute__((always_inline))

uint16_t get_longest_key_length () const __attribute__((always_inline))
 
Public Member Functions inherited from foedus::DefaultInitializable
 DefaultInitializable ()

virtual ~DefaultInitializable ()

 DefaultInitializable (const DefaultInitializable &)=delete

DefaultInitializable & operator= (const DefaultInitializable &)=delete

ErrorStack initialize () override final
 Typical implementation of Initializable::initialize() that provides initialize-once semantics.

ErrorStack uninitialize () override final
 Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics.

bool is_initialized () const override final
 Returns whether the object has been already initialized or not.

Public Member Functions inherited from foedus::Initializable
virtual ~Initializable ()
 

Member Typedef Documentation

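typedef uint16_t foedus::snapshot::MergeSort::InputIndex

Index in input streams.

Definition at line 85 of file merge_sort.hpp.

typedef uint32_t foedus::snapshot::MergeSort::MergedPosition

Position in MergeSort's buffer.

We never sort more than 2^23 entries at once because we merge by chunks.

Definition at line 83 of file merge_sort.hpp.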

Member Enumeration Documentation

enum foedus::snapshot::MergeSort::Constants

Enumerator
kMaxMergedPosition
    This is a theoretical max. Additionally, it must be less than buffer_capacity_.
kLogChunk
    1024 logs per chunk.
kDefaultChunkBatch
    Suppose each log is 50 bytes: 1k * 256 * 50 B = 12.5 MB worth of logs to sort per batch.
kMaxLevels
    We assume the path wouldn't be this deep.
kWindowChunkReserveBytes
    To avoid handling the case where a log spans the end of a window, chunks leave at least this many bytes in each window. No single log can be larger than this size, so it simplifies the logic. If the input is in the last window, this value has no effect.

Definition at line 87 of file merge_sort.hpp.

{
  kMaxMergedPosition = 1 << 23,
  kLogChunk = 1 << 10,
  kDefaultChunkBatch = 1 << 8,
  kMaxLevels = 32,
  kWindowChunkReserveBytes = 1 << 16,
};
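
The sizing remark for kDefaultChunkBatch can be checked at compile time. The sketch below copies the enum values from this page; the 50-byte log size is the assumption stated in the enumerator's description, and the helper names (kAssumedLogBytes, logs_per_batch, bytes_per_batch) are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Values copied from the Constants enum documented above.
constexpr uint64_t kLogChunk = 1ULL << 10;
constexpr uint64_t kDefaultChunkBatch = 1ULL << 8;
constexpr uint64_t kMaxMergedPosition = 1ULL << 23;

// Assumption taken from the enumerator description: an average log of 50 bytes.
constexpr uint64_t kAssumedLogBytes = 50;

constexpr uint64_t logs_per_batch() { return kLogChunk * kDefaultChunkBatch; }
constexpr uint64_t bytes_per_batch() { return logs_per_batch() * kAssumedLogBytes; }

static_assert(logs_per_batch() == 262144, "1024 logs per chunk * 256 chunks");
static_assert(bytes_per_batch() == 13107200, "262144 logs * 50 bytes");
// 12.5 * 2^20 bytes, written without floating point.
static_assert(bytes_per_batch() * 2 == 25 * (1ULL << 20), "12.5 MB per batch");
// A default-sized batch stays well below the theoretical position limit.
static_assert(logs_per_batch() < kMaxMergedPosition, "fits in MergedPosition range");
```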

Constructor & Destructor Documentation

foedus::snapshot::MergeSort::MergeSort ( storage::StorageId  id,
storage::StorageType  type,
Epoch  base_epoch,
SortedBuffer *const *  inputs,
uint16_t  inputs_count,
uint16_t  max_original_pages,
memory::AlignedMemory *const  work_memory,
uint16_t  chunk_batch_size = kDefaultChunkBatch 
)

Definition at line 63 of file merge_sort.cpp.

References ASSERT_ND.

    id_(id),
    type_(type),
    base_epoch_(base_epoch),
    shortest_key_length_(extract_shortest_key_length(inputs, inputs_count)),
    longest_key_length_(extract_longest_key_length(inputs, inputs_count)),
    inputs_(inputs),
    inputs_count_(inputs_count),
    max_original_pages_(max_original_pages),
    chunk_batch_size_(chunk_batch_size),
    work_memory_(work_memory) {
  ASSERT_ND(shortest_key_length_ <= longest_key_length_);
  ASSERT_ND(shortest_key_length_ > 0);
  ASSERT_ND(chunk_batch_size_ > 0);
  current_count_ = 0;
  sort_entries_ = nullptr;
  position_entries_ = nullptr;
  original_pages_ = nullptr;
  inputs_status_ = nullptr;
}

Member Function Documentation

bool foedus::snapshot::MergeSort::are_all_single_layer_logs ( ) const
inline

Definition at line 411 of file merge_sort.hpp.

{ return longest_key_length_ <= 8U; }

void foedus::snapshot::MergeSort::assert_sorted ( )

For debug/test only.

Checks if sort_entries_ are indeed fully sorted. This method is wiped out in release mode.

Definition at line 695 of file merge_sort.cpp.

References ASSERT_ND, foedus::storage::hash::HashCommonLogType::assert_type(), foedus::snapshot::MergeSort::SortEntry::data_, foedus::xct::XctId::get_epoch(), foedus::xct::XctId::get_ordinal(), foedus::snapshot::MergeSort::SortEntry::get_position(), foedus::log::BaseLogType::header_, foedus::snapshot::MergeSort::PositionEntry::input_index_, foedus::storage::kArrayStorage, foedus::storage::kHashStorage, foedus::storage::kMasstreeStorage, foedus::snapshot::MergeSort::SortEntry::set(), foedus::Epoch::subtract(), and foedus::log::LogHeader::xct_id_.

{
#ifndef NDEBUG
  for (MergedPosition i = 0; i < current_count_; ++i) {
    MergedPosition cur_pos = sort_entries_[i].get_position();
    const log::RecordLogType* cur = inputs_status_[position_entries_[cur_pos].input_index_].
      from_compact_pos(position_entries_[cur_pos].input_position_);

    // does it point to a correct log?
    Epoch epoch = cur->header_.xct_id_.get_epoch();
    uint16_t compressed_epoch = epoch.subtract(base_epoch_);
    SortEntry dummy;
    if (type_ == storage::kArrayStorage) {
      const auto* casted = reinterpret_cast<const storage::array::ArrayCommonUpdateLogType*>(cur);
      dummy.set(
        casted->offset_,
        compressed_epoch,
        cur->header_.xct_id_.get_ordinal(),
        false,
        cur_pos);
    } else if (type_ == storage::kMasstreeStorage) {
      const auto* casted = reinterpret_cast<const storage::masstree::MasstreeCommonLogType*>(cur);
      dummy.set(
        casted->get_first_slice(),
        compressed_epoch,
        cur->header_.xct_id_.get_ordinal(),
        casted->key_length_ != sizeof(storage::masstree::KeySlice),
        cur_pos);
    } else {
      const auto* casted = reinterpret_cast<const storage::hash::HashCommonLogType*>(cur);
      casted->assert_type();
      storage::hash::HashBin bin = casted->hash_ >> (64U - casted->bin_bits_);
      dummy.set(
        bin,
        compressed_epoch,
        cur->header_.xct_id_.get_ordinal(),
        false,  // hash doesn't need further sorting so far.
        cur_pos);
    }
    ASSERT_ND(dummy.data_ == sort_entries_[i].data_);
    if (i == 0) {
      continue;
    }

    // compare with previous
    MergedPosition prev_pos = sort_entries_[i - 1].get_position();
    ASSERT_ND(prev_pos != cur_pos);
    const log::RecordLogType* prev = inputs_status_[position_entries_[prev_pos].input_index_].
      from_compact_pos(position_entries_[prev_pos].input_position_);
    int cmp = compare_logs(prev, cur);
    ASSERT_ND(cmp <= 0);
    if (cmp == 0) {
      // the last of sort order is position.
      ASSERT_ND(prev_pos < cur_pos);
    }
  }
#endif  // NDEBUG
}
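
In the hash-storage branch above, the sort key is the hash bin, obtained by keeping the most significant bin_bits_ bits of the 64-bit hash. A minimal sketch of that shift follows; the function name sketch_hash_bin is hypothetical, and the restriction to 1..63 bits is an assumption of this sketch (shifting a 64-bit value by 64 is undefined in C++), not a documented FOEDUS constraint.

```cpp
#include <cassert>
#include <cstdint>

// Returns the top bin_bits bits of a 64-bit hash value.
// This sketch requires 1 <= bin_bits <= 63 to keep the shift well defined.
inline uint64_t sketch_hash_bin(uint64_t hash, uint8_t bin_bits) {
  assert(bin_bits >= 1 && bin_bits <= 63);
  return hash >> (64U - bin_bits);
}
```

Because the bin is taken from the high-order bits, sorting by bin keeps all logs that belong to the same bin adjacent in the sorted batch.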

void foedus::snapshot::MergeSort::change_log_type_at ( uint32_t  sort_pos,
log::LogCode  before,
log::LogCode  after 
)
inline

Used to switch between two compatible log types.

This is a special-purpose method; use it with caution.

Definition at line 393 of file merge_sort.hpp.

References ASSERT_ND, foedus::snapshot::MergeSort::PositionEntry::get_log_type(), get_log_type_from_sort_position(), foedus::snapshot::MergeSort::SortEntry::get_position(), foedus::log::LogHeader::get_type(), foedus::log::BaseLogType::header_, foedus::snapshot::MergeSort::PositionEntry::log_type_, foedus::log::LogHeader::log_type_code_, and resolve_merged_position().

{
  // we have to change two places; PositionEntry and header of the log itself.
  ASSERT_ND(sort_pos < current_count_);
  ASSERT_ND(get_log_type_from_sort_position(sort_pos) == before);
  MergedPosition pos = sort_entries_[sort_pos].get_position();
  PositionEntry& entry = position_entries_[pos];
  ASSERT_ND(entry.get_log_type() == before);
  entry.log_type_ = after;
  ASSERT_ND(entry.get_log_type() == after);
  ASSERT_ND(get_log_type_from_sort_position(sort_pos) == after);

  log::RecordLogType* record = const_cast<log::RecordLogType*>(resolve_merged_position(pos));
  ASSERT_ND(record->header_.get_type() == before);
  record->header_.log_type_code_ = after;
  ASSERT_ND(record->header_.get_type() == after);
}

uint32_t foedus::snapshot::MergeSort::fetch_logs ( uint32_t  sort_pos,
uint32_t  count,
log::RecordLogType const **  out 
) const

To reduce L1 cache miss stalls, we prefetch some number of position entries and the log entries they point to in parallel.

Parameters
[in]   sort_pos  We prefetch starting from this position: sort_entries_ -> position_entries_ -> logs.
[in]   count     We prefetch up to sort_entries_[sort_pos + count - 1]. Should be at least 2 or 4 to make parallel prefetching effective.
[out]  out       Fetched logs.
Precondition
sort_pos <= current_count_
Returns
The number of logs fetched. Same as count unless sort_pos + count > current_count_, in which case it is current_count_ - sort_pos.

Definition at line 273 of file merge_sort.cpp.

References ASSERT_ND, foedus::snapshot::MergeSort::InputStatus::from_compact_pos(), foedus::snapshot::MergeSort::SortEntry::get_position(), foedus::snapshot::MergeSort::PositionEntry::input_index_, is_no_merging(), and foedus::assorted::prefetch_cacheline().

{
  ASSERT_ND(sort_pos <= current_count_);
  uint32_t fetched_count = count;
  if (sort_pos + count > current_count_) {
    fetched_count = current_count_ - sort_pos;
  }

  if (is_no_merging()) {
    // no merge sort.
#ifndef NDEBUG
    for (uint32_t i = 0; i < fetched_count; ++i) {
      ASSERT_ND(sort_entries_[sort_pos + i].get_position() == sort_pos + i);
    }
#endif  // NDEBUG
    // in this case, the pointed logs are also contiguous. no point to do prefetching.
    for (uint32_t i = 0; i < fetched_count; ++i) {
      MergedPosition pos = sort_pos + i;
      ASSERT_ND(position_entries_[pos].input_index_ == 0);
      out[i] = inputs_status_[0].from_compact_pos(position_entries_[pos].input_position_);
    }
    return fetched_count;
  }

  // prefetch position entries
  for (uint32_t i = 0; i < fetched_count; ++i) {
    MergedPosition pos = sort_entries_[sort_pos + i].get_position();
    assorted::prefetch_cacheline(position_entries_ + pos);
  }
  // prefetch and fetch logs
  for (uint32_t i = 0; i < fetched_count; ++i) {
    MergedPosition pos = sort_entries_[sort_pos + i].get_position();
    InputIndex input = position_entries_[pos].input_index_;
    out[i] = inputs_status_[input].from_compact_pos(position_entries_[pos].input_position_);
  }
  return fetched_count;
}
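
The two-pass pattern described for fetch_logs() can be illustrated in isolation. The sketch below is not the FOEDUS code: SketchLog, SketchPosition, sketch_prefetch and sketch_fetch_logs are hypothetical stand-ins, sort entries are reduced to plain indexes, and the prefetch uses the GCC/Clang builtin __builtin_prefetch (a no-op on other compilers). Prefetching the log itself in the second pass follows the brief description above rather than a confirmed source line.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-ins for the log and PositionEntry types.
struct SketchLog { uint64_t key; };
struct SketchPosition { uint32_t log_index; };

// Issues a prefetch hint where the compiler supports it; otherwise does nothing.
inline void sketch_prefetch(const void* address) {
#if defined(__GNUC__) || defined(__clang__)
  __builtin_prefetch(address);
#else
  (void)address;
#endif
}

// Resolves up to count logs starting at sort_pos and returns how many were fetched.
uint32_t sketch_fetch_logs(
    const std::vector<uint32_t>& sort_entries,
    const std::vector<SketchPosition>& positions,
    const std::vector<SketchLog>& logs,
    uint32_t sort_pos, uint32_t count, const SketchLog** out) {
  const uint32_t current_count = static_cast<uint32_t>(sort_entries.size());
  assert(sort_pos <= current_count);
  uint32_t fetched = count;
  if (sort_pos + count > current_count) {
    fetched = current_count - sort_pos;  // clamp at the end of the batch
  }
  // Pass 1: request all position entries first, so the misses overlap.
  for (uint32_t i = 0; i < fetched; ++i) {
    sketch_prefetch(&positions[sort_entries[sort_pos + i]]);
  }
  // Pass 2: read each position entry, resolve the log pointer, and hint the log.
  for (uint32_t i = 0; i < fetched; ++i) {
    const SketchPosition& position = positions[sort_entries[sort_pos + i]];
    out[i] = &logs[position.log_index];
    sketch_prefetch(out[i]);
  }
  return fetched;
}
```

Issuing all hints of one level before dereferencing any of them is what lets several cache misses be in flight at once, which is why the count parameter should be at least 2 or 4.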

Epoch foedus::snapshot::MergeSort::get_base_epoch ( ) const
inline

Definition at line 368 of file merge_sort.hpp.

{ return base_epoch_; }

MergedPosition foedus::snapshot::MergeSort::get_current_count ( ) const
inline

Definition at line 369 of file merge_sort.hpp.

Referenced by foedus::storage::array::ArrayComposeContext::execute(), foedus::storage::hash::HashComposeContext::execute(), and foedus::storage::masstree::MasstreeComposeContext::execute().

{ return current_count_; }

InputIndex foedus::snapshot::MergeSort::get_inputs_count ( ) const
inline

Definition at line 370 of file merge_sort.hpp.

{ return inputs_count_; }

log::LogCode foedus::snapshot::MergeSort::get_log_type_from_sort_position ( uint32_t  sort_pos) const
inline

Definition at line 374 of file merge_sort.hpp.

References foedus::snapshot::MergeSort::PositionEntry::get_log_type(), and foedus::snapshot::MergeSort::SortEntry::get_position().

Referenced by change_log_type_at().

{
  MergedPosition pos = sort_entries_[sort_pos].get_position();
  return position_entries_[pos].get_log_type();
}

uint16_t foedus::snapshot::MergeSort::get_longest_key_length ( ) const
inline

Definition at line 412 of file merge_sort.hpp.

{ return longest_key_length_; }

storage::Page* foedus::snapshot::MergeSort::get_original_pages ( ) const
inline

Definition at line 410 of file merge_sort.hpp.

{ return original_pages_; }

const PositionEntry* foedus::snapshot::MergeSort::get_position_entries ( ) const
inline

Definition at line 371 of file merge_sort.hpp.

{
  return position_entries_;
}

const SortEntry* foedus::snapshot::MergeSort::get_sort_entries ( ) const
inline

Definition at line 378 of file merge_sort.hpp.

Referenced by foedus::storage::array::ArrayComposeContext::execute(), and foedus::storage::hash::HashComposeContext::execute().

{ return sort_entries_; }

storage::StorageId foedus::snapshot::MergeSort::get_storage_id ( ) const
inline

Definition at line 367 of file merge_sort.hpp.

{ return id_; }

MergeSort::GroupifyResult foedus::snapshot::MergeSort::groupify ( uint32_t  begin,
uint32_t  limit = 0 
) const
inline

Find a group of consecutive logs from the given position that have either a common log type or a common key.

Parameters
[in]  begin  Starting position in the sorted array.
[in]  limit  Maximum number of logs to check. If not specified, all logs in the batch.
Returns
The group found in the current batch.

Some composers use this optional method to batch-process the logs. In the ideal case there is always a big group, and the composer calls this method once per hundreds of logs. But it is also quite possible that there is no good group, in which case this method is called very frequently (potentially for each log). Hence it is worth explicit inlining.

Definition at line 534 of file merge_sort.hpp.

References ASSERT_ND, foedus::snapshot::MergeSort::GroupifyResult::count_, foedus::snapshot::MergeSort::SortEntry::get_position(), foedus::snapshot::MergeSort::GroupifyResult::has_common_key_, foedus::snapshot::MergeSort::GroupifyResult::has_common_log_code_, foedus::log::kLogCodeInvalid, foedus::snapshot::MergeSort::GroupifyResult::log_code_, foedus::snapshot::MergeSort::PositionEntry::log_type_, and UNLIKELY.

Referenced by foedus::storage::masstree::MasstreeComposeContext::execute().

{
  ASSERT_ND(begin < current_count_);
  GroupifyResult result;
  result.count_ = 1;
  result.has_common_key_ = false;
  result.has_common_log_code_ = false;
  result.log_code_ = log::kLogCodeInvalid;
  if (UNLIKELY(begin + 1U == current_count_)) {
    return result;
  }

  // first, check common keys
  uint32_t cur;
  if (shortest_key_length_ == 8U && longest_key_length_ == 8U) {
    cur = groupify_find_common_keys_8b(begin, limit);
  } else {
    cur = groupify_find_common_keys_general(begin, limit);
  }

  ASSERT_ND(cur >= begin);
  ASSERT_ND(cur < current_count_);
  if (UNLIKELY(cur != begin)) {
    result.has_common_key_ = true;
    result.count_ = cur - begin + 1U;
    return result;
  }

  // if keys are different, check common log types.
  cur = groupify_find_common_log_type(begin, limit);
  ASSERT_ND(cur >= begin);
  ASSERT_ND(cur < current_count_);
  if (UNLIKELY(cur != begin)) {
    result.has_common_log_code_ = true;
    result.log_code_ = position_entries_[sort_entries_[cur].get_position()].log_type_;
    result.count_ = cur - begin + 1U;
    return result;
  } else {
    return result;
  }
}
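
The grouping order used by groupify() (common key first, then common log type) can be sketched on a flat array. This is a simplified illustration, not the FOEDUS implementation: SketchEntry, SketchGroup and sketch_groupify are hypothetical, keys are plain 64-bit integers, and the interpretation of limit as "look at no more than limit entries starting at begin" is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical flattened view of one sorted log: its key and its log type code.
struct SketchEntry { uint64_t key; uint16_t log_code; };

// Hypothetical counterpart of GroupifyResult.
struct SketchGroup {
  uint32_t count;
  bool has_common_key;
  bool has_common_log_code;
  uint16_t log_code;  // meaningful only if has_common_log_code is true
};

SketchGroup sketch_groupify(
    const std::vector<SketchEntry>& sorted, uint32_t begin, uint32_t limit = 0) {
  const uint32_t total = static_cast<uint32_t>(sorted.size());
  assert(begin < total);
  SketchGroup result{1, false, false, 0};
  // Assumed meaning of limit: 0 means "until the end of the batch".
  uint32_t end = total;
  if (limit != 0 && begin + limit < total) {
    end = begin + limit;
  }

  // First, look for a run of entries sharing the key at begin.
  uint32_t cur = begin;
  while (cur + 1U < end && sorted[cur + 1U].key == sorted[begin].key) {
    ++cur;
  }
  if (cur != begin) {
    result.has_common_key = true;
    result.count = cur - begin + 1U;
    return result;
  }

  // Keys differ, so look for a run of entries sharing the log type at begin.
  while (cur + 1U < end && sorted[cur + 1U].log_code == sorted[begin].log_code) {
    ++cur;
  }
  if (cur != begin) {
    result.has_common_log_code = true;
    result.log_code = sorted[begin].log_code;
    result.count = cur - begin + 1U;
  }
  return result;
}
```

A caller would typically advance by the returned count and call again, so a long run of identical keys or log types costs one call instead of one call per log.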

ErrorStack foedus::snapshot::MergeSort::initialize_once ( )
override virtual

Implements foedus::DefaultInitializable.

Definition at line 93 of file merge_sort.cpp.

References foedus::snapshot::SortedBuffer::assert_checks(), foedus::snapshot::MergeSort::InputStatus::assert_consistent(), ASSERT_ND, foedus::memory::AlignedMemory::assure_capacity(), foedus::snapshot::MergeSort::InputStatus::chunk_relative_pos_, foedus::snapshot::MergeSort::InputStatus::cur_relative_pos_, foedus::snapshot::MergeSort::InputStatus::end_absolute_pos_, foedus::memory::AlignedMemory::get_block(), foedus::snapshot::SortedBuffer::get_buffer(), foedus::snapshot::SortedBuffer::get_buffer_size(), foedus::snapshot::SortedBuffer::get_cur_block_abosulte_begin(), foedus::snapshot::SortedBuffer::get_cur_block_abosulte_end(), foedus::snapshot::SortedBuffer::get_offset(), foedus::memory::AlignedMemory::get_size(), kLogChunk, foedus::storage::kPageSize, foedus::kRetOk, foedus::snapshot::MergeSort::InputStatus::previous_chunk_relative_pos_, foedus::fs::status(), foedus::snapshot::MergeSort::InputStatus::window_, foedus::snapshot::MergeSort::InputStatus::window_offset_, foedus::snapshot::MergeSort::InputStatus::window_size_, and WRAP_ERROR_CODE.

{
  // in each batch, we might include tuples from an input even if we didn't fully pick a chunk from
  // it (at most kLogChunk-1 such tuples). so, conservatively chunk_batch_size_ + inputs_count_.
  uint32_t buffer_capacity = kLogChunk * (chunk_batch_size_ + inputs_count_);
  buffer_capacity_ = assorted::align<uint32_t, 512U>(buffer_capacity);
  uint64_t byte_size = buffer_capacity_ * (sizeof(SortEntry) + sizeof(PositionEntry));
  ASSERT_ND(byte_size % 4096U == 0);
  byte_size += storage::kPageSize * (max_original_pages_ + 1U);
  byte_size += sizeof(InputStatus) * inputs_count_;
  WRAP_ERROR_CODE(work_memory_->assure_capacity(byte_size));

  // assign pointers
  char* block = reinterpret_cast<char*>(work_memory_->get_block());
#ifndef NDEBUG
  std::memset(block, 0xDA, work_memory_->get_size());
#endif  // NDEBUG
  uint64_t offset = 0;
  sort_entries_ = reinterpret_cast<SortEntry*>(block + offset);
  offset += sizeof(SortEntry) * buffer_capacity;
  position_entries_ = reinterpret_cast<PositionEntry*>(block + offset);
  offset += sizeof(PositionEntry) * buffer_capacity;
  original_pages_ = reinterpret_cast<storage::Page*>(block + offset);
  offset += sizeof(storage::Page) * (max_original_pages_ + 1U);
  inputs_status_ = reinterpret_cast<InputStatus*>(block + offset);
  offset += sizeof(InputStatus) * inputs_count_;
  ASSERT_ND(offset == byte_size);

  // initialize inputs_status_
  for (InputIndex i = 0; i < inputs_count_; ++i) {
    InputStatus* status = inputs_status_ + i;
    SortedBuffer* input = inputs_[i];
    input->assert_checks();
    status->window_ = input->get_buffer();
    status->window_offset_ = input->get_offset();
    status->window_size_ = input->get_buffer_size();
    uint64_t cur_abs = input->get_cur_block_abosulte_begin();
    // this is the initial read of this block, so we are sure cur_block_abosulte_begin is the window
    ASSERT_ND(cur_abs >= status->window_offset_);
    status->cur_relative_pos_ = cur_abs - status->window_offset_;
    status->chunk_relative_pos_ = status->cur_relative_pos_;  // hence the chunk has only 1 log
    status->previous_chunk_relative_pos_ = status->chunk_relative_pos_;

    uint64_t end_abs = input->get_cur_block_abosulte_end();
    status->end_absolute_pos_ = end_abs;

    status->assert_consistent();
  }
  return kRetOk;
}
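
The way initialize_once() carves one work-memory block into four consecutive regions can be reproduced with plain arithmetic. In the sketch below, SketchSizes, SketchLayout, align_up and sketch_layout are hypothetical names, and the element sizes are passed in as parameters because the real values (sizeof(SortEntry), sizeof(PositionEntry), storage::kPageSize, sizeof(InputStatus)) are not stated on this page.

```cpp
#include <cassert>
#include <cstdint>

constexpr uint64_t kLogChunk = 1ULL << 10;  // from the Constants enum

// Hypothetical container for the element sizes, supplied by the caller.
struct SketchSizes {
  uint64_t sort_entry_bytes;
  uint64_t position_entry_bytes;
  uint64_t page_bytes;
  uint64_t input_status_bytes;
};

// Byte offsets of each region inside the single work-memory block.
struct SketchLayout {
  uint64_t capacity;         // number of sort/position entries
  uint64_t position_offset;  // sort entries occupy [0, position_offset)
  uint64_t pages_offset;
  uint64_t status_offset;
  uint64_t total_bytes;
};

constexpr uint64_t align_up(uint64_t value, uint64_t alignment) {
  return (value + alignment - 1) / alignment * alignment;
}

SketchLayout sketch_layout(
    const SketchSizes& sizes, uint64_t chunk_batch_size,
    uint64_t inputs_count, uint64_t max_original_pages) {
  SketchLayout layout{};
  // One extra chunk per input covers partially picked chunks.
  layout.capacity = align_up(kLogChunk * (chunk_batch_size + inputs_count), 512);
  uint64_t offset = 0;  // sort entries start at the beginning of the block
  offset += sizes.sort_entry_bytes * layout.capacity;
  layout.position_offset = offset;
  offset += sizes.position_entry_bytes * layout.capacity;
  layout.pages_offset = offset;
  offset += sizes.page_bytes * (max_original_pages + 1);
  layout.status_offset = offset;
  offset += sizes.input_status_bytes * inputs_count;
  layout.total_bytes = offset;
  return layout;
}
```

For example, with placeholder sizes of 16, 8, 4096 and 64 bytes, 256 chunks per batch, 3 inputs and 2 original pages, the capacity is 1024 * 259 = 265216 entries, and the regions follow one another without padding.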

bool foedus::snapshot::MergeSort::is_ended_all ( ) const
inline

Definition at line 358 of file merge_sort.hpp.

Referenced by foedus::storage::array::ArrayComposeContext::execute(), foedus::storage::hash::HashComposeContext::execute(), foedus::storage::masstree::MasstreeComposeContext::execute(), and next_batch().

{
  for (InputIndex i = 0; i < inputs_count_; ++i) {
    if (!inputs_status_[i].is_ended()) {
      return false;
    }
  }
  return true;
}

bool foedus::snapshot::MergeSort::is_no_merging ( ) const
inline

Definition at line 356 of file merge_sort.hpp.

Referenced by fetch_logs(), and next_batch().

{ return inputs_count_ == 1U; }

ErrorStack foedus::snapshot::MergeSort::next_batch ( )

Executes merge-sort on several thousands of logs and provides the result as a batch.

Precondition
is_initialized() (call initialize() first!)

The composer invokes this method repeatedly until all inputs are exhausted. The method itself returns an ErrorStack, so completion is observed through is_ended_all() rather than through a boolean return value. For each invocation (batch), the composer consumes the entries in sorted order, in other words sort_entries_[0] to sort_entries_[current_count_ - 1], each of which points to an entry in position_entries_. This level of indirection might cause more L1 cache misses (if only we could fit everything into 16 bytes!), so we prefetch to ameliorate it.

See also
fetch_logs(), which does the prefetching
get_current_count(), which returns the number of logs in the generated batch.

Definition at line 143 of file merge_sort.cpp.

References ASSERT_ND, CHECK_ERROR, is_ended_all(), foedus::DefaultInitializable::is_initialized(), is_no_merging(), and foedus::kRetOk.

Referenced by foedus::storage::array::ArrayComposeContext::execute(), foedus::storage::hash::HashComposeContext::execute(), and foedus::storage::masstree::MasstreeComposeContext::execute().

{
  current_count_ = 0;
  CHECK_ERROR(advance_window());

  if (is_ended_all()) {
    return kRetOk;
  }

  if (is_no_merging()) {
    next_batch_one_input();
  } else {
    InputIndex min_input = pick_chunks();
    batch_sort(min_input);
  }
  return kRetOk;
}
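
A consumer loop around next_batch() might look like the following. This is a sketch under assumptions, not the composers' actual code: SketchMergeSort is a hypothetical stand-in that mimics only next_batch(), is_ended_all() and get_current_count() over an already sorted vector, log_at() is an invented accessor, and the termination test (an empty batch with all inputs ended) is one plausible reading of the code above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in that only mimics the calls used by the loop below.
class SketchMergeSort {
 public:
  SketchMergeSort(std::vector<uint64_t> sorted_logs, std::size_t batch_size)
      : logs_(std::move(sorted_logs)), batch_size_(batch_size) {
    assert(batch_size_ > 0);
  }
  // Discards the previous batch and exposes the next one (possibly empty).
  void next_batch() {
    begin_ += count_;
    count_ = std::min(batch_size_, logs_.size() - begin_);
  }
  // True once every log has been handed out in some batch.
  bool is_ended_all() const { return begin_ + count_ == logs_.size(); }
  std::size_t get_current_count() const { return count_; }
  // Invented accessor for this sketch: the log at a position in the current batch.
  uint64_t log_at(std::size_t sort_pos) const {
    assert(sort_pos < count_);
    return logs_[begin_ + sort_pos];
  }

 private:
  std::vector<uint64_t> logs_;
  std::size_t batch_size_;
  std::size_t begin_ = 0;
  std::size_t count_ = 0;
};

// Consumes every batch in order and returns all logs seen.
std::vector<uint64_t> sketch_compose(SketchMergeSort& sorter) {
  std::vector<uint64_t> consumed;
  while (true) {
    sorter.next_batch();
    const std::size_t count = sorter.get_current_count();
    if (count == 0 && sorter.is_ended_all()) {
      break;  // nothing left in any input
    }
    for (std::size_t i = 0; i < count; ++i) {
      consumed.push_back(sorter.log_at(i));
    }
  }
  return consumed;
}
```

Checking the batch size before is_ended_all() matters in this sketch: the final non-empty batch is produced while the inputs are already exhausted, and it still has to be consumed.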

const log::RecordLogType* foedus::snapshot::MergeSort::resolve_merged_position ( MergedPosition  pos) const
inline

Definition at line 379 of file merge_sort.hpp.

References ASSERT_ND, foedus::snapshot::MergeSort::InputStatus::from_compact_pos(), foedus::snapshot::MergeSort::PositionEntry::input_index_, and foedus::snapshot::MergeSort::PositionEntry::input_position_.

Referenced by change_log_type_at(), and resolve_sort_position().

{
  ASSERT_ND(pos < current_count_);
  const PositionEntry& entry = position_entries_[pos];
  return inputs_status_[entry.input_index_].from_compact_pos(entry.input_position_);
}

const log::RecordLogType* foedus::snapshot::MergeSort::resolve_sort_position ( uint32_t  sort_pos) const
inline

Definition at line 384 of file merge_sort.hpp.

References ASSERT_ND, foedus::snapshot::MergeSort::SortEntry::get_position(), and resolve_merged_position().

Referenced by foedus::storage::array::ArrayComposeContext::execute(), and foedus::storage::masstree::MasstreeComposeContext::execute().

{
  ASSERT_ND(sort_pos < current_count_);
  MergedPosition pos = sort_entries_[sort_pos].get_position();
  return resolve_merged_position(pos);
}

ErrorStack foedus::snapshot::MergeSort::uninitialize_once ( )
inline override virtual

Implements foedus::DefaultInitializable.

Definition at line 354 of file merge_sort.hpp.

References foedus::kRetOk.

{ return kRetOk; }

The documentation for this class was generated from the following files: