libfoedus-core
FOEDUS Core Library
merge_sort.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #ifndef FOEDUS_SNAPSHOT_MERGE_SORT_HPP_
19 #define FOEDUS_SNAPSHOT_MERGE_SORT_HPP_
20 
21 #include <algorithm>
22 
23 #include "foedus/assert_nd.hpp"
24 #include "foedus/compiler.hpp"
25 #include "foedus/epoch.hpp"
26 #include "foedus/error_stack.hpp"
27 #include "foedus/initializable.hpp"
30 #include "foedus/memory/fwd.hpp"
31 #include "foedus/snapshot/fwd.hpp"
33 #include "foedus/storage/fwd.hpp"
37 
38 namespace foedus {
39 namespace snapshot {
 public:
  /** Position in MergeSort's buffer. Must fit in 23 bits (see kMaxMergedPosition). */
  typedef uint32_t MergedPosition;
  /** Index in input streams. */
  typedef uint16_t InputIndex;

  enum Constants {
    // NOTE(review): this extracted listing lost several enumerator lines here
    // (e.g. kDefaultChunkBatch and kWindowChunkReserveBytes are referenced elsewhere
    // in this file but do not appear in this view) -- do not treat this enum as complete.
    /** Max number of logs merged per batch; SortEntry stores positions in 23 bits. */
    kMaxMergedPosition = 1 << 23,
    /** Number of logs per chunk of an input stream. */
    kLogChunk = 1 << 10,
    /** We assume the path wouldn't be this deep. */
    kMaxLevels = 32,
  };
103 
  /**
   * Entries we actually sort: 16 bytes packed into one __uint128_t.
   *   bits 127-64 : 8-byte key
   *   bits  63-48 : compressed epoch
   *   bits  47-24 : in-epoch ordinal (must fit in 24 bits, asserted below)
   *   bit      23 : needs_additional_check flag
   *   bits  22-0  : MergedPosition (must fit in 23 bits, see kMaxMergedPosition)
   */
  struct SortEntry {
    inline void set(
      uint64_t key,
      uint16_t compressed_epoch,
      uint32_t in_epoch_ordinal,
      // NOTE(review): a 'bool needs_additional_check' parameter is referenced in the
      // body below but its declaration line was lost from this extracted listing.
      MergedPosition position) ALWAYS_INLINE {
      ASSERT_ND(in_epoch_ordinal < (1U << 24));
      ASSERT_ND(position < kMaxMergedPosition);
      data_
        = static_cast<__uint128_t>(key) << 64
        | static_cast<__uint128_t>(compressed_epoch) << 48
        | static_cast<__uint128_t>(in_epoch_ordinal) << 24
        | static_cast<__uint128_t>(needs_additional_check ? 1U << 23 : 0)
        | static_cast<__uint128_t>(position);
    }
    /** Returns the 8-byte key (highest 64 bits). */
    inline uint64_t get_key() const ALWAYS_INLINE {
      return static_cast<uint64_t>(data_ >> 64);
    }
    /**
     * Whether the 8-byte key alone is not conclusive for equality; when set,
     * groupify_find_common_keys_general() compares the full keys.
     */
    inline bool needs_additional_check() const ALWAYS_INLINE {
      return data_ & (1U << 23);
    }
    /** Returns the position in the merged buffer (lowest 23 bits). */
    inline MergedPosition get_position() const ALWAYS_INLINE {
      return static_cast<MergedPosition>(data_) & 0x7FFFFFU;
    }

    __uint128_t data_;
  };
144 
  /**
   * Provides additional information for each entry we are sorting:
   * which input stream the log came from and its log type.
   */
  struct PositionEntry {
    uint16_t input_index_;  // +2 -> 2
    /** not the enum itself for explicit size. */
    uint16_t log_type_;  // +2 -> 4
    // NOTE(review): this extracted listing lost lines here; the full struct also
    // declares an input_position_ member (used by resolve_merged_position below)
    // and the signature of get_log_type(), whose orphaned body follows.

    return static_cast<log::LogCode>(log_type_);
    }
  };
158 
  // Current status of each input stream.
  // NOTE(review): this extracted listing lost several lines of the struct, including
  // the declarations of cur_relative_pos_, chunk_relative_pos_,
  // previous_chunk_relative_pos_ and end_absolute_pos_ (all used below), plus the
  // signatures of get_cur_log(), get_chunk_log(), from_compact_pos(),
  // is_last_chunk_in_window() and the body of is_last_chunk_overall().
  const char* window_;
  char padding_[6];
  /** Relative positions below count from this absolute offset of the window. */
  uint64_t window_offset_;
  uint64_t window_size_;

  /** Debug-only invariant checks on the window positions; no-op in release builds. */
  inline void assert_consistent() const ALWAYS_INLINE {
#ifndef NDEBUG
    // be careful on infinite loop! don't call any function here
    ASSERT_ND(window_);
    ASSERT_ND(cur_relative_pos_ <= window_size_);
    ASSERT_ND(chunk_relative_pos_ <= window_size_);
    ASSERT_ND(previous_chunk_relative_pos_ <= window_size_);
    ASSERT_ND(window_offset_ <= end_absolute_pos_);
    ASSERT_ND(cur_relative_pos_ + window_offset_ <= end_absolute_pos_);
    ASSERT_ND(chunk_relative_pos_ + window_offset_ <= end_absolute_pos_);
    ASSERT_ND(previous_chunk_relative_pos_ + window_offset_ <= end_absolute_pos_);
    ASSERT_ND(cur_relative_pos_ <= chunk_relative_pos_);
    ASSERT_ND(previous_chunk_relative_pos_ <= chunk_relative_pos_);
    if (window_offset_ + cur_relative_pos_ != end_absolute_pos_) {
      // not fully consumed yet: the chunk-log must fit entirely within the window.
      const log::RecordLogType* l
        = reinterpret_cast<const log::RecordLogType*>(window_ + chunk_relative_pos_);
      ASSERT_ND(chunk_relative_pos_ + l->header_.log_length_ <= window_size_);
    }
#endif  // NDEBUG
  }
  // NOTE(review): signature of get_cur_log() lost in extraction; orphaned body below.
  return reinterpret_cast<const log::RecordLogType*>(window_ + cur_relative_pos_);
  }
  // NOTE(review): signature of get_chunk_log() lost in extraction; orphaned body below.
  return reinterpret_cast<const log::RecordLogType*>(window_ + chunk_relative_pos_);
  }
  /** Returns the log record at the given byte position relative to the window. */
  inline const log::RecordLogType* from_byte_pos(uint64_t pos) const ALWAYS_INLINE {
    ASSERT_ND(pos < window_size_);
    return reinterpret_cast<const log::RecordLogType*>(window_ + pos);
  }
  // NOTE(review): signature of from_compact_pos() lost in extraction; the compact
  // position is evidently in 8-byte units (hence pos * 8ULL below).
  ASSERT_ND(pos * 8ULL < window_size_);
  return reinterpret_cast<const log::RecordLogType*>(window_ + pos * 8ULL);
  }
  /** Converts a window-relative position to an absolute position in the input. */
  inline uint64_t to_absolute_pos(uint64_t relative_pos) const ALWAYS_INLINE {
    return window_offset_ + relative_pos;
  }
  /** Whether this window covers the end of the input stream. */
  inline bool is_last_window() const ALWAYS_INLINE {
    return to_absolute_pos(window_size_) >= end_absolute_pos_;
  }
  /** Whether the current position has consumed the entire input. */
  inline bool is_ended() const ALWAYS_INLINE {
    return to_absolute_pos(cur_relative_pos_) == end_absolute_pos_;
  }
  // NOTE(review): signature of is_last_chunk_in_window() lost in extraction;
  // orphaned body below.
  if (is_last_window()) {
    // in last window, we accurately determine the last chunk
    uint64_t chunk_abs_pos = to_absolute_pos(chunk_relative_pos_);
    ASSERT_ND(chunk_abs_pos <= end_absolute_pos_);
    if (chunk_abs_pos >= end_absolute_pos_) {
      return true;
    }
    uint16_t length = get_chunk_log()->header_.log_length_;
    ASSERT_ND(length > 0);
    ASSERT_ND(chunk_abs_pos + length <= end_absolute_pos_);
    return chunk_abs_pos + length >= end_absolute_pos_;
  } else {
    // in this case, we conservatively determine the last chunk
    return chunk_relative_pos_ + kWindowChunkReserveBytes >= window_size_;
  }
  }
  // NOTE(review): body of is_last_chunk_overall() (one line) lost in extraction.
  inline bool is_last_chunk_overall() const ALWAYS_INLINE {
  }
  };
