18 #ifndef FOEDUS_SNAPSHOT_MERGE_SORT_HPP_
19 #define FOEDUS_SNAPSHOT_MERGE_SORT_HPP_
119 uint16_t compressed_epoch,
120 uint32_t in_epoch_ordinal,
123 ASSERT_ND(in_epoch_ordinal < (1U << 24));
126 =
static_cast<__uint128_t
>(key) << 64
127 | static_cast<__uint128_t>(compressed_epoch) << 48
128 |
static_cast<__uint128_t
>(in_epoch_ordinal) << 24
130 |
static_cast<__uint128_t
>(position);
133 return static_cast<uint64_t
>(
data_ >> 64);
136 return data_ & (1U << 23);
139 return static_cast<MergedPosition
>(
data_) & 0x7FFFFFU;
185 ASSERT_ND(cur_relative_pos_ <= window_size_);
186 ASSERT_ND(chunk_relative_pos_ <= window_size_);
187 ASSERT_ND(previous_chunk_relative_pos_ <= window_size_);
188 ASSERT_ND(window_offset_ <= end_absolute_pos_);
189 ASSERT_ND(cur_relative_pos_ + window_offset_ <= end_absolute_pos_);
190 ASSERT_ND(chunk_relative_pos_ + window_offset_ <= end_absolute_pos_);
191 ASSERT_ND(previous_chunk_relative_pos_ + window_offset_ <= end_absolute_pos_);
192 ASSERT_ND(cur_relative_pos_ <= chunk_relative_pos_);
193 ASSERT_ND(previous_chunk_relative_pos_ <= chunk_relative_pos_);
194 if (window_offset_ + cur_relative_pos_ != end_absolute_pos_) {
221 return window_offset_ + relative_pos;
235 ASSERT_ND(chunk_abs_pos <= end_absolute_pos_);
236 if (chunk_abs_pos >= end_absolute_pos_) {
241 ASSERT_ND(chunk_abs_pos + length <= end_absolute_pos_);
262 ASSERT_ND(left.get_key() <= right.get_key());
263 if (left.get_key() < right.get_key()) {
266 MergedPosition left_index = left.get_position();
267 MergedPosition right_index = right.get_position();
277 return cmp < 0 || (cmp == 0 && left_index < right_index);
302 uint16_t inputs_count,
303 uint16_t max_original_pages,
359 for (InputIndex i = 0; i < inputs_count_; ++i) {
360 if (!inputs_status_[i].is_ended()) {
372 return position_entries_;
375 MergedPosition pos = sort_entries_[sort_pos].
get_position();
386 MergedPosition pos = sort_entries_[sort_pos].
get_position();
397 MergedPosition pos = sort_entries_[sort_pos].
get_position();
417 const Epoch base_epoch_;
418 const uint16_t shortest_key_length_;
419 const uint16_t longest_key_length_;
423 const InputIndex inputs_count_;
425 const uint16_t max_original_pages_;
427 const uint16_t chunk_batch_size_;
435 MergedPosition current_count_;
437 uint32_t buffer_capacity_;
441 SortEntry* sort_entries_;
443 PositionEntry* position_entries_;
447 InputStatus* inputs_status_;
450 void next_batch_one_input();
464 void next_chunk(InputIndex input_index);
472 InputIndex determine_min_input()
const;
479 InputIndex pick_chunks();
484 void batch_sort(InputIndex min_input);
488 void batch_sort_prepare(InputIndex min_input);
493 void batch_sort_adjust_sort();
502 void append_logs(InputIndex input_index, uint64_t upto_relative_pos);
504 uint16_t populate_entry_array(InputIndex input_index, uint64_t relative_pos)
ALWAYS_INLINE;
505 uint16_t populate_entry_hash(InputIndex input_index, uint64_t relative_pos)
ALWAYS_INLINE;
506 uint16_t populate_entry_masstree(InputIndex input_index, uint64_t relative_pos)
ALWAYS_INLINE;
510 uint32_t groupify_find_common_keys_8b(uint32_t begin, uint32_t limit)
const ALWAYS_INLINE;
511 uint32_t groupify_find_common_keys_general(uint32_t begin, uint32_t limit)
const ALWAYS_INLINE;
512 uint32_t groupify_find_common_log_type(uint32_t begin, uint32_t limit)
const ALWAYS_INLINE;
541 if (
UNLIKELY(begin + 1U == current_count_)) {
547 if (shortest_key_length_ == 8U && longest_key_length_ == 8U) {
548 cur = groupify_find_common_keys_8b(begin, limit);
550 cur = groupify_find_common_keys_general(begin, limit);
557 result.
count_ = cur - begin + 1U;
562 cur = groupify_find_common_log_type(begin, limit);
568 result.
count_ = cur - begin + 1U;
575 inline uint32_t MergeSort::groupify_find_common_keys_8b(uint32_t begin, uint32_t limit)
const {
576 ASSERT_ND(shortest_key_length_ == 8U && longest_key_length_ == 8U);
578 const uint32_t end = limit ? std::min<uint32_t>(current_count_, begin + limit) : current_count_;
579 uint32_t cur = begin;
581 while (sort_entries_[cur].get_key() == sort_entries_[cur + 1U].
get_key()
582 &&
LIKELY(cur + 1U < end)) {
588 inline uint32_t MergeSort::groupify_find_common_keys_general(uint32_t begin, uint32_t limit)
const {
591 const uint32_t end = limit ? std::min<uint32_t>(current_count_, begin + limit) : current_count_;
592 uint32_t cur = begin;
594 while (sort_entries_[cur].get_key() == sort_entries_[cur + 1U].
get_key()
595 &&
LIKELY(cur + 1U < end)) {
596 if (sort_entries_[cur].needs_additional_check()
602 const storage::masstree::MasstreeCommonLogType* cur_log
603 =
reinterpret_cast<const storage::masstree::MasstreeCommonLogType*
>(
605 const storage::masstree::MasstreeCommonLogType* next_log
606 =
reinterpret_cast<const storage::masstree::MasstreeCommonLogType*
>(
608 uint16_t key_len = cur_log->key_length_;
609 ASSERT_ND(key_len != 8U || next_log->key_length_ != 8U);
610 if (key_len != next_log->key_length_) {
612 }
else if (key_len > 8U) {
613 if (std::memcmp(cur_log->get_key() + 8, next_log->get_key() + 8, key_len - 8U) != 0) {
623 inline uint32_t MergeSort::groupify_find_common_log_type(uint32_t begin, uint32_t limit)
const {
628 uint32_t cur = begin;
629 const uint32_t end = limit ? std::min<uint32_t>(current_count_, begin + limit) : current_count_;
630 PositionEntry cur_pos = position_entries_[sort_entries_[cur].
get_position()];
631 PositionEntry next_pos = position_entries_[sort_entries_[cur + 1U].get_position()];
632 if (
LIKELY(cur_pos.log_type_ != next_pos.log_type_)) {
642 const uint16_t kFetchSize = 8;
644 while (
LIKELY(cur + kFetchSize < end)) {
645 for (uint16_t i = 0; i < kFetchSize; ++i) {
648 for (uint16_t i = 0; i < kFetchSize; ++i) {
649 PositionEntry cur_pos = position_entries_[sort_entries_[cur + i].get_position()];
650 PositionEntry next_pos = position_entries_[sort_entries_[cur + i + 1U].get_position()];
652 if (
UNLIKELY(cur_pos.log_type_ != next_pos.log_type_)) {
660 while (cur + 1U < end) {
661 PositionEntry cur_pos = position_entries_[sort_entries_[cur].get_position()];
662 PositionEntry next_pos = position_entries_[sort_entries_[cur + 1U].get_position()];
663 if (cur_pos.log_type_ != next_pos.log_type_) {
673 #endif // FOEDUS_SNAPSHOT_MERGE_SORT_HPP_
storage::Page * get_original_pages() const __attribute__((always_inline))
LogCode
A unique identifier of all log types.
bool is_masstree_log_type(uint16_t log_type)
0x0033 : foedus::storage::masstree::MasstreeInsertLogType .
bool is_ended_all() const
const SortEntry * get_sort_entries() const __attribute__((always_inline))
InputStatus * inputs_status_
Definitions of IDs in this package and a few related constant values.
ErrorStack uninitialize_once() override
Typedefs of ID types used in snapshot package.
Declares all log types used in this storage type.
void change_log_type_at(uint32_t sort_pos, log::LogCode before, log::LogCode after)
used to switch between two compatible log types.
uint32_t fetch_logs(uint32_t sort_pos, uint32_t count, log::RecordLogType const **out) const
To reduce L1 cache miss stall, we prefetch some number of position entries and the pointed log entrie...
uint32_t StorageId
Unique ID for storage.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
uint32_t BufferPosition
Represents a position in some buffer.
Declares all log types used in this storage type.
void prefetch_cacheline(const void *address)
Prefetch one cacheline to L1 cache.
void set(uint64_t key, uint16_t compressed_epoch, uint32_t in_epoch_ordinal, bool needs_additional_check, MergedPosition position) __attribute__((always_inline))
Suppose each log is 50 bytes: 1k*256*50b=12.5 MB worth logs to sort per batch.
Brings error stacktrace information as return value of functions.
log::LogCode get_log_type_from_sort_position(uint32_t sort_pos) const __attribute__((always_inline))
Forward declarations of classes in snapshot manager package.
0x002A : foedus::storage::hash::HashDeleteLogType .
Represents one input stream of sorted log entries.
Typical implementation of Initializable as a skeleton base class.
Provides additional information for each entry we are sorting.
bool are_all_single_layer_logs() const __attribute__((always_inline))
const log::RecordLogType * resolve_sort_position(uint32_t sort_pos) const __attribute__((always_inline))
Declares common log types used in all packages.
A base class for MasstreeInsertLogType/MasstreeDeleteLogType/MasstreeOverwriteLogType.
ErrorStack next_batch()
Executes merge-sort on several thousands of logs and provides the result as a batch.
0x0023 : foedus::storage::array::ArrayIncrementLogType .
0x0022 : foedus::storage::array::ArrayOverwriteLogType .
#define LIKELY(x)
Hints that x is highly likely true.
log::LogCode get_log_type() const __attribute__((always_inline))
This is a theoretical max.
bool operator()(const SortEntry &left, const SortEntry &right) const __attribute__((always_inline))
AdjustComparatorMasstree(PositionEntry *position_entries, InputStatus *inputs_status)
Used in batch_sort_adjust_sort if the storage is a masstree storage.
0x0032 : foedus::storage::masstree::MasstreeOverwriteLogType .
void assert_sorted()
For debug/test only.
MergeSort(storage::StorageId id, storage::StorageType type, Epoch base_epoch, SortedBuffer *const *inputs, uint16_t inputs_count, uint16_t max_original_pages, memory::AlignedMemory *const work_memory, uint16_t chunk_batch_size=kDefaultChunkBatch)
const log::RecordLogType * resolve_merged_position(MergedPosition pos) const __attribute__((always_inline))
uint16_t InputIndex
Index in input streams.
static int compare_logs(const MasstreeCommonLogType *left, const MasstreeCommonLogType *right) __attribute__((always_inline))
Returns -1, 0, 1 when left is less than, same, larger than right in terms of key and xct_id...
Forward declarations of classes in storage package.
InputIndex get_inputs_count() const __attribute__((always_inline))
Receives an arbitrary number of sorted buffers and emits one fully sorted stream of logs...
We assume the path wouldn't be this deep.
#define CXX11_FINAL
Used in public headers in place of "final" of C++11.
Constants and methods related to CPU cacheline and its prefetching.
MergedPosition get_current_count() const __attribute__((always_inline))
bool is_hash_log_type(uint16_t log_type)
Epoch get_base_epoch() const __attribute__((always_inline))
ErrorStack initialize_once() override
Just a marker to denote that the memory region represents a data page.
0x0029 : foedus::storage::hash::HashInsertLogType .
To avoid handling the case where a log spans an end of window, chunks leave at least this many bytes ...
const PositionEntry * get_position_entries() const __attribute__((always_inline))
uint16_t get_longest_key_length() const __attribute__((always_inline))
uint16_t log_type_
not the enum itself for explicit size.
Forward declarations of classes in memory package.
StorageType
Type of the storage, such as hash.
uint32_t MergedPosition
Position in MergeSort's buffer.
#define CXX11_OVERRIDE
Used in public headers in place of "override" of C++11.
bool has_common_log_code_
0x0034 : foedus::storage::masstree::MasstreeDeleteLogType .
bool needs_additional_check() const __attribute__((always_inline))
bool is_array_log_type(uint16_t log_type)
0x0028 : foedus::storage::hash::HashOverwriteLogType .
Represents one memory block aligned to actual OS/hardware pages.
const ErrorStack kRetOk
Normal return value for no-error case.
0x0035 : foedus::storage::masstree::MasstreeUpdateLogType .
0 is reserved as a non-existing log type.
PositionEntry * position_entries_
BufferPosition input_position_
Represents a group of consecutive logs in the current batch.
uint64_t get_key() const __attribute__((always_inline))
Base class for log type of record-wise operation.
#define UNLIKELY(x)
Hints that x is highly likely false.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
#define ALWAYS_INLINE
A function suffix to hint that the function should always be inlined.
storage::StorageId get_storage_id() const __attribute__((always_inline))
0x002B : foedus::storage::hash::HashUpdateLogType .
Entries we actually sort.
GroupifyResult groupify(uint32_t begin, uint32_t limit=0) const __attribute__((always_inline))
Find a group of consecutive logs from the given position that have either a common log type or a comm...
log::LogCode get_log_type() const __attribute__((always_inline))
bool is_no_merging() const __attribute__((always_inline))
MergedPosition get_position() const __attribute__((always_inline))