#include "foedus/snapshot/merge_sort.hpp"

#include <glog/logging.h>

#include <algorithm>
#include <cstring>

namespace foedus {
namespace snapshot {
// In extract_shortest_key_length(): fold in each remaining input's shortest key length.
for (uint16_t i = 1; i < inputs_count; ++i) {
// In extract_longest_key_length(): fold in each remaining input's longest key length.
for (uint16_t i = 1; i < inputs_count; ++i) {
MergeSort::MergeSort(
  storage::StorageId id,
  storage::StorageType type,
  Epoch base_epoch,
  SortedBuffer* const* inputs,
  uint16_t inputs_count,
  uint16_t max_original_pages,
  memory::AlignedMemory* const work_memory,
  uint16_t chunk_batch_size)
  : id_(id),
    type_(type),
    base_epoch_(base_epoch),
    shortest_key_length_(extract_shortest_key_length(inputs, inputs_count)),
    longest_key_length_(extract_longest_key_length(inputs, inputs_count)),
    inputs_(inputs),
    inputs_count_(inputs_count),
    max_original_pages_(max_original_pages),
    chunk_batch_size_(chunk_batch_size),
    work_memory_(work_memory) {
  ASSERT_ND(shortest_key_length_ <= longest_key_length_);
  // Pointers into work_memory_ are assigned later, in initialize_once().
  sort_entries_ = nullptr;
  position_entries_ = nullptr;
  original_pages_ = nullptr;
  inputs_status_ = nullptr;
}
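// Usage sketch (not from the original file): how a caller could drive this class, assuming
// the public API declared in merge_sort.hpp (initialize()/uninitialize() come from
// DefaultInitializable). "storage_id", "base_epoch", "buffers", "buffers_count",
// "max_original_pages", and "memory" are hypothetical variables the caller prepares.
//
//   MergeSort merger(
//     storage_id,                    // all input logs must belong to this storage
//     storage::kMasstreeStorage,     // and share one storage type
//     base_epoch,                    // epochs are compressed relative to this
//     buffers,                       // SortedBuffer* const*: the sorted input runs
//     buffers_count,
//     max_original_pages,
//     &memory,                       // work memory, carved up in initialize_once()
//     MergeSort::kDefaultChunkBatch);
//   COERCE_ERROR(merger.initialize());
//   while (!merger.is_ended_all()) {
//     COERCE_ERROR(merger.next_batch());
//     // ... consume the sorted batch via fetch_logs(); see the sketch further below ...
//   }
//   COERCE_ERROR(merger.uninitialize());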
ErrorStack MergeSort::initialize_once() {
  // Each batch picks at most chunk_batch_size_ chunks, plus a partial chunk per input,
  // so this capacity is a conservative upper bound on entries per batch.
  uint32_t buffer_capacity = kLogChunk * (chunk_batch_size_ + inputs_count_);
  buffer_capacity_ = assorted::align<uint32_t, 512U>(buffer_capacity);
  char* block = reinterpret_cast<char*>(work_memory_->get_block());
  // Fill the whole work memory with garbage so that reads of uninitialized
  // regions are easy to spot while debugging.
  std::memset(block, 0xDA, work_memory_->get_size());
  uint64_t offset = 0;
  sort_entries_ = reinterpret_cast<SortEntry*>(block + offset);
  offset += sizeof(SortEntry) * buffer_capacity_;
  position_entries_ = reinterpret_cast<PositionEntry*>(block + offset);
  offset += sizeof(PositionEntry) * buffer_capacity_;
  original_pages_ = reinterpret_cast<storage::Page*>(block + offset);
  offset += sizeof(storage::Page) * (max_original_pages_ + 1U);
  inputs_status_ = reinterpret_cast<InputStatus*>(block + offset);
  // Set up the initial window state for each input stream.
  for (InputIndex i = 0; i < inputs_count_; ++i) {
// In next_batch(): with a single input there is nothing to merge,
    next_batch_one_input();
// while the general case advances windows, picks chunks, and batch-sorts up to the
// threshold chunk:
    batch_sort(min_input);
void MergeSort::next_batch_one_input() {
  // Single-input fast path: nothing to merge, just convert logs to sort entries.
  InputStatus* status = inputs_status_;
  uint64_t relative_pos = status->cur_relative_pos_;
  uint64_t end_pos = status->end_absolute_pos_ - status->window_offset_;
  const uint32_t kLongestLog = 1U << 16;
  if (dynamic_cast<InMemorySortedBuffer*>(inputs_[0])) {
    VLOG(0) << "1-input in-memory case.";
    // The in-memory buffer maps the whole input, so end_pos is within the window.
    ASSERT_ND(end_pos <= status->window_size_);
  } else {
    VLOG(0) << "1-input dump-file case.";
    ASSERT_ND(dynamic_cast<DumpFileSortedBuffer*>(inputs_[0]));
    // A file window might not cover the whole input. Clamp end_pos so that even the
    // longest possible log starting before it still fits inside the window.
    if (end_pos + kLongestLog > status->window_size_) {
      end_pos = status->window_size_ - kLongestLog;
    }
  }
  ASSERT_ND(relative_pos <= end_pos + kLongestLog);
  uint64_t processed = 0;
  debugging::StopWatch watch;
  if (type_ == storage::kArrayStorage) {
    for (; LIKELY(relative_pos < end_pos && processed < buffer_capacity_); ++processed) {
      relative_pos += populate_entry_array(0, relative_pos);
    }
  } else if (type_ == storage::kMasstreeStorage) {
    for (; LIKELY(relative_pos < end_pos && processed < buffer_capacity_); ++processed) {
      relative_pos += populate_entry_masstree(0, relative_pos);
    }
  } else {
    for (; LIKELY(relative_pos < end_pos && processed < buffer_capacity_); ++processed) {
      relative_pos += populate_entry_hash(0, relative_pos);
    }
  }
  ASSERT_ND(relative_pos <= end_pos + kLongestLog);
  ASSERT_ND(processed <= buffer_capacity_);
  watch.stop();
  VLOG(0) << "1-input case. from=" << status->cur_relative_pos_ << "b. processed " << processed
    << " logs in " << watch.elapsed_ms() << "ms";
  status->cur_relative_pos_ = relative_pos;
  status->chunk_relative_pos_ = relative_pos;
  status->previous_chunk_relative_pos_ = relative_pos;
  status->assert_consistent();
}
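// Worked example of the end_pos clamp above (illustrative numbers, not from the source):
// with a 64MB window (window_size_ = 1 << 26) over a dump file that extends past the
// window, end_pos becomes (1 << 26) - (1 << 16). Any log starting below that clamped
// end_pos fits entirely inside the window even at the maximum log length, so the
// populate loops never read past the mapped region.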
ErrorStack MergeSort::advance_window() {
  // Slide each input's window forward, which is safe only while no sort entry
  // still points into the current window.
  for (InputIndex i = 0; i < inputs_count_; ++i) {
    InputStatus* status = inputs_status_ + i;
    if (status->is_ended() || status->is_last_window()) {
      continue;
    }
    ASSERT_ND(status->cur_relative_pos_ <= status->chunk_relative_pos_);
    ASSERT_ND(status->cur_relative_pos_ == status->previous_chunk_relative_pos_);
    if (status->cur_relative_pos_
      >= static_cast<uint64_t>(status->window_size_ * kWindowMoveThreshold)) {
      uint64_t cur_abs_pos = status->to_absolute_pos(status->cur_relative_pos_);
      SortedBuffer* input = inputs_[i];
      // (the call that winds the underlying buffer forward is elided in this fragment)
      status->window_offset_ = input->get_offset();
      ASSERT_ND(status->window_size_ == input->get_buffer_size());
      ASSERT_ND(status->window_ == input->get_buffer());
      ASSERT_ND(cur_abs_pos >= status->window_offset_);
      status->cur_relative_pos_ = cur_abs_pos - status->window_offset_;
      status->chunk_relative_pos_ = status->cur_relative_pos_;
      status->previous_chunk_relative_pos_ = status->cur_relative_pos_;
      status->assert_consistent();
      ASSERT_ND(status->chunk_relative_pos_ + status->get_chunk_log()->header_.log_length_
        <= status->window_size_);
    }
  }
  // After moving, every live input must still have room for a chunk in its window.
  for (InputIndex i = 0; i < inputs_count_; ++i) {
    InputStatus* status = inputs_status_ + i;
    if (status->is_ended() || status->is_last_window()) {
      continue;
    }
    ASSERT_ND(status->cur_relative_pos_ + kWindowChunkReserveBytes <= status->window_size_);
    ASSERT_ND(status->chunk_relative_pos_ + status->get_chunk_log()->header_.log_length_
      <= status->window_size_);
  }
  return kRetOk;
}
uint32_t MergeSort::fetch_logs(
  uint32_t sort_pos, uint32_t count, log::RecordLogType const** out) const {
  uint32_t fetched_count = count;
  if (sort_pos + count > current_count_) {
    fetched_count = current_count_ - sort_pos;
  }
  if (is_no_merging()) {
    // Single input: entries were never reordered, so positions are contiguous.
    for (uint32_t i = 0; i < fetched_count; ++i) {
      ASSERT_ND(sort_entries_[sort_pos + i].get_position() == sort_pos + i);
    }
    for (uint32_t i = 0; i < fetched_count; ++i) {
      MergedPosition pos = sort_pos + i;
      ASSERT_ND(position_entries_[pos].input_index_ == 0);
      out[i] = inputs_status_[0].from_compact_pos(position_entries_[pos].input_position_);
    }
    return fetched_count;
  }
  // Prefetch position entries to hide L1 cache-miss stalls, then resolve log pointers.
  for (uint32_t i = 0; i < fetched_count; ++i) {
    assorted::prefetch_cacheline(position_entries_ + sort_entries_[sort_pos + i].get_position());
  }
  for (uint32_t i = 0; i < fetched_count; ++i) {
    MergedPosition pos = sort_entries_[sort_pos + i].get_position();
    InputIndex input = position_entries_[pos].input_index_;
    out[i] = inputs_status_[input].from_compact_pos(position_entries_[pos].input_position_);
  }
  return fetched_count;
}
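// Consumer sketch (not from the original file): draining one merged batch with the
// fetch_logs() API above. kFetchBatch, process(), and the merger handle are hypothetical.
//
//   const uint32_t kFetchBatch = 64;
//   const log::RecordLogType* out[kFetchBatch];
//   for (uint32_t pos = 0; pos < merger.get_current_count();) {
//     uint32_t fetched = merger.fetch_logs(pos, kFetchBatch, out);
//     for (uint32_t i = 0; i < fetched; ++i) {
//       process(out[i]);  // logs arrive in fully sorted order within the batch
//     }
//     pos += fetched;
//   }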
void MergeSort::next_chunk(InputIndex input_index) {
  InputStatus* status = inputs_status_ + input_index;
  ASSERT_ND(!status->is_ended());
  ASSERT_ND(!status->is_last_chunk_in_window());
  status->assert_consistent();
  uint64_t pos = status->chunk_relative_pos_;
  uint64_t relative_end = status->end_absolute_pos_ - status->window_offset_;
  if (relative_end >= status->window_size_) {
    relative_end = status->window_size_;
  }
  ASSERT_ND(pos + status->from_byte_pos(pos)->header_.log_length_ <= status->window_size_);
  if (status->is_last_window()) {
    // In the last window we can safely read up to relative_end.
    for (uint32_t i = 0; i < kLogChunk; ++i) {
      uint16_t log_length = status->from_byte_pos(pos)->header_.log_length_;
      ASSERT_ND(pos + log_length <= relative_end);
      if (pos + log_length >= relative_end) {
        break;  // this is the last log of the input
      }
      pos += log_length;
    }
  } else {
    // Otherwise advance carefully; reading past the window would be a SIGSEGV.
    uint64_t next_pos = pos;
    for (uint32_t i = 0; i < kLogChunk; ++i) {
      ASSERT_ND(next_pos < status->window_size_);
      const log::RecordLogType* the_log = status->from_byte_pos(next_pos);
      uint16_t log_length = the_log->header_.log_length_;
      if (next_pos + log_length >= relative_end) {
        break;
      }
      pos = next_pos;
      next_pos += log_length;
      // (an additional check against the window-reserved bytes is elided in this fragment)
    }
  }
  ASSERT_ND(pos + status->from_byte_pos(pos)->header_.log_length_ <= status->window_size_);
  status->previous_chunk_relative_pos_ = status->chunk_relative_pos_;
  status->chunk_relative_pos_ = pos;
  status->assert_consistent();
  ASSERT_ND(status->chunk_relative_pos_ + status->get_chunk_log()->header_.log_length_
    <= status->window_size_);
}
MergeSort::InputIndex MergeSort::determine_min_input() const {
  InputIndex min_input = kInvalidInput;
  for (InputIndex i = 0; i < inputs_count_; ++i) {
    InputStatus* status = inputs_status_ + i;
    if (status->is_ended() || status->is_last_chunk_overall()) {
      continue;
    }
    if (min_input == kInvalidInput) {
      min_input = i;
    } else {
      ASSERT_ND(!inputs_status_[min_input].is_ended());
      ASSERT_ND(!inputs_status_[min_input].is_last_chunk_overall());
      if (compare_logs(status->get_chunk_log(), inputs_status_[min_input].get_chunk_log()) < 0) {
        min_input = i;
      }
    }
  }
  return min_input;
}
MergeSort::InputIndex MergeSort::pick_chunks() {
  uint32_t chunks;
  for (chunks = 0; chunks < chunk_batch_size_; ++chunks) {
    InputIndex min_input = determine_min_input();
    if (min_input == kInvalidInput) {
      break;  // all inputs are already at their last chunk
    }
    if (inputs_status_[min_input].is_last_chunk_in_window()) {
      VLOG(1) << "Min Input-" << min_input << " needs to shift window. chunks=" << chunks;
      break;
    }
    next_chunk(min_input);
  }
  VLOG(1) << "Now determining batch-threshold... chunks=" << chunks;
  return determine_min_input();
}
void MergeSort::batch_sort(MergeSort::InputIndex min_input) {
  batch_sort_prepare(min_input);
  ASSERT_ND(current_count_ <= buffer_capacity_);
  // SortEntry packs key, compressed epoch, ordinal, and position into one 128-bit
  // integer, so sorting the raw data_ fields gives the right order without a comparator.
  debugging::StopWatch sort_watch;
  std::sort(&(sort_entries_->data_), &(sort_entries_[current_count_].data_));
  sort_watch.stop();
  VLOG(1) << "Storage-" << id_ << ", merge sort (main) of " << current_count_ << " logs in "
    << sort_watch.elapsed_ms() << "ms";
  // Variable-length keys might tie on the 8-byte prefix; adjust those runs.
  if (type_ == storage::kMasstreeStorage
    && (shortest_key_length_ != 8U || longest_key_length_ != 8U)) {
    batch_sort_adjust_sort();
  }
}
void MergeSort::batch_sort_prepare(MergeSort::InputIndex min_input) {
  current_count_ = 0;
  if (min_input == kInvalidInput) {
    // Last iteration: every input is at its last chunk, so take all remaining logs.
    for (InputIndex i = 0; i < inputs_count_; ++i) {
      InputStatus* status = inputs_status_ + i;
      ASSERT_ND(status->is_last_chunk_overall());
      if (status->is_ended()) {
        continue;
      }
      append_logs(i, status->chunk_relative_pos_ + status->get_chunk_log()->header_.log_length_);
      status->assert_consistent();
    }
  } else {
    // Otherwise only logs below the threshold (the minimum chunk-last log) are safe to take.
    const log::RecordLogType* threshold = inputs_status_[min_input].get_chunk_log();
    for (InputIndex i = 0; i < inputs_count_; ++i) {
      InputStatus* status = inputs_status_ + i;
      if (status->is_ended()) {
        continue;
      }
      if (i == min_input) {
        // The threshold input itself: everything before its chunk log is below the threshold.
        append_logs(i, status->chunk_relative_pos_);
        ASSERT_ND(status->chunk_relative_pos_ == status->cur_relative_pos_);
      } else {
        // Logs up to the previous chunk are known to be below the threshold.
        if (status->previous_chunk_relative_pos_ != status->chunk_relative_pos_) {
          append_logs(i, status->previous_chunk_relative_pos_);
          ASSERT_ND(status->previous_chunk_relative_pos_ == status->cur_relative_pos_);
        }
        // The rest of the chunk must be compared against the threshold one by one.
        uint64_t cur = status->cur_relative_pos_;
        uint64_t end = status->chunk_relative_pos_ + status->get_chunk_log()->header_.log_length_;
        while (cur < end) {
          const log::RecordLogType* the_log = status->from_byte_pos(cur);
          if (compare_logs(the_log, threshold) >= 0) {
            break;  // this and all following logs are at or above the threshold
          }
          cur += the_log->header_.log_length_;
        }
        append_logs(i, cur);
        if (cur == end) {
          ASSERT_ND(status->is_last_chunk_overall());
        }
        status->assert_consistent();
      }
    }
  }
}
void MergeSort::batch_sort_adjust_sort() {
  debugging::StopWatch sort_watch;
  MergedPosition cur = 0;
  uint32_t debug_stat_run_count = 0;
  uint32_t debug_stat_longest_run = 0;
  uint32_t debug_stat_runs_total = 0;
  while (LIKELY(cur + 1U < current_count_)) {
    // If the 8-byte key is strictly smaller than the next, the order is already final.
    uint64_t short_key = sort_entries_[cur].get_key();
    ASSERT_ND(short_key <= sort_entries_[cur + 1U].get_key());
    if (LIKELY(short_key < sort_entries_[cur + 1U].get_key())) {
      ++cur;
      continue;
    }
    // Figure out how long the run of the same 8-byte key is, and whether any entry in
    // the run has a key that is not fully contained in the 8-byte prefix.
    uint32_t next = cur + 2U;
    bool needs_to_check = sort_entries_[cur].needs_additional_check()
      || sort_entries_[cur + 1U].needs_additional_check();
    for (next = cur + 2U;
        next < current_count_ && short_key == sort_entries_[next].get_key();
        ++next) {
      ASSERT_ND(short_key <= sort_entries_[next].get_key());
      needs_to_check = needs_to_check || sort_entries_[next].needs_additional_check();
    }
    uint32_t run_length = next - cur;
    debug_stat_runs_total += run_length;
    debug_stat_longest_run = std::max<uint32_t>(debug_stat_longest_run, run_length);
    ++debug_stat_run_count;
    if (needs_to_check) {
      // Re-sort just this run with the full-key comparator.
      AdjustComparatorMasstree comparator(position_entries_, inputs_status_);
      std::sort(sort_entries_ + cur, sort_entries_ + next, comparator);
    }
    cur = next;
  }
  sort_watch.stop();
  VLOG(1) << "Storage-" << id_ << ", merge sort (adjust) of " << current_count_ << " logs in "
    << sort_watch.elapsed_ms() << "ms. run_count=" << debug_stat_run_count << ", "
    << "longest_run=" << debug_stat_longest_run << ", total_runs=" << debug_stat_runs_total;
}
template <typename T>
int compare_logs_as(const log::RecordLogType* lhs, const log::RecordLogType* rhs) {
  const T* lhs_log = reinterpret_cast<const T*>(lhs);
  const T* rhs_log = reinterpret_cast<const T*>(rhs);
  return T::compare_logs(lhs_log, rhs_log);
}

int MergeSort::compare_logs(const log::RecordLogType* lhs, const log::RecordLogType* rhs) const {
  if (type_ == storage::kArrayStorage) {
    return compare_logs_as< storage::array::ArrayCommonUpdateLogType >(lhs, rhs);
  } else if (type_ == storage::kMasstreeStorage) {
    return compare_logs_as< storage::masstree::MasstreeCommonLogType >(lhs, rhs);
  } else {
    return compare_logs_as< storage::hash::HashCommonLogType >(lhs, rhs);
  }
}
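// Note: compare_logs_as<T>() is compile-time dispatch. Each storage's common log type
// supplies a static T::compare_logs(), so the merge inner loops avoid virtual calls;
// the three returns above are the per-storage-type branches of MergeSort::compare_logs().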
void MergeSort::append_logs(MergeSort::InputIndex input_index, uint64_t upto_relative_pos) {
  InputStatus* status = inputs_status_ + input_index;
  ASSERT_ND(status->to_absolute_pos(upto_relative_pos) <= status->end_absolute_pos_);
  uint64_t relative_pos = status->cur_relative_pos_;
  if (type_ == storage::kArrayStorage) {
    while (LIKELY(relative_pos < upto_relative_pos)) {
      relative_pos += populate_entry_array(input_index, relative_pos);
    }
  } else if (type_ == storage::kMasstreeStorage) {
    while (LIKELY(relative_pos < upto_relative_pos)) {
      relative_pos += populate_entry_masstree(input_index, relative_pos);
    }
  } else {
    while (LIKELY(relative_pos < upto_relative_pos)) {
      relative_pos += populate_entry_hash(input_index, relative_pos);
    }
  }
  ASSERT_ND(relative_pos == upto_relative_pos);
  if (upto_relative_pos > status->chunk_relative_pos_) {
    // We went past the current chunk, which can happen only in the last chunk.
    ASSERT_ND(status->is_last_chunk_overall());
    status->chunk_relative_pos_ = upto_relative_pos;
  }
  status->cur_relative_pos_ = upto_relative_pos;
  status->previous_chunk_relative_pos_ = upto_relative_pos;
  status->assert_consistent();
}
inline uint16_t MergeSort::populate_entry_array(InputIndex input_index, uint64_t relative_pos) {
  InputStatus* status = inputs_status_ + input_index;
  ASSERT_ND(current_count_ < buffer_capacity_);
  ASSERT_ND(relative_pos < status->window_size_);
  const storage::array::ArrayCommonUpdateLogType* the_log
    = reinterpret_cast<const storage::array::ArrayCommonUpdateLogType*>(
      status->from_byte_pos(relative_pos));
  the_log->assert_valid_generic();
  Epoch epoch = the_log->header_.xct_id_.get_epoch();
  ASSERT_ND(epoch.subtract(base_epoch_) < (1U << 16));
  uint16_t compressed_epoch = epoch.subtract(base_epoch_);
  sort_entries_[current_count_].set(
    the_log->offset_,  // the array offset is the entire 8-byte key
    compressed_epoch,
    the_log->header_.xct_id_.get_ordinal(),
    false,  // an 8-byte key never needs the additional check
    current_count_);
  position_entries_[current_count_].input_index_ = input_index;
  position_entries_[current_count_].log_type_ = the_log->header_.log_type_code_;
  position_entries_[current_count_].input_position_ = to_buffer_position(relative_pos);
  ++current_count_;
  return the_log->header_.log_length_;
}
inline uint16_t MergeSort::populate_entry_hash(InputIndex input_index, uint64_t relative_pos) {
  InputStatus* status = inputs_status_ + input_index;
  ASSERT_ND(current_count_ < buffer_capacity_);
  ASSERT_ND(relative_pos < status->window_size_);
  const storage::hash::HashCommonLogType* the_log
    = reinterpret_cast<const storage::hash::HashCommonLogType*>(
      status->from_byte_pos(relative_pos));
  the_log->assert_type();
  Epoch epoch = the_log->header_.xct_id_.get_epoch();
  ASSERT_ND(epoch.subtract(base_epoch_) < (1U << 16));
  uint16_t compressed_epoch = epoch.subtract(base_epoch_);
  uint16_t key_length = the_log->key_length_;
  ASSERT_ND(key_length >= shortest_key_length_);
  ASSERT_ND(key_length <= longest_key_length_);
  sort_entries_[current_count_].set(
    // (key argument: the log's hash bin; exact expression elided in this fragment)
    compressed_epoch,
    the_log->header_.xct_id_.get_ordinal(),
    true,  // same-bin logs always need the full-key comparison
    current_count_);
  position_entries_[current_count_].input_index_ = input_index;
  position_entries_[current_count_].log_type_ = the_log->header_.log_type_code_;
  position_entries_[current_count_].input_position_ = to_buffer_position(relative_pos);
  ++current_count_;
  return the_log->header_.log_length_;
}
inline uint16_t MergeSort::populate_entry_masstree(InputIndex input_index, uint64_t relative_pos) {
  InputStatus* status = inputs_status_ + input_index;
  ASSERT_ND(current_count_ < buffer_capacity_);
  ASSERT_ND(relative_pos < status->window_size_);
  const storage::masstree::MasstreeCommonLogType* the_log
    = reinterpret_cast<const storage::masstree::MasstreeCommonLogType*>(
      status->from_byte_pos(relative_pos));
  the_log->assert_valid_generic();
  Epoch epoch = the_log->header_.xct_id_.get_epoch();
  ASSERT_ND(epoch.subtract(base_epoch_) < (1U << 16));
  uint16_t compressed_epoch = epoch.subtract(base_epoch_);
  uint16_t key_length = the_log->key_length_;
  ASSERT_ND(key_length >= shortest_key_length_);
  ASSERT_ND(key_length <= longest_key_length_);
  sort_entries_[current_count_].set(
    the_log->get_first_slice(),  // first 8-byte slice of the key
    compressed_epoch,
    the_log->header_.xct_id_.get_ordinal(),
    key_length != 8U,  // longer or shorter keys tie on the slice and need the full check
    current_count_);
  position_entries_[current_count_].input_index_ = input_index;
  position_entries_[current_count_].log_type_ = the_log->header_.log_type_code_;
  position_entries_[current_count_].input_position_ = to_buffer_position(relative_pos);
  ++current_count_;
  return the_log->header_.log_length_;
}
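// Worked example of the epoch compression shared by the three populate_entry_*() paths
// (illustrative numbers, not from the source): if base_epoch_ is 1000 and a log's epoch
// is 1234, epoch.subtract(base_epoch_) = 234, which fits in the 16-bit compressed_epoch
// because the asserts guarantee every input epoch is within 2^16 of base_epoch_. Entries
// with equal keys then order by (compressed_epoch, in-epoch ordinal), i.e. serialization
// order.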
// From MergeSort::assert_sorted() (debug/test only; most of the function is elided in this
// fragment): it re-derives each entry's key from the log it points to and verifies that
// adjacent entries compare in non-decreasing order.
  const log::RecordLogType* cur = inputs_status_[position_entries_[cur_pos].input_index_]
    .from_compact_pos(position_entries_[cur_pos].input_position_);
  uint16_t compressed_epoch = epoch.subtract(base_epoch_);
    casted->get_first_slice(),
  const log::RecordLogType* prev = inputs_status_[position_entries_[prev_pos].input_index_]
    .from_compact_pos(position_entries_[prev_pos].input_position_);
  int cmp = compare_logs(prev, cur);