252 
  /** Used in batch_sort_adjust_sort if the storage is a masstree storage. */
  AdjustComparatorMasstree(PositionEntry* position_entries, InputStatus* inputs_status)
    : position_entries_(position_entries), inputs_status_(inputs_status) {}
  /**
   * Orders two entries whose 8-byte keys may tie: first by key, ties broken by
   * full masstree log comparison, then by merged position for a total order.
   * Precondition (asserted): left's key <= right's key, i.e. entries are already
   * sorted by the packed 8-byte key.
   */
  inline bool operator() (const SortEntry& left, const SortEntry& right) const ALWAYS_INLINE {
    ASSERT_ND(left.get_key() <= right.get_key());
    if (left.get_key() < right.get_key()) {
      return true;
    }
    MergedPosition left_index = left.get_position();
    MergedPosition right_index = right.get_position();
    const PositionEntry& left_pos = position_entries_[left_index];
    const PositionEntry& right_pos = position_entries_[right_index];
    // NOTE(review): the declaration of 'left_casted' and the expressions that
    // resolve the two log records were lost from this extracted listing; the
    // remaining fragments follow.
      = reinterpret_cast<const storage::masstree::MasstreeCommonLogType*>(
    const storage::masstree::MasstreeCommonLogType* right_casted
      = reinterpret_cast<const storage::masstree::MasstreeCommonLogType*>(
    int cmp = storage::masstree::MasstreeCommonLogType::compare_logs(left_casted, right_casted);
    return cmp < 0 || (cmp == 0 && left_index < right_index);
  }
  // NOTE(review): the member declarations (position_entries_, inputs_status_)
  // were lost from this extracted listing.
  };
282 
  /**
   * Represents a group of consecutive logs in the current batch that share
   * either a common key or a common log type. log_code_ is meaningful only
   * when has_common_log_code_ is true (see groupify()).
   */
  struct GroupifyResult {
    uint32_t count_;  // +4 -> 4
    bool has_common_key_;  // +1 -> 5
    bool has_common_log_code_;  // +1 -> 6
    uint16_t log_code_;  // +2 -> 8
    // NOTE(review): the signature of get_log_type() was lost from this extracted
    // listing; its orphaned body follows.
    return static_cast<log::LogCode>(log_code_);
    }
  };
