libfoedus-core
FOEDUS Core Library
merge_sort.cpp
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <algorithm>
23 
24 #include "foedus/epoch.hpp"
30 #include "foedus/storage/page.hpp"
35 
36 namespace foedus {
37 namespace snapshot {
38 
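/**
 * When an input has consumed more than this fraction of its current window, we move the window
 * (see also kWindowChunkReserveBytes in merge_sort.hpp).
 */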
45 const float kWindowMoveThreshold = 0.95;
46 
47 
48 uint16_t extract_shortest_key_length(SortedBuffer* const* inputs, uint16_t inputs_count) {
49  uint16_t ret = inputs[0]->get_cur_block_shortest_key_length();
50  for (uint16_t i = 1; i < inputs_count; ++i) {
51  ret = std::min<uint16_t>(ret, inputs[i]->get_cur_block_shortest_key_length());
52  }
53  return ret;
54 }
55 uint16_t extract_longest_key_length(SortedBuffer* const* inputs, uint16_t inputs_count) {
56  uint16_t ret = inputs[0]->get_cur_block_longest_key_length();
57  for (uint16_t i = 1; i < inputs_count; ++i) {
58  ret = std::max<uint16_t>(ret, inputs[i]->get_cur_block_longest_key_length());
59  }
60  return ret;
61 }
62 
63 MergeSort::MergeSort(
64  storage::StorageId id,
65  storage::StorageType type,
66  Epoch base_epoch,
67  SortedBuffer* const* inputs,
68  uint16_t inputs_count,
69  uint16_t max_original_pages,
70  memory::AlignedMemory* const work_memory,
71  uint16_t chunk_batch_size)
72   :
73  id_(id),
74  type_(type),
75  base_epoch_(base_epoch),
76  shortest_key_length_(extract_shortest_key_length(inputs, inputs_count)),
77  longest_key_length_(extract_longest_key_length(inputs, inputs_count)),
78  inputs_(inputs),
79  inputs_count_(inputs_count),
80  max_original_pages_(max_original_pages),
81  chunk_batch_size_(chunk_batch_size),
82  work_memory_(work_memory) {
83  ASSERT_ND(shortest_key_length_ <= longest_key_length_);
84  ASSERT_ND(shortest_key_length_ > 0);
85  ASSERT_ND(chunk_batch_size_ > 0);
86  current_count_ = 0;
87  sort_entries_ = nullptr;
88  position_entries_ = nullptr;
89  original_pages_ = nullptr;
90  inputs_status_ = nullptr;
91 }
92 
93 ErrorStack MergeSort::initialize_once() {
94  // in each batch, we might include tuples from an input even if we didn't fully pick a chunk
95  // from it (at most kLogChunk-1 such tuples). So, conservatively, chunk_batch_size_ + inputs_count_ chunks.
96  uint32_t buffer_capacity = kLogChunk * (chunk_batch_size_ + inputs_count_);
97  buffer_capacity_ = assorted::align<uint32_t, 512U>(buffer_capacity);
98  uint64_t byte_size = buffer_capacity_ * (sizeof(SortEntry) + sizeof(PositionEntry));
99  ASSERT_ND(byte_size % 4096U == 0);
100  byte_size += storage::kPageSize * (max_original_pages_ + 1U);
101  byte_size += sizeof(InputStatus) * inputs_count_;
102  WRAP_ERROR_CODE(work_memory_->assure_capacity(byte_size));
103 
104  // assign pointers
105  char* block = reinterpret_cast<char*>(work_memory_->get_block());
106 #ifndef NDEBUG
107  std::memset(block, 0xDA, work_memory_->get_size());
108 #endif // NDEBUG
109  uint64_t offset = 0;
110  sort_entries_ = reinterpret_cast<SortEntry*>(block + offset);
111  offset += sizeof(SortEntry) * buffer_capacity;
112  position_entries_ = reinterpret_cast<PositionEntry*>(block + offset);
113  offset += sizeof(PositionEntry) * buffer_capacity;
114  original_pages_ = reinterpret_cast<storage::Page*>(block + offset);
115  offset += sizeof(storage::Page) * (max_original_pages_ + 1U);
116  inputs_status_ = reinterpret_cast<InputStatus*>(block + offset);
117  offset += sizeof(InputStatus) * inputs_count_;
118  ASSERT_ND(offset == byte_size);
119 
120  // initialize inputs_status_
121  for (InputIndex i = 0; i < inputs_count_; ++i) {
122  InputStatus* status = inputs_status_ + i;
123  SortedBuffer* input = inputs_[i];
124  input->assert_checks();
125  status->window_ = input->get_buffer();
126  status->window_offset_ = input->get_offset();
127  status->window_size_ = input->get_buffer_size();
128  uint64_t cur_abs = input->get_cur_block_abosulte_begin();
129  // this is the initial read of this block, so we are sure cur_block_abosulte_begin is within the window
130  ASSERT_ND(cur_abs >= status->window_offset_);
131  status->cur_relative_pos_ = cur_abs - status->window_offset_;
132  status->chunk_relative_pos_ = status->cur_relative_pos_; // hence the chunk has only 1 log
133  status->previous_chunk_relative_pos_ = status->cur_relative_pos_;
134 
135  uint64_t end_abs = input->get_cur_block_abosulte_end();
136  status->end_absolute_pos_ = end_abs;
137 
138  status->assert_consistent();
139  }
140  return kRetOk;
141 }
142 
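// Executes merge-sort on several thousands of logs and provides the result as a batch.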
143 ErrorStack MergeSort::next_batch() {
145  current_count_ = 0;
146  CHECK_ERROR(advance_window());
147 
148  if (is_ended_all()) {
149  return kRetOk;
150  }
151 
152  if (is_no_merging()) {
153  next_batch_one_input();
154  } else {
155  InputIndex min_input = pick_chunks();
156  batch_sort(min_input);
157  }
158  return kRetOk;
159 }
160 
161 void MergeSort::next_batch_one_input() {
162  // In this case, we could even skip setting sort_entries_. However, the composer benefits from
163  // the concise array that gives the most significant 8 bytes of each key, so we populate it anyway.
165  // Note, even in this case, inputs_[0] might NOT be InMemorySortedBuffer.
166  // Example:
167  // in-memory: blocks for storage-1, storage-3
168  // dump-0: blocks for storage-1, storage-2, storage-3
169  // dump-1: blocks for storage-3
170  // For storage-2, dump-0 is the only input. We once had a bug that overlooked this case.
171  InputStatus* status = inputs_status_;
172  uint64_t relative_pos = status->cur_relative_pos_;
173  uint64_t end_pos = status->end_absolute_pos_ - status->window_offset_;
174  const uint32_t kLongestLog = 1U << 16;
175  if (dynamic_cast<InMemorySortedBuffer*>(inputs_[0])) {
176  VLOG(0) << "1-input in-memory case.";
177  ASSERT_ND(status->is_last_window()); // then it's always the last window.
178  ASSERT_ND(status->window_offset_ == 0);
179  ASSERT_ND(!status->is_ended());
180  ASSERT_ND(end_pos <= status->window_size_);
181  } else {
182  VLOG(0) << "1-input dump-file case.";
183  ASSERT_ND(dynamic_cast<DumpFileSortedBuffer*>(inputs_[0]));
184  // In this case, we must be careful about a log that spans the end of the window.
185  // Rather than checking the length each time, we conservatively close the current window early.
186  if (end_pos + kLongestLog > status->window_size_) {
187  end_pos = status->window_size_ - kLongestLog;
188  }
189  }
190 
191  ASSERT_ND(relative_pos <= end_pos + kLongestLog);
192  uint64_t processed = 0;
193  debugging::StopWatch watch;
194  if (type_ == storage::kArrayStorage) {
195  for (; LIKELY(relative_pos < end_pos && processed < buffer_capacity_); ++processed) {
196  relative_pos += populate_entry_array(0, relative_pos);
197  }
198  } else if (type_ == storage::kMasstreeStorage) {
199  for (; LIKELY(relative_pos < end_pos && processed < buffer_capacity_); ++processed) {
200  relative_pos += populate_entry_masstree(0, relative_pos);
201  }
202  } else {
204  for (; LIKELY(relative_pos < end_pos && processed < buffer_capacity_); ++processed) {
205  relative_pos += populate_entry_hash(0, relative_pos);
206  }
207  }
208  ASSERT_ND(relative_pos <= end_pos + kLongestLog);
209  ASSERT_ND(processed <= buffer_capacity_);
210  ASSERT_ND(current_count_ == processed);
211 
212  watch.stop();
213  VLOG(0) << "1-input case. from=" << status->cur_relative_pos_ << "b. processed " << processed
214  << " logs in " << watch.elapsed_ms() << "ms";
215  status->cur_relative_pos_ = relative_pos;
216  status->chunk_relative_pos_ = relative_pos;
217  status->previous_chunk_relative_pos_ = relative_pos;
218  status->assert_consistent();
219  assert_sorted();
220 }
221 
222 ErrorStack MergeSort::advance_window() {
223  // this method is called before we have grabbed anything from the inputs in this batch;
224  // otherwise we couldn't move the window here.
225  ASSERT_ND(current_count_ == 0);
226  for (InputIndex i = 0; i < inputs_count_; ++i) {
227  InputStatus* status = inputs_status_ + i;
228  if (status->is_ended() || status->is_last_window()) {
229  continue;
230  }
231  ASSERT_ND(status->cur_relative_pos_ <= status->chunk_relative_pos_);
232  ASSERT_ND(status->cur_relative_pos_ == status->previous_chunk_relative_pos_);
233  if (status->cur_relative_pos_
234  >= static_cast<uint64_t>(status->window_size_ * kWindowMoveThreshold)
235  || status->cur_relative_pos_ + kWindowChunkReserveBytes >= status->window_size_) {
236  uint64_t cur_abs_pos = status->to_absolute_pos(status->cur_relative_pos_);
237 
238  SortedBuffer* input = inputs_[i];
239  WRAP_ERROR_CODE(input->wind(cur_abs_pos));
240  status->window_offset_ = input->get_offset();
241  ASSERT_ND(status->window_size_ == input->get_buffer_size());
242  ASSERT_ND(status->window_ == input->get_buffer());
243 
244  ASSERT_ND(cur_abs_pos >= status->window_offset_);
245  status->cur_relative_pos_ = cur_abs_pos - status->window_offset_;
246  status->chunk_relative_pos_ = status->cur_relative_pos_;
247  status->previous_chunk_relative_pos_ = status->cur_relative_pos_;
248  status->assert_consistent();
249  }
250  ASSERT_ND(status->chunk_relative_pos_ + status->get_chunk_log()->header_.log_length_
251  <= status->window_size_);
252  }
253 
254 #ifndef NDEBUG
255  // after the conservative move above, all inputs should be either
256  // 1) in last window, including already ended
257  // 2) in non-last window that has at least kWindowChunkReserveBytes to be consumed
258  // let's confirm.
259  for (InputIndex i = 0; i < inputs_count_; ++i) {
260  InputStatus* status = inputs_status_ + i;
261  status->assert_consistent();
262  if (status->is_ended() || status->is_last_window()) {
263  continue;
264  }
265  ASSERT_ND(status->cur_relative_pos_ + kWindowChunkReserveBytes <= status->window_size_);
266  ASSERT_ND(status->chunk_relative_pos_ + status->get_chunk_log()->header_.log_length_
267  <= status->window_size_);
268  }
269 #endif // NDEBUG
270  return kRetOk;
271 }
272 
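// To reduce L1 cache-miss stalls, this prefetches a number of position entries and the
// pointed log entries before handing them out.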
273 uint32_t MergeSort::fetch_logs(
274  uint32_t sort_pos,
275  uint32_t count,
276  log::RecordLogType const** out) const {
277  ASSERT_ND(sort_pos <= current_count_);
278  uint32_t fetched_count = count;
279  if (sort_pos + count > current_count_) {
280  fetched_count = current_count_ - sort_pos;
281  }
282 
283  if (is_no_merging()) {
284  // no merge sort.
285 #ifndef NDEBUG
286  for (uint32_t i = 0; i < fetched_count; ++i) {
287  ASSERT_ND(sort_entries_[sort_pos + i].get_position() == sort_pos + i);
288  }
289 #endif // NDEBUG
290  // in this case, the pointed logs are also contiguous. no point in prefetching.
291  for (uint32_t i = 0; i < fetched_count; ++i) {
292  MergedPosition pos = sort_pos + i;
293  ASSERT_ND(position_entries_[pos].input_index_ == 0);
294  out[i] = inputs_status_[0].from_compact_pos(position_entries_[pos].input_position_);
295  }
296  return fetched_count;
297  }
298 
299  // prefetch position entries
300  for (uint32_t i = 0; i < fetched_count; ++i) {
301  MergedPosition pos = sort_entries_[sort_pos + i].get_position();
302  assorted::prefetch_cacheline(position_entries_ + pos);
303  }
304  // prefetch and fetch logs
305  for (uint32_t i = 0; i < fetched_count; ++i) {
306  MergedPosition pos = sort_entries_[sort_pos + i].get_position();
307  InputIndex input = position_entries_[pos].input_index_;
308  out[i] = inputs_status_[input].from_compact_pos(position_entries_[pos].input_position_);
310  }
311  return fetched_count;
312 }
313 
314 void MergeSort::next_chunk(InputIndex input_index) {
315  InputStatus* status = inputs_status_ + input_index;
316  ASSERT_ND(!status->is_ended());
317  ASSERT_ND(!status->is_last_chunk_in_window());
318  status->assert_consistent();
319 
320  uint64_t pos = status->chunk_relative_pos_;
321  uint64_t relative_end = status->end_absolute_pos_ - status->window_offset_;
322  if (relative_end >= status->window_size_) {
323  relative_end = status->window_size_;
324  }
325  ASSERT_ND(pos + status->from_byte_pos(pos)->header_.log_length_ <= status->window_size_);
326 
327  // Be careful not to advance "too much". If the next log spans into the next window,
328  // we can't use it as a chunk-log. But we don't know the log length until we advance.
329  // We thus maintain two values:
330  // pos : we are sure the log entry at this position completely fits in the window.
331  // next_pos : the candidate we are considering to promote to pos.
332  if (status->is_last_window()) {
333  // separately handle last-window, where there is no concern about spanning into the next window,
334  // but we do have to include the last log in the last chunk.
335  for (uint32_t i = 0; i < kLogChunk; ++i) {
336  ASSERT_ND(pos < status->window_size_);
337  const log::RecordLogType* the_log = status->from_byte_pos(pos);
338  uint16_t log_length = the_log->header_.log_length_;
339  ASSERT_ND(pos + log_length <= relative_end); // because it's last window
340  if (pos + log_length >= relative_end) {
341  break;
342  }
343  pos += log_length;
344  }
345  } else {
346  uint64_t next_pos = pos;
347  for (uint32_t i = 0; i < kLogChunk; ++i) {
348  ASSERT_ND(next_pos < status->window_size_);
349  const log::RecordLogType* the_log = status->from_byte_pos(next_pos);
350  uint16_t log_length = the_log->header_.log_length_;
351  if (next_pos + log_length >= relative_end) {
352  break;
353  }
354  pos = next_pos;
355  next_pos += log_length;
356  }
357  }
358  ASSERT_ND(pos < relative_end);
359  ASSERT_ND(pos + status->from_byte_pos(pos)->header_.log_length_ <= status->window_size_);
360  status->previous_chunk_relative_pos_ = status->chunk_relative_pos_;
361  status->chunk_relative_pos_ = pos;
362 
363  status->assert_consistent();
364  // ALWAYS, the chunk-log is fully contained in the window.
365  ASSERT_ND(status->chunk_relative_pos_ + status->get_chunk_log()->header_.log_length_
366  <= status->window_size_);
367 }
368 
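// Returns the input whose current chunk-last log is smallest among inputs that still have
// chunks to consume, or kInvalidInput if every remaining input is already on its last chunk.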
369 MergeSort::InputIndex MergeSort::determine_min_input() const {
370  InputIndex min_input = kInvalidInput;
371  for (InputIndex i = 0; i < inputs_count_; ++i) {
372  InputStatus* status = inputs_status_ + i;
373  status->assert_consistent();
374  if (status->is_ended() || status->is_last_chunk_overall()) {
375  continue;
376  }
377  if (min_input == kInvalidInput) {
378  min_input = i;
379  } else {
380  ASSERT_ND(!inputs_status_[min_input].is_ended());
381  ASSERT_ND(!inputs_status_[min_input].is_last_chunk_overall());
382  if (compare_logs(status->get_chunk_log(), inputs_status_[min_input].get_chunk_log()) < 0) {
383  min_input = i;
384  }
385  }
386  }
387  return min_input;
388 }
389 
390 MergeSort::InputIndex MergeSort::pick_chunks() {
391  uint32_t chunks;
392  for (chunks = 0; chunks < chunk_batch_size_; ++chunks) {
393  InputIndex min_input = determine_min_input();
394  if (min_input == kInvalidInput) {
395  // now all inputs are in the last chunks, we can simply merge them all in one shot!
396  return kInvalidInput;
397  }
398 
399  if (inputs_status_[min_input].is_last_chunk_in_window()) {
400  VLOG(1) << "Min Input-" << min_input << " needs to shift window. chunks=" << chunks;
401  break;
402  }
403  next_chunk(min_input);
404 
405  inputs_status_[min_input].assert_consistent();
406  }
407 
408  VLOG(1) << "Now determining batch-threshold... chunks=" << chunks;
409  return determine_min_input();
410 }
411 
412 void MergeSort::batch_sort(MergeSort::InputIndex min_input) {
413  batch_sort_prepare(min_input);
414  ASSERT_ND(current_count_ <= buffer_capacity_);
415 
416  // First, sort with std::sort, which is (*) smart enough to switch to heap sort for this case.
417  // (*) at least gcc's implementation does.
418  debugging::StopWatch sort_watch;
419  std::sort(&(sort_entries_->data_), &(sort_entries_[current_count_].data_));
420  sort_watch.stop();
421  VLOG(1) << "Storage-" << id_ << ", merge sort (main) of " << current_count_ << " logs in "
422  << sort_watch.elapsed_ms() << "ms";
423 
424  // We need additional sorting just for masstree.
425  // Array never needs it because 8-byte is enough to compare precisely.
426  // Hash neither because it doesn't need the inputs to be fully sorted. Bin-sort is enough.
427  // Sequential doesn't need any sorting at all.
428  if (type_ == storage::kMasstreeStorage
429  && (shortest_key_length_ != 8U || longest_key_length_ != 8U)) {
430  // the sorting above has to be adjusted if we need additional logic for key comparison
431  batch_sort_adjust_sort();
432  }
433 
434  assert_sorted();
435 }
436 
437 void MergeSort::batch_sort_prepare(MergeSort::InputIndex min_input) {
438  current_count_ = 0;
439  if (min_input == kInvalidInput) {
440  // this is the last iteration! get all remaining logs from all inputs
441  for (InputIndex i = 0; i < inputs_count_; ++i) {
442  InputStatus* status = inputs_status_ + i;
443  ASSERT_ND(status->is_last_chunk_overall());
444  if (status->is_ended()) {
445  continue;
446  }
447  append_logs(i, status->chunk_relative_pos_ + status->get_chunk_log()->header_.log_length_);
448  status->assert_consistent();
449  }
450  } else {
451  // merge-sort up to the batch-threshold
452  const log::RecordLogType* threshold = inputs_status_[min_input].get_chunk_log();
453  for (InputIndex i = 0; i < inputs_count_; ++i) {
454  InputStatus* status = inputs_status_ + i;
455  if (status->is_ended()) {
456  continue;
457  }
458 
459  if (i == min_input) {
460  // the input that provides threshold itself. Hence, all logs before the last log are
461  // guaranteed to be strictly smaller than the threshold.
462  append_logs(i, status->chunk_relative_pos_);
463  ASSERT_ND(status->chunk_relative_pos_ == status->cur_relative_pos_);
464  } else {
465  // otherwise, we have to add logs that are smaller than the threshold.
466  // to avoid key comparison in most cases, we use the "previous chunk" hint.
467  if (status->previous_chunk_relative_pos_ != status->chunk_relative_pos_) {
468  append_logs(i, status->previous_chunk_relative_pos_);
469  ASSERT_ND(status->previous_chunk_relative_pos_ == status->cur_relative_pos_);
470  }
471 
472  // and then we have to check one by one. We could do a binary search here, but >90%
473  // of logs should already be appended by the previous-chunk optimization. Not worth it.
474  uint64_t cur = status->cur_relative_pos_;
475  uint64_t end = status->chunk_relative_pos_ + status->get_chunk_log()->header_.log_length_;
476  ASSERT_ND(cur < end);
477  while (cur < end) {
478  const log::RecordLogType* the_log = status->from_byte_pos(cur);
479  // It must be _strictly_ smaller than the threshold
480  if (compare_logs(the_log, threshold) >= 0) {
481  break;
482  }
483  cur += the_log->header_.log_length_;
484  }
485  ASSERT_ND(cur <= end);
486  append_logs(i, cur);
487  if (cur == end) {
488  // this means we added even the last log. This can happen only at the overall last chunk
489  // because we pick the batch threshold as the smallest chunk-last-key.
490  ASSERT_ND(status->is_last_chunk_overall());
491  }
492  }
493 
494  status->assert_consistent();
495  }
496  }
497 }
498 
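// The std::sort above ordered entries only by the first 8 bytes of the key (plus epoch/ordinal).
// For masstree, runs of entries sharing the same 8-byte prefix may contain longer keys, so each
// such run is re-sorted here with a full key comparison.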
499 void MergeSort::batch_sort_adjust_sort() {
500  debugging::StopWatch sort_watch;
501  uint32_t cur = 0;
502  uint32_t debug_stat_run_count = 0;
503  uint32_t debug_stat_longest_run = 0;
504  uint32_t debug_stat_runs_total = 0;
505  while (LIKELY(cur + 1U < current_count_)) {
506  // if the 8-byte key is strictly smaller, we don't need an additional check,
507  // and this should be the vast majority of cases.
508  uint64_t short_key = sort_entries_[cur].get_key();
509  ASSERT_ND(short_key <= sort_entries_[cur + 1U].get_key());
510  if (LIKELY(short_key < sort_entries_[cur + 1U].get_key())) {
511  ++cur;
512  continue;
513  }
514 
515  // figure out how long the run goes on.
516  uint32_t next = cur + 2U;
517  bool needs_to_check =
518  sort_entries_[cur].needs_additional_check()
519  || sort_entries_[cur + 1U].needs_additional_check();
520  for (next = cur + 2U;
521  next < current_count_ && short_key == sort_entries_[next].get_key();
522  ++next) {
523  ASSERT_ND(short_key <= sort_entries_[next].get_key());
524  needs_to_check |= sort_entries_[next].needs_additional_check();
525  }
526  // now, next points to the first entry that has a different key (or current_count_). thus:
527  uint32_t run_length = next - cur;
528  debug_stat_runs_total += run_length;
529  debug_stat_longest_run = std::max<uint32_t>(debug_stat_longest_run, run_length);
530  ++debug_stat_run_count;
531 
532  // so far only masstree needs this adjustment. hash does not need it either (at the cost of less compression)
534  if (needs_to_check) { // if all entries in this range are 8-byte keys, no need.
535  AdjustComparatorMasstree comparator(position_entries_, inputs_status_);
536  std::sort(sort_entries_ + cur, sort_entries_ + next, comparator);
537  }
538  cur = next;
539  }
540  sort_watch.stop();
541  VLOG(1) << "Storage-" << id_ << ", merge sort (adjust) of " << current_count_ << " logs in "
542  << sort_watch.elapsed_ms() << "ms. run_count=" << debug_stat_run_count << ", "
543  << "longest_run=" << debug_stat_longest_run << ", total_runs=" << debug_stat_runs_total;
544 }
545 
546 
547 template <typename T>
548 int compare_logs_as(const log::RecordLogType* lhs, const log::RecordLogType* rhs) {
549  const T* lhs_log = reinterpret_cast<const T*>(lhs);
550  const T* rhs_log = reinterpret_cast<const T*>(rhs);
551  return T::compare_logs(lhs_log, rhs_log);
552 }
553 
554 int MergeSort::compare_logs(const log::RecordLogType* lhs, const log::RecordLogType* rhs) const {
555  ASSERT_ND(lhs->header_.storage_id_ == id_);
556  ASSERT_ND(rhs->header_.storage_id_ == id_);
557  if (type_ == storage::kArrayStorage) {
560  return compare_logs_as< storage::array::ArrayCommonUpdateLogType >(lhs, rhs);
561  } else if (type_ == storage::kMasstreeStorage) {
564  return compare_logs_as< storage::masstree::MasstreeCommonLogType >(lhs, rhs);
565  } else {
569  return compare_logs_as< storage::hash::HashCommonLogType >(lhs, rhs);
570  }
571 }
572 
573 void MergeSort::append_logs(MergeSort::InputIndex input_index, uint64_t upto_relative_pos) {
574  InputStatus* status = inputs_status_ + input_index;
575  ASSERT_ND(status->to_absolute_pos(upto_relative_pos) <= status->end_absolute_pos_);
576  uint64_t relative_pos = status->cur_relative_pos_;
577  if (type_ == storage::kArrayStorage) {
578  while (LIKELY(relative_pos < upto_relative_pos)) {
579  relative_pos += populate_entry_array(input_index, relative_pos);
580  }
581  } else if (type_ == storage::kMasstreeStorage) {
582  while (LIKELY(relative_pos < upto_relative_pos)) {
583  relative_pos += populate_entry_masstree(input_index, relative_pos);
584  }
585  } else {
587  while (LIKELY(relative_pos < upto_relative_pos)) {
588  relative_pos += populate_entry_hash(input_index, relative_pos);
589  }
590  }
591  ASSERT_ND(relative_pos == upto_relative_pos);
592 
593  if (upto_relative_pos > status->chunk_relative_pos_) {
594  // we appended even the last log of this chunk! this should happen only at the last chunk.
595  ASSERT_ND(status->is_last_chunk_overall());
596  status->chunk_relative_pos_ = upto_relative_pos;
597  }
598  status->cur_relative_pos_ = upto_relative_pos;
599  status->previous_chunk_relative_pos_ = upto_relative_pos;
600  status->assert_consistent();
601 }
602 
603 inline uint16_t MergeSort::populate_entry_array(InputIndex input_index, uint64_t relative_pos) {
604  InputStatus* status = inputs_status_ + input_index;
605  ASSERT_ND(current_count_ < buffer_capacity_);
606  ASSERT_ND(relative_pos < status->window_size_);
607  ASSERT_ND(relative_pos % 8U == 0);
608  const storage::array::ArrayCommonUpdateLogType* the_log
609  = reinterpret_cast<const storage::array::ArrayCommonUpdateLogType*>(
610  status->from_byte_pos(relative_pos));
611  ASSERT_ND(is_array_log_type(the_log->header_.log_type_code_));
612  the_log->assert_valid_generic();
613 
614  Epoch epoch = the_log->header_.xct_id_.get_epoch();
615  ASSERT_ND(epoch.subtract(base_epoch_) < (1U << 16));
616  uint16_t compressed_epoch = epoch.subtract(base_epoch_);
617  sort_entries_[current_count_].set(
618  the_log->offset_,
619  compressed_epoch,
620  the_log->header_.xct_id_.get_ordinal(),
621  false,
622  current_count_);
623  position_entries_[current_count_].input_index_ = input_index;
624  position_entries_[current_count_].log_type_ = the_log->header_.log_type_code_;
625  position_entries_[current_count_].input_position_ = to_buffer_position(relative_pos);
626  ++current_count_;
627 
628  return the_log->header_.log_length_;
629 }
630 
631 inline uint16_t MergeSort::populate_entry_hash(InputIndex input_index, uint64_t relative_pos) {
632  InputStatus* status = inputs_status_ + input_index;
633  ASSERT_ND(current_count_ < buffer_capacity_);
634  ASSERT_ND(relative_pos < status->window_size_);
635  ASSERT_ND(relative_pos % 8U == 0);
636  const storage::hash::HashCommonLogType* the_log
637  = reinterpret_cast<const storage::hash::HashCommonLogType*>(
638  status->from_byte_pos(relative_pos));
639  ASSERT_ND(is_hash_log_type(the_log->header_.log_type_code_));
640  the_log->assert_type();
641 
642  Epoch epoch = the_log->header_.xct_id_.get_epoch();
643  ASSERT_ND(epoch.subtract(base_epoch_) < (1U << 16));
644  uint16_t compressed_epoch = epoch.subtract(base_epoch_);
645  uint16_t key_length = the_log->key_length_;
646  ASSERT_ND(key_length >= shortest_key_length_);
647  ASSERT_ND(key_length <= longest_key_length_);
648  storage::hash::HashBin bin = the_log->hash_ >> (64U - the_log->bin_bits_);
649  sort_entries_[current_count_].set(
650  bin,
651  compressed_epoch,
652  the_log->header_.xct_id_.get_ordinal(),
653  false,
654  current_count_);
655  position_entries_[current_count_].input_index_ = input_index;
656  position_entries_[current_count_].log_type_ = the_log->header_.log_type_code_;
657  position_entries_[current_count_].input_position_ = to_buffer_position(relative_pos);
658  ++current_count_;
659 
660  return the_log->header_.log_length_;
661 }
662 
663 inline uint16_t MergeSort::populate_entry_masstree(InputIndex input_index, uint64_t relative_pos) {
664  InputStatus* status = inputs_status_ + input_index;
665  ASSERT_ND(current_count_ < buffer_capacity_);
666  ASSERT_ND(relative_pos < status->window_size_);
667  ASSERT_ND(relative_pos % 8U == 0);
668  const storage::masstree::MasstreeCommonLogType* the_log
669  = reinterpret_cast<const storage::masstree::MasstreeCommonLogType*>(
670  status->from_byte_pos(relative_pos));
671  ASSERT_ND(is_masstree_log_type(the_log->header_.log_type_code_));
672  the_log->assert_valid_generic();
673 
674  Epoch epoch = the_log->header_.xct_id_.get_epoch();
675  ASSERT_ND(epoch.subtract(base_epoch_) < (1U << 16));
676  uint16_t compressed_epoch = epoch.subtract(base_epoch_);
677  uint16_t key_length = the_log->key_length_;
678  ASSERT_ND(key_length >= shortest_key_length_);
679  ASSERT_ND(key_length <= longest_key_length_);
680  sort_entries_[current_count_].set(
681  the_log->get_first_slice(),
682  compressed_epoch,
683  the_log->header_.xct_id_.get_ordinal(),
684  key_length != sizeof(storage::masstree::KeySlice),
685  current_count_);
686  position_entries_[current_count_].input_index_ = input_index;
687  position_entries_[current_count_].log_type_ = the_log->header_.log_type_code_;
688  position_entries_[current_count_].input_position_ = to_buffer_position(relative_pos);
689  ++current_count_;
690 
691  return the_log->header_.log_length_;
692 }
693 
694 
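// For debug/test only: reconstructs each SortEntry from the log it points to and verifies
// that the batch is fully sorted.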
695 void MergeSort::assert_sorted() {
696 #ifndef NDEBUG
697  for (MergedPosition i = 0; i < current_count_; ++i) {
698  MergedPosition cur_pos = sort_entries_[i].get_position();
699  const log::RecordLogType* cur = inputs_status_[position_entries_[cur_pos].input_index_].
700  from_compact_pos(position_entries_[cur_pos].input_position_);
701 
702  // does it point to a correct log?
703  Epoch epoch = cur->header_.xct_id_.get_epoch();
704  uint16_t compressed_epoch = epoch.subtract(base_epoch_);
705  SortEntry dummy;
706  if (type_ == storage::kArrayStorage) {
707  const auto* casted = reinterpret_cast<const storage::array::ArrayCommonUpdateLogType*>(cur);
708  dummy.set(
709  casted->offset_,
710  compressed_epoch,
711  cur->header_.xct_id_.get_ordinal(),
712  false,
713  cur_pos);
714  } else if (type_ == storage::kMasstreeStorage) {
715  const auto* casted = reinterpret_cast<const storage::masstree::MasstreeCommonLogType*>(cur);
716  dummy.set(
717  casted->get_first_slice(),
718  compressed_epoch,
719  cur->header_.xct_id_.get_ordinal(),
720  casted->key_length_ != sizeof(storage::masstree::KeySlice),
721  cur_pos);
722  } else {
724  const auto* casted = reinterpret_cast<const storage::hash::HashCommonLogType*>(cur);
725  casted->assert_type();
726  storage::hash::HashBin bin = casted->hash_ >> (64U - casted->bin_bits_);
727  dummy.set(
728  bin,
729  compressed_epoch,
730  cur->header_.xct_id_.get_ordinal(),
731  false, // hash doesn't need further sorting so far.
732  cur_pos);
733  }
734  ASSERT_ND(dummy.data_ == sort_entries_[i].data_);
735  if (i == 0) {
736  continue;
737  }
738 
739  // compare with previous
740  MergedPosition prev_pos = sort_entries_[i - 1].get_position();
741  ASSERT_ND(prev_pos != cur_pos);
742  const log::RecordLogType* prev = inputs_status_[position_entries_[prev_pos].input_index_].
743  from_compact_pos(position_entries_[prev_pos].input_position_);
744  int cmp = compare_logs(prev, cur);
745  ASSERT_ND(cmp <= 0);
746  if (cmp == 0) {
747  // the last of sort order is position.
748  ASSERT_ND(prev_pos < cur_pos);
749  }
750  }
751 #endif // NDEBUG
752 }
753 
754 } // namespace snapshot
755 } // namespace foedus
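
For context, the following is a minimal sketch of how a caller (such as a composer) might drive MergeSort: construct it over already-sorted inputs, initialize it, then repeatedly produce a batch with next_batch() and drain it with fetch_logs(). It is an illustration only, not code from the library: the wrapper function merge_all_sorted_inputs(), the consume() callback, and the empty-batch stopping condition are assumptions, as are the include path and the initialize()/uninitialize() calls, which presume the Initializable pattern declared in merge_sort.hpp.

#include <cstdint>

#include "foedus/snapshot/merge_sort.hpp"  // assumed include path; assumed to also pull in
                                           // Epoch, ErrorStack, SortedBuffer, and CHECK_ERROR.

// Hypothetical per-log callback, e.g. a composer appending the log to an output page.
inline void consume(const foedus::log::RecordLogType* /*log*/) {}

foedus::ErrorStack merge_all_sorted_inputs(
  foedus::storage::StorageId id,
  foedus::storage::StorageType type,
  foedus::Epoch base_epoch,
  foedus::snapshot::SortedBuffer* const* inputs,
  uint16_t inputs_count,
  uint16_t max_original_pages,
  foedus::memory::AlignedMemory* work_memory) {
  foedus::snapshot::MergeSort merge_sort(
    id, type, base_epoch, inputs, inputs_count, max_original_pages, work_memory);
  CHECK_ERROR(merge_sort.initialize());

  constexpr uint32_t kFetchBatch = 64;
  const foedus::log::RecordLogType* logs[kFetchBatch];
  while (true) {
    // Produces the next globally sorted batch, moving input windows as needed.
    CHECK_ERROR(merge_sort.next_batch());
    uint32_t fetched_total = 0;
    uint32_t fetched;
    // Drain the batch. fetch_logs() returns how many entries it actually filled.
    while ((fetched = merge_sort.fetch_logs(fetched_total, kFetchBatch, logs)) > 0) {
      for (uint32_t i = 0; i < fetched; ++i) {
        consume(logs[i]);  // logs arrive in (key, epoch, in-epoch ordinal) order
      }
      fetched_total += fetched;
      if (fetched < kFetchBatch) {
        break;  // end of this batch
      }
    }
    if (fetched_total == 0) {
      break;  // an empty batch: all inputs are exhausted (simplified stopping condition)
    }
  }

  CHECK_ERROR(merge_sort.uninitialize());
  return foedus::kRetOk;
}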