296 
  MergeSort(
    // NOTE(review): the leading parameters (storage id and type; see the out-of-line
    // definition in merge_sort.cpp) were lost from this extracted listing.
    Epoch base_epoch,
    SortedBuffer* const* inputs,
    uint16_t inputs_count,
    uint16_t max_original_pages,
    memory::AlignedMemory* const work_memory,
    uint16_t chunk_batch_size = kDefaultChunkBatch);

  /**
   * Fetches up to 'count' log records in sort order starting from sort_pos into
   * 'out'; returns the number fetched. Prefetches position entries and the
   * pointed log entries to reduce L1 cache-miss stalls (defined in merge_sort.cpp).
   */
  uint32_t fetch_logs(uint32_t sort_pos, uint32_t count, log::RecordLogType const** out) const;

  /**
   * Finds a group of consecutive logs from 'begin' sharing a common key or a
   * common log type. limit == 0 means no upper bound on group size.
   */
  GroupifyResult groupify(uint32_t begin, uint32_t limit = 0) const ALWAYS_INLINE;

  /** For debug/test only: verifies the sort result (defined in merge_sort.cpp). */
  void assert_sorted();
352 
  /** No-op teardown: this object borrows work_memory_ from the caller and owns nothing to release. */
  ErrorStack uninitialize_once() CXX11_OVERRIDE { return kRetOk; }

  /** With a single input stream there is nothing to merge. */
  inline bool is_no_merging() const ALWAYS_INLINE { return inputs_count_ == 1U; }
357 
358  inline bool is_ended_all() const {
359  for (InputIndex i = 0; i < inputs_count_; ++i) {
360  if (!inputs_status_[i].is_ended()) {
361  return false;
362  }
363  }
364  return true;
365  }
366 
  inline storage::StorageId get_storage_id() const ALWAYS_INLINE { return id_; }
  inline Epoch get_base_epoch() const ALWAYS_INLINE { return base_epoch_; }
  /** Number of log entries in the current batch. */
  inline MergedPosition get_current_count() const ALWAYS_INLINE { return current_count_; }
  inline InputIndex get_inputs_count() const ALWAYS_INLINE { return inputs_count_; }
  // NOTE(review): the signatures of get_position_entries() and
  // get_log_type_from_sort_position() were lost from this extracted listing;
  // their orphaned bodies follow.
  return position_entries_;
  }
  MergedPosition pos = sort_entries_[sort_pos].get_position();
  return position_entries_[pos].get_log_type();
  }
  inline const SortEntry* get_sort_entries() const ALWAYS_INLINE { return sort_entries_; }
  /**
   * Returns the log record at the given merged position by looking up which
   * input stream it came from and where within that stream's window.
   */
  inline const log::RecordLogType* resolve_merged_position(MergedPosition pos) const ALWAYS_INLINE {
    ASSERT_ND(pos < current_count_);
    const PositionEntry& entry = position_entries_[pos];
    return inputs_status_[entry.input_index_].from_compact_pos(entry.input_position_);
  }
384  inline const log::RecordLogType* resolve_sort_position(uint32_t sort_pos) const ALWAYS_INLINE {
385  ASSERT_ND(sort_pos < current_count_);
386  MergedPosition pos = sort_entries_[sort_pos].get_position();
387  return resolve_merged_position(pos);
388  }
  /**
   * Used to switch between two compatible log types: updates both the
   * PositionEntry and the header of the log record itself.
   */
  void change_log_type_at(uint32_t sort_pos, log::LogCode before, log::LogCode after) {
    // we have to change two places; PositionEntry and header of the log itself.
    ASSERT_ND(sort_pos < current_count_);
    ASSERT_ND(get_log_type_from_sort_position(sort_pos) == before);
    MergedPosition pos = sort_entries_[sort_pos].get_position();
    PositionEntry& entry = position_entries_[pos];
    ASSERT_ND(entry.get_log_type() == before);
    entry.log_type_ = after;
    ASSERT_ND(entry.get_log_type() == after);
    ASSERT_ND(get_log_type_from_sort_position(sort_pos) == after);

    // NOTE(review): the declaration of 'record' (presumably the resolved, non-const
    // log record for this position) was lost from this extracted listing.
    ASSERT_ND(record->header_.get_type() == before);
    record->header_.log_type_code_ = after;
    ASSERT_ND(record->header_.get_type() == after);
  }
409 
  inline storage::Page* get_original_pages() const ALWAYS_INLINE { return original_pages_; }
  /** True when every key fits in 8 bytes (presumably a single masstree layer -- hence the name). */
  inline bool are_all_single_layer_logs() const ALWAYS_INLINE { return longest_key_length_ <= 8U; }
  inline uint16_t get_longest_key_length() const ALWAYS_INLINE { return longest_key_length_; }
413 
 private:
  const storage::StorageId id_;
  const storage::StorageType type_;
  const Epoch base_epoch_;
  const uint16_t shortest_key_length_;
  const uint16_t longest_key_length_;
  /** Input streams to merge; the pointer array is not owned by this object. */
  SortedBuffer* const* inputs_;
  /** Number of input streams. */
  const InputIndex inputs_count_;
  const uint16_t max_original_pages_;
  const uint16_t chunk_batch_size_;
  /** Backing memory for the arrays below; owned by the caller. */
  memory::AlignedMemory* const work_memory_;

  /** Number of log entries in the current batch. */
  MergedPosition current_count_;
  uint32_t buffer_capacity_;

  // followings are backed by work_memory_, and allocated at initialize_once.
  SortEntry* sort_entries_;
  PositionEntry* position_entries_;
  storage::Page* original_pages_;
  InputStatus* inputs_status_;

  /** Fast path when there is only one input: no actual merging needed. */
  void next_batch_one_input();

  /** Advances an input's window over its buffer (see merge_sort.cpp). */
  ErrorStack advance_window();

  /** Moves the chunk position of the given input forward (see merge_sort.cpp). */
  void next_chunk(InputIndex input_index);

  /** Selects the input to draw from next (selection rule in merge_sort.cpp). */
  InputIndex determine_min_input() const;

  /** Picks the chunks that form the next batch (see merge_sort.cpp). */
  InputIndex pick_chunks();

  /** Sorts the logs picked for the current batch. */
  void batch_sort(InputIndex min_input);
  /** Subroutine of batch_sort: populates sort_entries_ / position_entries_. */
  void batch_sort_prepare(InputIndex min_input);
  /** Subroutine of batch_sort: resolves ties the packed 8-byte key cannot. */
  void batch_sort_adjust_sort();

  /** Full comparison of two log records; negative/zero/positive like memcmp. */
  int compare_logs(const log::RecordLogType* lhs, const log::RecordLogType* rhs) const;

  void append_logs(InputIndex input_index, uint64_t upto_relative_pos);

  uint16_t populate_entry_array(InputIndex input_index, uint64_t relative_pos) ALWAYS_INLINE;
  uint16_t populate_entry_hash(InputIndex input_index, uint64_t relative_pos) ALWAYS_INLINE;
  uint16_t populate_entry_masstree(InputIndex input_index, uint64_t relative_pos) ALWAYS_INLINE;

  // these methods are called very frequently (potentially for each log). worth explicit inlining.
  // all of them return the position upto (inclusive) which the logs have the common feature.
  uint32_t groupify_find_common_keys_8b(uint32_t begin, uint32_t limit) const ALWAYS_INLINE;
  uint32_t groupify_find_common_keys_general(uint32_t begin, uint32_t limit) const ALWAYS_INLINE;
  uint32_t groupify_find_common_log_type(uint32_t begin, uint32_t limit) const ALWAYS_INLINE;
};
514 
515 
516 inline bool is_array_log_type(uint16_t log_type) {
517  return log_type == log::kLogCodeArrayOverwrite || log_type == log::kLogCodeArrayIncrement;
518 }
519 inline bool is_hash_log_type(uint16_t log_type) {
520  return
521  log_type == log::kLogCodeHashOverwrite
522  || log_type == log::kLogCodeHashInsert
523  || log_type == log::kLogCodeHashDelete
524  || log_type == log::kLogCodeHashUpdate;
525 }
526 inline bool is_masstree_log_type(uint16_t log_type) {
527  return
528  log_type == log::kLogCodeMasstreeInsert
529  || log_type == log::kLogCodeMasstreeDelete
530  || log_type == log::kLogCodeMasstreeUpdate
531  || log_type == log::kLogCodeMasstreeOverwrite;
532 }
533 
/**
 * Finds a group of consecutive logs from 'begin' that share a common key or,
 * failing that, a common log type. Always returns count_ >= 1 (the log at
 * 'begin' itself). Note: result.log_code_ is set only when
 * has_common_log_code_ is true.
 */
inline MergeSort::GroupifyResult MergeSort::groupify(uint32_t begin, uint32_t limit) const {
  ASSERT_ND(begin < current_count_);
  GroupifyResult result;
  result.count_ = 1;
  result.has_common_key_ = false;
  result.has_common_log_code_ = false;
  // 'begin' is the last entry of the batch: trivially a singleton group.
  if (UNLIKELY(begin + 1U == current_count_)) {
    return result;
  }

  // first, check common keys
  uint32_t cur;
  if (shortest_key_length_ == 8U && longest_key_length_ == 8U) {
    // all keys are exactly 8 bytes, so the packed sort key fully decides equality.
    cur = groupify_find_common_keys_8b(begin, limit);
  } else {
    cur = groupify_find_common_keys_general(begin, limit);
  }

  ASSERT_ND(cur >= begin);
  ASSERT_ND(cur < current_count_);
  if (UNLIKELY(cur != begin)) {
    result.has_common_key_ = true;
    result.count_ = cur - begin + 1U;  // 'cur' is the last matching position, inclusive.
    return result;
  }

  // if keys are different, check common log types.
  cur = groupify_find_common_log_type(begin, limit);
  ASSERT_ND(cur >= begin);
  ASSERT_ND(cur < current_count_);
  if (UNLIKELY(cur != begin)) {
    result.has_common_log_code_ = true;
    result.log_code_ = position_entries_[sort_entries_[cur].get_position()].log_type_;
    result.count_ = cur - begin + 1U;
    return result;
  } else {
    return result;
  }
}
574 
575 inline uint32_t MergeSort::groupify_find_common_keys_8b(uint32_t begin, uint32_t limit) const {
576  ASSERT_ND(shortest_key_length_ == 8U && longest_key_length_ == 8U);
577  ASSERT_ND(begin + 1U < current_count_);
578  const uint32_t end = limit ? std::min<uint32_t>(current_count_, begin + limit) : current_count_;
579  uint32_t cur = begin;
580  // 8 byte key is enough to determine equality.
581  while (sort_entries_[cur].get_key() == sort_entries_[cur + 1U].get_key()
582  && LIKELY(cur + 1U < end)) {
583  ++cur;
584  }
585  return cur;
586 }
587 
/**
 * Like groupify_find_common_keys_8b(), but for storages whose keys may be
 * shorter or longer than 8 bytes: when an entry's needs_additional_check bit is
 * set, the packed 8-byte sort key is only a prefix and the full keys are compared.
 * NOTE(review): one assertion line at the top of this function was lost from
 * this extracted listing.
 */
inline uint32_t MergeSort::groupify_find_common_keys_general(uint32_t begin, uint32_t limit) const {
  ASSERT_ND(begin + 1U < current_count_);
  const uint32_t end = limit ? std::min<uint32_t>(current_count_, begin + limit) : current_count_;
  uint32_t cur = begin;
  // we might need more. a bit slower.
  // NOTE(review): as in the 8b variant, this condition dereferences
  // sort_entries_[cur + 1U] before checking cur + 1U < end, so the entry at
  // 'end' can be read once -- confirm the buffer always has a readable entry there.
  while (sort_entries_[cur].get_key() == sort_entries_[cur + 1U].get_key()
    && LIKELY(cur + 1U < end)) {
    if (sort_entries_[cur].needs_additional_check()
      || sort_entries_[cur + 1U].needs_additional_check()) {
      // The 8-byte prefixes tie but at least one key is not fully decided by it:
      // resolve the actual masstree log records and compare the remainders.
      MergedPosition cur_pos = sort_entries_[cur].get_position();
      MergedPosition next_pos = sort_entries_[cur + 1U].get_position();
      ASSERT_ND(is_masstree_log_type(position_entries_[cur_pos].get_log_type()));
      ASSERT_ND(is_masstree_log_type(position_entries_[next_pos].get_log_type()));
      const storage::masstree::MasstreeCommonLogType* cur_log
        = reinterpret_cast<const storage::masstree::MasstreeCommonLogType*>(
          resolve_merged_position(cur_pos));
      const storage::masstree::MasstreeCommonLogType* next_log
        = reinterpret_cast<const storage::masstree::MasstreeCommonLogType*>(
          resolve_merged_position(next_pos));
      uint16_t key_len = cur_log->key_length_;
      ASSERT_ND(key_len != 8U || next_log->key_length_ != 8U);
      if (key_len != next_log->key_length_) {
        break;  // different lengths: the keys differ.
      } else if (key_len > 8U) {
        // same length; the first 8 bytes already tied, so compare the tails.
        if (std::memcmp(cur_log->get_key() + 8, next_log->get_key() + 8, key_len - 8U) != 0) {
          break;
        }
      }
    }
    ++cur;
  }
  return cur;
}
622 
/**
 * Returns the last position (inclusive) from 'begin' that has the same log type
 * as the entry at 'begin'; returns 'begin' when the immediate neighbor differs.
 */
inline uint32_t MergeSort::groupify_find_common_log_type(uint32_t begin, uint32_t limit) const {
  ASSERT_ND(begin + 1U < current_count_);

  // First, let's avoid (relatively) expensive checks if there is no common log type,
  // and assume the worst case; we are calling this method for each log.
  uint32_t cur = begin;
  const uint32_t end = limit ? std::min<uint32_t>(current_count_, begin + limit) : current_count_;
  PositionEntry cur_pos = position_entries_[sort_entries_[cur].get_position()];
  PositionEntry next_pos = position_entries_[sort_entries_[cur + 1U].get_position()];
  if (LIKELY(cur_pos.log_type_ != next_pos.log_type_)) {
    return cur;
  }

  // the LIKELY above will be false if there is a group, but then the cost of branch misdetection
  // is amortized by the (hopefully) large number of logs processed together below.

  // okay, from now on, there likely is a number of logs with same log type.
  // this method has to read potentially a large number of position entries, which might be
  // not contiguous. thus, let's do parallel prefetching.
  const uint16_t kFetchSize = 8;
  cur = begin + 1U;
  while (LIKELY(cur + kFetchSize < end)) {
    // issue all kFetchSize prefetches before touching any of the entries.
    for (uint16_t i = 0; i < kFetchSize; ++i) {
      assorted::prefetch_cacheline(position_entries_ + sort_entries_[cur + i].get_position());
    }
    for (uint16_t i = 0; i < kFetchSize; ++i) {
      // these locals intentionally shadow the pre-loop cur_pos/next_pos.
      PositionEntry cur_pos = position_entries_[sort_entries_[cur + i].get_position()];
      PositionEntry next_pos = position_entries_[sort_entries_[cur + i + 1U].get_position()];
      // now that we assume there is a large group, this is UNLIKELY.
      if (UNLIKELY(cur_pos.log_type_ != next_pos.log_type_)) {
        return cur + i;  // cur + i is the last entry sharing the common log type.
      }
    }
    cur += kFetchSize;
  }

  // last bits. no optimization needed.
  while (cur + 1U < end) {
    PositionEntry cur_pos = position_entries_[sort_entries_[cur].get_position()];
    PositionEntry next_pos = position_entries_[sort_entries_[cur + 1U].get_position()];
    if (cur_pos.log_type_ != next_pos.log_type_) {
      return cur;
    }
    ++cur;
  }
  return cur;
}
670 
671 } // namespace snapshot
672 } // namespace foedus
673 #endif // FOEDUS_SNAPSHOT_MERGE_SORT_HPP_
storage::Page * get_original_pages() const __attribute__((always_inline))
Definition: merge_sort.hpp:410
uint64_t window_offset_
relative pos counts from this offset
Definition: merge_sort.hpp:177
LogCode
A unique identifier of all log types.
Definition: log_type.hpp:87
bool is_masstree_log_type(uint16_t log_type)
Definition: merge_sort.hpp:526
0x0033 : foedus::storage::masstree::MasstreeInsertLogType .
Definition: log_type.hpp:127
const SortEntry * get_sort_entries() const __attribute__((always_inline))
Definition: merge_sort.hpp:378
Definitions of IDs in this package and a few related constant values.
ErrorStack uninitialize_once() override
Definition: merge_sort.hpp:354
Typedefs of ID types used in snapshot package.
Declares all log types used in this storage type.
void change_log_type_at(uint32_t sort_pos, log::LogCode before, log::LogCode after)
used to switch between two compatible log types.
Definition: merge_sort.hpp:393
uint32_t fetch_logs(uint32_t sort_pos, uint32_t count, log::RecordLogType const **out) const
To reduce L1 cache miss stall, we prefetch some number of position entries and the pointed log entries.
Definition: merge_sort.cpp:273
uint32_t StorageId
Unique ID for storage.
Definition: storage_id.hpp:55
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
uint32_t BufferPosition
Represents a position in some buffer.
Definition: snapshot_id.hpp:72
Declares all log types used in this storage type.
void prefetch_cacheline(const void *address)
Prefetch one cacheline to L1 cache.
Definition: cacheline.hpp:49
bool is_ended() const __attribute__((always_inline))
Definition: merge_sort.hpp:227
void set(uint64_t key, uint16_t compressed_epoch, uint32_t in_epoch_ordinal, bool needs_additional_check, MergedPosition position) __attribute__((always_inline))
Definition: merge_sort.hpp:117
bool is_last_chunk_in_window() const __attribute__((always_inline))
Definition: merge_sort.hpp:231
Suppose each log is 50 bytes: 1k*256*50b=12.5 MB worth logs to sort per batch.
Definition: merge_sort.hpp:93
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
void assert_consistent() const __attribute__((always_inline))
Definition: merge_sort.hpp:181
log::LogCode get_log_type_from_sort_position(uint32_t sort_pos) const __attribute__((always_inline))
Definition: merge_sort.hpp:374
Forward declarations of classes in snapshot manager package.
Represents a time epoch.
Definition: epoch.hpp:61
0x002A : foedus::storage::hash::HashDeleteLogType .
Definition: log_type.hpp:123
Represents one input stream of sorted log entries.
Definition: log_buffer.hpp:81
Typical implementation of Initializable as a skeleton base class.
Provides additional information for each entry we are sorting.
Definition: merge_sort.hpp:148
bool are_all_single_layer_logs() const __attribute__((always_inline))
Definition: merge_sort.hpp:411
const log::RecordLogType * resolve_sort_position(uint32_t sort_pos) const __attribute__((always_inline))
Definition: merge_sort.hpp:384
Declares common log types used in all packages.
bool is_last_chunk_overall() const __attribute__((always_inline))
Definition: merge_sort.hpp:248
A base class for MasstreeInsertLogType/MasstreeDeleteLogType/MasstreeOverwriteLogType.
ErrorStack next_batch()
Executes merge-sort on several thousands of logs and provides the result as a batch.
Definition: merge_sort.cpp:143
0x0023 : foedus::storage::array::ArrayIncrementLogType .
Definition: log_type.hpp:116
0x0022 : foedus::storage::array::ArrayOverwriteLogType .
Definition: log_type.hpp:115
#define LIKELY(x)
Hints that x is highly likely true.
Definition: compiler.hpp:103
log::LogCode get_log_type() const __attribute__((always_inline))
Definition: merge_sort.hpp:292
uint64_t to_absolute_pos(uint64_t relative_pos) const __attribute__((always_inline))
Definition: merge_sort.hpp:219
bool operator()(const SortEntry &left, const SortEntry &right) const __attribute__((always_inline))
Definition: merge_sort.hpp:261
AdjustComparatorMasstree(PositionEntry *position_entries, InputStatus *inputs_status)
Definition: merge_sort.hpp:259
Used in batch_sort_adjust_sort if the storage is a masstree storage.
Definition: merge_sort.hpp:258
0x0032 : foedus::storage::masstree::MasstreeOverwriteLogType .
Definition: log_type.hpp:126
void assert_sorted()
For debug/test only.
Definition: merge_sort.cpp:695
MergeSort(storage::StorageId id, storage::StorageType type, Epoch base_epoch, SortedBuffer *const *inputs, uint16_t inputs_count, uint16_t max_original_pages, memory::AlignedMemory *const work_memory, uint16_t chunk_batch_size=kDefaultChunkBatch)
Definition: merge_sort.cpp:63
const log::RecordLogType * resolve_merged_position(MergedPosition pos) const __attribute__((always_inline))
Definition: merge_sort.hpp:379
uint16_t InputIndex
Index in input streams.
Definition: merge_sort.hpp:85
static int compare_logs(const MasstreeCommonLogType *left, const MasstreeCommonLogType *right) __attribute__((always_inline))
Returns -1, 0, 1 when left is less than, same, larger than right in terms of key and xct_id...
Forward declarations of classes in storage package.
const log::RecordLogType * get_cur_log() const __attribute__((always_inline))
Definition: merge_sort.hpp:201
InputIndex get_inputs_count() const __attribute__((always_inline))
Definition: merge_sort.hpp:370
Receives an arbitrary number of sorted buffers and emits one fully sorted stream of logs...
Definition: merge_sort.hpp:77
We assume the path wouldn't be this deep.
Definition: merge_sort.hpp:95
#define CXX11_FINAL
Used in public headers in place of "final" of C++11.
Definition: cxx11.hpp:131
bool is_last_window() const __attribute__((always_inline))
Definition: merge_sort.hpp:223
Constants and methods related to CPU cacheline and its prefetching.
MergedPosition get_current_count() const __attribute__((always_inline))
Definition: merge_sort.hpp:369
const log::RecordLogType * from_compact_pos(BufferPosition pos) const __attribute__((always_inline))
Definition: merge_sort.hpp:214
bool is_hash_log_type(uint16_t log_type)
Definition: merge_sort.hpp:519
Epoch get_base_epoch() const __attribute__((always_inline))
Definition: merge_sort.hpp:368
ErrorStack initialize_once() override
Definition: merge_sort.cpp:93
Just a marker to denote that the memory region represents a data page.
Definition: page.hpp:334
0x0029 : foedus::storage::hash::HashInsertLogType .
Definition: log_type.hpp:122
To avoid handling the case where a log spans an end of window, chunks leave at least this many bytes ...
Definition: merge_sort.hpp:101
uint16_t log_type_code_
Actually of LogCode defined in the X-Macro, but we want to make sure the type size is 2 bytes...
uint16_t log_length_
Byte size of this log entry including this header itself and everything.
const log::RecordLogType * get_chunk_log() const __attribute__((always_inline))
Definition: merge_sort.hpp:205
const PositionEntry * get_position_entries() const __attribute__((always_inline))
Definition: merge_sort.hpp:371
uint16_t get_longest_key_length() const __attribute__((always_inline))
Definition: merge_sort.hpp:412
uint16_t log_type_
not the enum itself for explicit size.
Definition: merge_sort.hpp:151
Forward declarations of classes in memory package.
StorageType
Type of the storage, such as hash.
Definition: storage_id.hpp:122
uint32_t MergedPosition
Position in MergeSort's buffer.
Definition: merge_sort.hpp:83
#define CXX11_OVERRIDE
Used in public headers in place of "override" of C++11.
Definition: cxx11.hpp:134
0x0034 : foedus::storage::masstree::MasstreeDeleteLogType .
Definition: log_type.hpp:128
bool needs_additional_check() const __attribute__((always_inline))
Definition: merge_sort.hpp:135
bool is_array_log_type(uint16_t log_type)
Definition: merge_sort.hpp:516
0x0028 : foedus::storage::hash::HashOverwriteLogType .
Definition: log_type.hpp:121
Represents one memory block aligned to actual OS/hardware pages.
const ErrorStack kRetOk
Normal return value for no-error case.
0x0035 : foedus::storage::masstree::MasstreeUpdateLogType .
Definition: log_type.hpp:129
0 is reserved as a non-existing log type.
Definition: log_type.hpp:89
Current status of each input.
Definition: merge_sort.hpp:167
LogCode get_type() const __attribute__((always_inline))
Convenience method to cast into LogCode.
Represents a group of consecutive logs in the current batch.
Definition: merge_sort.hpp:287
uint64_t get_key() const __attribute__((always_inline))
Definition: merge_sort.hpp:132
Base class for log type of record-wise operation.
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
#define ALWAYS_INLINE
A function suffix to hint that the function should always be inlined.
Definition: compiler.hpp:106
storage::StorageId get_storage_id() const __attribute__((always_inline))
Definition: merge_sort.hpp:367
0x002B : foedus::storage::hash::HashUpdateLogType .
Definition: log_type.hpp:124
Entries we actually sort.
Definition: merge_sort.hpp:116
GroupifyResult groupify(uint32_t begin, uint32_t limit=0) const __attribute__((always_inline))
Find a group of consecutive logs from the given position that have either a common log type or a common key.
Definition: merge_sort.hpp:534
log::LogCode get_log_type() const __attribute__((always_inline))
Definition: merge_sort.hpp:154
bool is_no_merging() const __attribute__((always_inline))
Definition: merge_sort.hpp:356
MergedPosition get_position() const __attribute__((always_inline))
Definition: merge_sort.hpp:138
const log::RecordLogType * from_byte_pos(uint64_t pos) const __attribute__((always_inline))
Definition: merge_sort.hpp:209