libfoedus-core
FOEDUS Core Library
masstree_composer_impl.cpp
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <algorithm>
23 #include <cstring>
24 #include <ostream>
25 #include <string>
26 #include <vector>
27 
28 #include "foedus/engine.hpp"
43 
44 namespace foedus {
45 namespace storage {
46 namespace masstree {
47 
48 inline void fill_payload_padded(char* destination, const char* source, PayloadLength count) {
49  if (count > 0) {
51  std::memcpy(destination, source, assorted::align8(count));
52  }
53 }
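// Note on fill_payload_padded(): it copies assorted::align8(count) bytes rather than count,
// so the destination is assumed to be an 8-byte-aligned record area with padded capacity;
// copying the rounded-up size lets memcpy work on whole 8-byte words.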
54 
55 
56 inline MasstreeBorderPage* as_border(MasstreePage* page) {
57  ASSERT_ND(page->is_border());
58  return reinterpret_cast<MasstreeBorderPage*>(page);
59 }
60 
61 inline MasstreeIntermediatePage* as_intermediate(MasstreePage* page) {
62  ASSERT_ND(!page->is_border());
63  return reinterpret_cast<MasstreeIntermediatePage*>(page);
64 }
65 
71 MasstreeComposer::MasstreeComposer(Composer* parent)
72  : engine_(parent->get_engine()),
73  storage_id_(parent->get_storage_id()),
74  storage_(engine_, storage_id_) {
75  ASSERT_ND(storage_.exists());
76 }
77 
78 ErrorStack MasstreeComposer::compose(const Composer::ComposeArguments& args) {
79  VLOG(0) << to_string() << " composing with " << args.log_streams_count_ << " streams.";
80  debugging::StopWatch stop_watch;
81 
82  snapshot::MergeSort merge_sort(
83  storage_id_,
85  args.base_epoch_,
86  args.log_streams_,
87  args.log_streams_count_,
89  args.work_memory_);
90  CHECK_ERROR(merge_sort.initialize());
91 
92  MasstreeComposeContext context(engine_, &merge_sort, args);
93  CHECK_ERROR(context.execute());
94 
95  CHECK_ERROR(merge_sort.uninitialize()); // no need for scoped release. its destructor is safe.
96 
97  stop_watch.stop();
98  LOG(INFO) << to_string() << " done in " << stop_watch.elapsed_ms() << "ms.";
99  return kRetOk;
100 }
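// compose() overview: merge-sort the reducers' sorted runs for this storage via
// snapshot::MergeSort, then let MasstreeComposeContext::execute() replay the merged logs
// into new snapshot pages; the root-info page handed back through args is merged later
// by construct_root().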
101 
102 bool from_this_snapshot(SnapshotPagePointer pointer, const Composer::ConstructRootArguments& args) {
103  if (pointer == 0) {
104  return false;
105  }
106  snapshot::SnapshotId snapshot_id = extract_snapshot_id_from_snapshot_pointer(pointer);
107  ASSERT_ND(snapshot_id != snapshot::kNullSnapshotId);
108  return snapshot_id == args.snapshot_writer_->get_snapshot_id();
109 }
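// from_this_snapshot(): a pointer belongs to the snapshot currently being written iff its
// embedded snapshot ID equals the snapshot writer's ID. construct_root() uses this to tell
// which buddy (reducer output) installed a new child pointer.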
110 
112 ErrorStack MasstreeComposer::construct_root(const Composer::ConstructRootArguments& args) {
113  VLOG(0) << to_string() << " combining " << args.root_info_pages_count_ << " root page info.";
114  debugging::StopWatch stop_watch;
116  Page* new_root = args.snapshot_writer_->get_page_base();
117  std::memcpy(new_root, args.root_info_pages_[0], kPageSize); // use the first one as base
118  if (args.root_info_pages_count_ == 1U) {
119  VLOG(0) << to_string() << " only 1 root info page, so we just use it as the new root.";
120  } else {
121  // otherwise, we merge inputs from each reducer. because of how we design partitions,
122  // the inputs are always intermediate pages with the same partitioning. we just place new
123  // pointers to children.
124  CHECK_ERROR(check_buddies(args));
125 
126  // because of how we partition, B-tree levels might be unbalanced at the root of first layer.
127  // thus, we take the max of btree-level here.
128  MasstreeIntermediatePage* base = reinterpret_cast<MasstreeIntermediatePage*>(new_root);
129  ASSERT_ND(!base->is_border());
130  for (uint16_t buddy = 1; buddy < args.root_info_pages_count_; ++buddy) {
131  const MasstreeIntermediatePage* page
132  = reinterpret_cast<const MasstreeIntermediatePage*>(args.root_info_pages_[buddy]);
133  if (page->get_btree_level() > base->get_btree_level()) {
134  LOG(INFO) << "MasstreeStorage-" << storage_id_ << " has an unbalanced depth in sub-tree";
136  }
137  }
138 
139  uint16_t updated_count = 0;
140  for (SlotIndex i = 0; i <= base->get_key_count(); ++i) {
141  MasstreeIntermediatePage::MiniPage& base_mini = base->get_minipage(i);
142  for (SlotIndex j = 0; j <= base_mini.key_count_; ++j) {
143  SnapshotPagePointer base_pointer = base_mini.pointers_[j].snapshot_pointer_;
144  if (from_this_snapshot(base_pointer, args)) {
145  ++updated_count;
146  continue; // base has updated it
147  }
148 
149  for (uint16_t buddy = 1; buddy < args.root_info_pages_count_; ++buddy) {
150  const MasstreeIntermediatePage* page
151  = reinterpret_cast<const MasstreeIntermediatePage*>(args.root_info_pages_[buddy]);
152  const MasstreeIntermediatePage::MiniPage& mini = page->get_minipage(i);
153  SnapshotPagePointer pointer = mini.pointers_[j].snapshot_pointer_;
154  if (from_this_snapshot(pointer, args)) {
155  base_mini.pointers_[j].snapshot_pointer_ = pointer; // this buddy has updated it
156  ++updated_count;
157  break;
158  }
159  }
160  }
161  }
162  VLOG(0) << to_string() << " has " << updated_count << " new pointers from root.";
163  }
164 
165  // In either case, some pointer might be null if that partition had no log record
166  // and if this is an initial snapshot. Create a dummy page for them.
167  MasstreeIntermediatePage* merged = reinterpret_cast<MasstreeIntermediatePage*>(new_root);
168  ASSERT_ND(!merged->is_border());
169  uint16_t dummy_count = 0;
170  for (MasstreeIntermediatePointerIterator it(merged); it.is_valid(); it.next()) {
171  DualPagePointer& pointer = merged->get_minipage(it.index_).pointers_[it.index_mini_];
172  if (pointer.snapshot_pointer_ == 0) {
173  // this should happen only while an initial snapshot for this storage.
174  ASSERT_ND(storage_.get_control_block()->root_page_pointer_.snapshot_pointer_ == 0);
175  KeySlice low, high;
176  merged->extract_separators_snapshot(it.index_, it.index_mini_, &low, &high);
177  uint32_t offset = 1U + dummy_count; // +1 for new_root
178  SnapshotPagePointer page_id = args.snapshot_writer_->get_next_page_id() + offset;
179  MasstreeBorderPage* dummy = reinterpret_cast<MasstreeBorderPage*>(
180  args.snapshot_writer_->get_page_base() + offset);
181  dummy->initialize_snapshot_page(storage_id_, page_id, 0, low, high);
182  pointer.snapshot_pointer_ = page_id;
183  ++dummy_count;
184  }
185  }
186  VLOG(0) << to_string() << " has added " << dummy_count << " dummy pointers in root.";
187 
188  ASSERT_ND(new_root->get_header().snapshot_);
189  ASSERT_ND(new_root->get_header().storage_id_ == storage_id_);
190  SnapshotPagePointer new_root_id = args.snapshot_writer_->get_next_page_id();
191 
192  new_root->get_header().page_id_ = new_root_id;
193  *args.new_root_page_pointer_ = new_root_id;
194  WRAP_ERROR_CODE(args.snapshot_writer_->dump_pages(0, 1 + dummy_count));
195 
196  // AFTER writing out the root page, install the pointer to new root page
197  storage_.get_control_block()->root_page_pointer_.snapshot_pointer_ = new_root_id;
198  storage_.get_control_block()->meta_.root_snapshot_page_id_ = new_root_id;
199 
200  stop_watch.stop();
201  VLOG(0) << to_string() << " done in " << stop_watch.elapsed_ms() << "ms.";
202  return kRetOk;
203 }
204 
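// check_buddies(): sanity-checks that all root-info pages from the reducers agree on the
// partitioning (same key counts, fences, and separators) and that each child pointer was
// updated by at most one buddy; any mismatch is reported as an internal error.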
205 ErrorStack MasstreeComposer::check_buddies(const Composer::ConstructRootArguments& args) const {
206  const MasstreeIntermediatePage* base
207  = reinterpret_cast<const MasstreeIntermediatePage*>(args.root_info_pages_[0]);
208  const SlotIndex key_count = base->header().key_count_;
209  const uint32_t buddy_count = args.root_info_pages_count_;
210  // check key_count
211  for (uint16_t buddy = 1; buddy < buddy_count; ++buddy) {
212  const MasstreeIntermediatePage* page
213  = reinterpret_cast<const MasstreeIntermediatePage*>(args.root_info_pages_[buddy]);
214  if (page->header().key_count_ != key_count) {
215  ASSERT_ND(false);
216  return ERROR_STACK_MSG(kErrorCodeInternalError, "key count");
217  } else if (base->get_low_fence() != page->get_low_fence()) {
218  ASSERT_ND(false);
219  return ERROR_STACK_MSG(kErrorCodeInternalError, "low fence");
220  } else if (base->get_high_fence() != page->get_high_fence()) {
221  ASSERT_ND(false);
222  return ERROR_STACK_MSG(kErrorCodeInternalError, "high fence");
223  }
224  }
225 
226  // check inside minipages
227  for (SlotIndex i = 0; i <= key_count; ++i) {
228  const MasstreeIntermediatePage::MiniPage& base_mini = base->get_minipage(i);
229  SlotIndex mini_count = base_mini.key_count_;
230  // check separator
231  for (uint16_t buddy = 1; buddy < buddy_count; ++buddy) {
232  const MasstreeIntermediatePage* page
233  = reinterpret_cast<const MasstreeIntermediatePage*>(args.root_info_pages_[buddy]);
234  if (i < key_count) {
235  if (base->get_separator(i) != page->get_separator(i)) {
236  ASSERT_ND(false);
237  return ERROR_STACK_MSG(kErrorCodeInternalError, "separator");
238  }
239  }
240  }
241  // check individual separator and pointers
242  for (SlotIndex j = 0; j <= mini_count; ++j) {
243  ASSERT_ND(base_mini.pointers_[j].volatile_pointer_.is_null());
244  uint16_t updated_by = buddy_count; // only one should have updated each pointer
245  SnapshotPagePointer old_pointer = base_mini.pointers_[j].snapshot_pointer_;
246  if (from_this_snapshot(old_pointer, args)) {
247  updated_by = 0;
248  old_pointer = 0; // will use buddy-1's pointer below
249  }
250 
251  for (uint16_t buddy = 1; buddy < buddy_count; ++buddy) {
252  const MasstreeIntermediatePage* page
253  = reinterpret_cast<const MasstreeIntermediatePage*>(args.root_info_pages_[buddy]);
254  const MasstreeIntermediatePage::MiniPage& mini = page->get_minipage(i);
255  ASSERT_ND(mini.pointers_[j].volatile_pointer_.is_null());
256  if (j < mini_count) {
257  if (mini.separators_[j] != base_mini.separators_[j]) {
258  ASSERT_ND(false);
259  return ERROR_STACK_MSG(kErrorCodeInternalError, "individual separator");
260  }
261  }
262  SnapshotPagePointer pointer = mini.pointers_[j].snapshot_pointer_;
263  if (buddy == 1U && updated_by == 0) {
264  old_pointer = pointer;
265  }
266  if (from_this_snapshot(pointer, args)) {
267  if (updated_by != buddy_count) {
268  return ERROR_STACK_MSG(kErrorCodeInternalError, "multiple updaters");
269  }
270  updated_by = buddy;
271  } else if (old_pointer != pointer) {
272  return ERROR_STACK_MSG(kErrorCodeInternalError, "original pointer mismatch");
273  }
274  }
275  }
276  }
277 
278  return kRetOk;
279 }
280 
281 
282 std::string MasstreeComposer::to_string() const {
283  return std::string("MasstreeComposer-") + std::to_string(storage_id_);
284 }
285 
291 MasstreeComposeContext::MasstreeComposeContext(
292  Engine* engine,
293  snapshot::MergeSort* merge_sort,
294  const Composer::ComposeArguments& args)
295  : engine_(engine),
296  merge_sort_(merge_sort),
297  id_(merge_sort->get_storage_id()),
298  storage_(engine, id_),
299  args_(args),
300  snapshot_id_(args.snapshot_writer_->get_snapshot_id()),
301  numa_node_(get_writer()->get_numa_node()),
302  max_pages_(get_writer()->get_page_size()),
303  root_(reinterpret_cast<MasstreeIntermediatePage*>(args.root_info_page_)),
304  page_base_(reinterpret_cast<Page*>(get_writer()->get_page_base())),
305  original_base_(merge_sort->get_original_pages()) {
306  page_id_base_ = get_writer()->get_next_page_id();
307  refresh_page_boundary_info_variables();
308  root_index_ = 0;
309  root_index_mini_ = 0;
310  allocated_pages_ = 0;
311  cur_path_levels_ = 0;
312  page_boundary_info_cur_pos_ = 0;
313  page_boundary_elements_ = 0;
314  std::memset(cur_path_, 0, sizeof(cur_path_));
315  std::memset(cur_prefix_be_, 0, sizeof(cur_prefix_be_));
316  std::memset(cur_prefix_slices_, 0, sizeof(cur_prefix_slices_));
317  std::memset(tmp_boundary_array_, 0, sizeof(tmp_boundary_array_));
318 }
319 
320 void MasstreeComposeContext::refresh_page_boundary_info_variables() {
321  // we use intermediate pool of snapshot writer for page_boundary_info_/sort_
322  uint32_t size_in_pages = get_writer()->get_intermediate_size();
323  uint64_t size_in_bytes = size_in_pages * sizeof(Page);
324  // page_boundary_info_ consumes many more bytes than page_boundary_sort_ per entry.
325  // PageBoundaryInfo: (layer+3) * 8 bytes. So, at least 24 bytes. Likely 40-60 bytes or more.
326  // PageBoundarySort: Always 8 bytes.
327  // Let's be conservative. 15:1 distribution between the two.
328  max_page_boundary_elements_ = size_in_bytes / (16ULL * 8ULL);
329  page_boundary_info_capacity_ = max_page_boundary_elements_ * 15ULL;
330  page_boundary_info_ = reinterpret_cast<char*>(get_writer()->get_intermediate_base());
331  page_boundary_sort_ = reinterpret_cast<PageBoundarySort*>(
332  page_boundary_info_ + page_boundary_info_capacity_ * 8ULL);
333 }
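// Worked example of the 15:1 split above, assuming 4-KB pages and a 1000-page intermediate
// pool: size_in_bytes = 4,096,000; max_page_boundary_elements_ = 4,096,000 / 128 = 32,000;
// the sort array takes 32,000 * 8 = 256,000 bytes and the info region gets the remaining
// 15/16, i.e., 3,840,000 bytes.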
334 
335 void MasstreeComposeContext::write_dummy_page_zero() {
336  ASSERT_ND(allocated_pages_ == 0);
337  Page* dummy_page = page_base_;
338  std::memset(dummy_page, 0xDA, sizeof(Page));
339  dummy_page->get_header().page_id_ = get_writer()->get_next_page_id();
340  dummy_page->get_header().storage_id_ = id_;
342  dummy_page->get_header().snapshot_ = true;
343  allocated_pages_ = 1;
344 }
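// The 0xDA-filled dummy keeps offset 0 of the writer's buffer out of real use: get_page()
// below asserts offset > 0, so offset 0 (and page_id_base_ + 0) can never be mistaken for a
// valid in-buffer page.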
345 
346 inline MasstreePage* MasstreeComposeContext::get_page(memory::PagePoolOffset offset) const {
347  ASSERT_ND(offset > 0);
348  ASSERT_ND(offset < allocated_pages_);
349  ASSERT_ND(offset < max_pages_);
350  return reinterpret_cast<MasstreePage*>(page_base_ + offset);
351 }
352 inline MasstreeIntermediatePage* MasstreeComposeContext::as_intermdiate(MasstreePage* page) const {
353  ASSERT_ND(!page->is_border());
354  return reinterpret_cast<MasstreeIntermediatePage*>(page);
355 }
356 inline MasstreeBorderPage* MasstreeComposeContext::as_border(MasstreePage* page) const {
357  ASSERT_ND(page->is_border());
358  return reinterpret_cast<MasstreeBorderPage*>(page);
359 }
360 inline MasstreePage* MasstreeComposeContext::get_original(memory::PagePoolOffset offset) const {
361  ASSERT_ND(offset < kMaxLevels);
362  ASSERT_ND(offset < cur_path_levels_);
363  return reinterpret_cast<MasstreePage*>(original_base_ + offset);
364 }
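// Accessor summary: get_page() indexes pages newly allocated in the snapshot writer's buffer
// (offset < allocated_pages_), while get_original() indexes the per-path-level copies of
// pre-existing snapshot pages kept in the merge-sort work area.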
365 
371 ErrorStack MasstreeComposeContext::execute() {
372  write_dummy_page_zero();
373  CHECK_ERROR(init_root());
374 
375 #ifndef NDEBUG
376  char debug_prev[8192];
377  uint16_t debug_prev_length = 0;
378 #endif // NDEBUG
379 
380  uint64_t processed = 0;
381  while (true) {
382  CHECK_ERROR(merge_sort_->next_batch());
383  uint64_t count = merge_sort_->get_current_count();
384  if (count == 0 && merge_sort_->is_ended_all()) {
385  break;
386  }
387 
388 #ifndef NDEBUG
389  // confirm that it's fully sorted
390  for (uint64_t i = 0; i < count; ++i) {
391  const MasstreeCommonLogType* entry =
392  reinterpret_cast<const MasstreeCommonLogType*>(merge_sort_->resolve_sort_position(i));
393  const char* key = entry->get_key();
394  KeyLength key_length = entry->key_length_;
395  ASSERT_ND(key_length > 0);
396  if (key_length < debug_prev_length) {
397  ASSERT_ND(std::memcmp(debug_prev, key, key_length) <= 0);
398  } else {
399  ASSERT_ND(std::memcmp(debug_prev, key, debug_prev_length) <= 0);
400  }
401  std::memcpy(debug_prev, key, key_length);
402  debug_prev_length = key_length;
403  }
404 #endif // NDEBUG
405 
406  // within the batch, we further split them into "groups" that share the same key or log types
407  // so that we can process a number of logs in a tight loop.
408  uint64_t cur = 0;
409  uint32_t group_count = 0;
410  while (cur < count) {
413  if (LIKELY(group.count_ == 1U)) { // misprediction is amortized by group, so LIKELY is better
414  // no grouping. slowest case. has to be reasonably fast even in this case.
415  ASSERT_ND(!group.has_common_key_);
417  CHECK_ERROR(execute_a_log(cur));
418  } else if (group.has_common_key_) {
419  CHECK_ERROR(execute_same_key_group(cur, cur + group.count_));
420  } else {
422  log::LogCode log_type = group.get_log_type();
423  if (log_type == log::kLogCodeMasstreeInsert) {
424  CHECK_ERROR(execute_insert_group(cur, cur + group.count_));
425  } else if (log_type == log::kLogCodeMasstreeDelete) {
426  CHECK_ERROR(execute_delete_group(cur, cur + group.count_));
427  } else if (log_type == log::kLogCodeMasstreeUpdate) {
428  CHECK_ERROR(execute_update_group(cur, cur + group.count_));
429  } else {
431  CHECK_ERROR(execute_overwrite_group(cur, cur + group.count_));
432  }
433  }
434  cur += group.count_;
435  processed += group.count_;
436  ++group_count;
437  }
438  ASSERT_ND(cur == count);
439  VLOG(1) << storage_ << " processed a batch of " << cur << " logs that has " << group_count
440  << " groups";
441  }
442 
443  VLOG(0) << storage_ << " processed " << processed << " logs. now pages=" << allocated_pages_;
444  CHECK_ERROR(flush_buffer());
445 
446  // at the end, install pointers to the created snapshot pages except root page.
447  uint64_t installed_count = 0;
448  CHECK_ERROR(install_snapshot_pointers(&installed_count));
449  return kRetOk;
450 }
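// Dispatch recap for the batch loop in execute(): a group with a single log goes through
// execute_a_log(); a group sharing one key goes to execute_same_key_group(); otherwise the
// group shares a log type and is routed to the insert/delete/update/overwrite group handler.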
451 
452 ErrorStack MasstreeComposeContext::execute_same_key_group(uint32_t from, uint32_t to) {
453  if (from == to) {
454  return kRetOk;
455  }
456  // As these logs are on the same key, we check which logs can be nullified.
457 
458  // Let's say I:Insert, U:Update, D:Delete, O:Overwrite
459  // overwrite: this is the easiest one that is nullified by following delete/update.
460  // insert: if there is following delete, everything in-between disappear, including insert/delete.
461  // update: nullified by following delete/update
462  // delete: strongest. never nullified except the insert-delete pairing.
463 
464  // Merge rule 1: delete+insert in a row can be converted into one update
465  // because update is delete+insert. Here, we ignore delete log and change insert log to update.
466  // Merge rule 2: insert+update in a row can be converted into one insert
467 
468  // As a consequence, we can compact every sequence of logs for the same key into:
469  // - Special case. One delete, nothing else. The old page contains a record of the key.
470  // - Case A. The old page contains a record of the key:
471  // Zero or one update, followed by zero or more overwrites ("[U]?O*").
472  // - Case B. The old page does not contain a record of the key:
473  // One insert, followed by zero or one update, followed by zero or more overwrites ("I[U]?O*").
474 
475  // Conversion examples..
476  // Case A1: OODIOUO => DIOUO (nullified everything before D)
477  // => UOUO (DI -> U conversion)
478  // => UUO (nullified O before U)
479  // => UO (nullified U before U)
480  // Case A2: OODIOUOD => D (nullified everything before D)
481  // Case B1: IUUDIOU => IOU (I and D cancel each other)
482  // => IU (nullified O before U)
483  // Case B2: IOUODID => ID (I and D cancel each other)
484  // => <empty> (I and D cancel each other)
485 
486 
487  // Iff it starts with an insert, the old page doesn't have a record of the key.
488  const bool starts_with_insert
490 
491  // Index of non-nullified delete. if this is != to, it's the special case above.
492  // Thus, this value must be "to - 1" in that case.
493  uint32_t last_active_delete = to;
494  // First, look for the last delete. It determines many things if exists.
495  ASSERT_ND(to > 0);
496  for (uint32_t i = static_cast<uint32_t>(to - 1);
497  i >= from && i < static_cast<uint32_t>(to); // the latter one for uint wrap-around
498  --i) {
499  log::LogCode log_type = merge_sort_->get_log_type_from_sort_position(i);
500  if (log_type == log::kLogCodeMasstreeDelete) {
501  ASSERT_ND(last_active_delete == to);
502  last_active_delete = i;
503  // Okay, found a delete. Everything before this will be nullified.
504 
505 #ifndef NDEBUG
506  // Let's do sanity check. Is it consistent with the number of preceding inserts/deletes?
507  uint32_t insert_count = 0;
508  uint32_t delete_count = 0;
509  for (uint32_t j = from; j < i; ++j) {
510  log::LogCode log_type_j = merge_sort_->get_log_type_from_sort_position(j);
511  switch (log_type_j) {
512  case log::kLogCodeMasstreeInsert:
513  ASSERT_ND((starts_with_insert && insert_count == delete_count)
514  || (!starts_with_insert && insert_count + 1U == delete_count));
515  ++insert_count;
516  break;
517  case log::kLogCodeMasstreeDelete:
518  ASSERT_ND((!starts_with_insert && insert_count == delete_count)
519  || (starts_with_insert && insert_count == delete_count + 1U));
520  ++delete_count;
521  break;
522  default:
523  ASSERT_ND(log_type_j == log::kLogCodeMasstreeUpdate
524  || log_type_j == log::kLogCodeMasstreeOverwrite);
525  ASSERT_ND((!starts_with_insert && insert_count == delete_count)
526  || (starts_with_insert && insert_count == delete_count + 1U));
527  break;
528  }
529  }
530 
531  // it ends with a delete, so there must be something to delete.
532  ASSERT_ND((!starts_with_insert && insert_count == delete_count)
533  || (starts_with_insert && insert_count == delete_count + 1U));
534 #endif // NDEBUG
535 
536  break;
537  }
538  }
539 
540  // Index of non-nullified insert. if this is != to, it's always case B and all other active logs
541  // are after this.
542  uint32_t last_active_insert = to;
543  // Whether the last active insert is a "merged" insert, meaning an update
544  // following an insert. In this case, we treat the update-log as if it's an insert log.
545  bool is_last_active_insert_merged = false;
546 
547  // Index of non-nullified update. if this is != to, all active overwrite logs are after this.
548  uint32_t last_active_update = to;
549  // Whether the last active update is a "merged" update, meaning an insert
550  // following a delete. In this case, we treat the insert-log as if it's an update log.
551  bool is_last_active_update_merged = false;
552 
553  uint32_t next_to_check = from;
554  if (last_active_delete != to) {
555  ASSERT_ND(last_active_delete >= from && last_active_delete < to);
556  if (last_active_delete + 1U == to) {
557  // This delete concludes the series of logs. Great!
558  if (starts_with_insert) {
559  DVLOG(1) << "yay, same-key-group completely eliminated a group of "
560  << (to - from) << " logs";
561  // TASK(Hideaki) assertion to check if the old snapshot doesn't contain this key.
562  } else {
563  DVLOG(1) << "yay, same-key-group compressed " << (to - from) << " logs into a delete";
564  CHECK_ERROR(execute_a_log(last_active_delete));
565  }
566 
567  return kRetOk;
568  } else {
569  // The only possible log right after delete is insert.
570  uint32_t next = last_active_delete + 1U;
571  log::LogCode next_type = merge_sort_->get_log_type_from_sort_position(next);
572  ASSERT_ND(next_type == log::kLogCodeMasstreeInsert);
573 
574  if (starts_with_insert) {
575  // I...DI,,,
576  // We are sure the ... has the same number of I/D.
577  // So, we just cancel them each other, turning into I,,,
578  DVLOG(2) << "I and D cancelled each other. from=" << from
579  << " delete at=" << last_active_delete;
580  last_active_insert = next;
581  } else {
582  // ...DI,,,
583  // In ..., #-of-I == #-of-D
584  // Now the merge rule 1 applies. D+I => U, turning into U,,,
585  DVLOG(2) << "D+I merged into U. from=" << from << " delete at=" << last_active_delete;
586  last_active_update = next;
587  is_last_active_update_merged = true;
588  }
589  next_to_check = next + 1U;
590  last_active_delete = to;
591  }
592  }
593 
594  // From now on, we are sure there is no more delete or insert.
595  // We just look for updates, which can nullify or merge into other operations.
596  ASSERT_ND(next_to_check >= from);
597  ASSERT_ND(last_active_delete == to);
598  for (uint32_t i = next_to_check; i < to; ++i) {
599  log::LogCode log_type = merge_sort_->get_log_type_from_sort_position(i);
600  ASSERT_ND(log_type == log::kLogCodeMasstreeUpdate
601  || log_type == log::kLogCodeMasstreeOverwrite);
602  if (log_type == log::kLogCodeMasstreeUpdate) {
603  // Update nullifies the preceding overwrite, and also merges with the preceding insert.
604  if (last_active_insert != to) {
605  DVLOG(2) << "U replaced all preceding I and O, turning into a new I. i=" << i;
606  ASSERT_ND(last_active_update == to); // then it should have already merged
607  ASSERT_ND(!is_last_active_update_merged);
608  last_active_insert = i;
609  is_last_active_insert_merged = true;
610  } else {
611  DVLOG(2) << "U nullified all preceding O and U. i=" << i;
612  last_active_update = i;
613  is_last_active_update_merged = false;
614  }
615  } else {
616  // Overwrites are just skipped.
618  ASSERT_ND(!starts_with_insert || last_active_insert != to);
619  }
620  }
621 
622  // After these conversions, the only remaining log patterns are:
623  // a) "IO*" : the data page doesn't have the key.
624  // b) "U?O*" : the data page already has the key.
625  ASSERT_ND(last_active_delete == to);
626  uint32_t cur = from;
627  if (last_active_insert != to || last_active_update != to) {
628  // Process I/U separately. The I/U might be a merged one.
629  log::LogCode type_before_merge;
630  log::LogCode type_after_merge;
631  if (last_active_insert != to) {
632  ASSERT_ND(last_active_update == to);
633  ASSERT_ND(!is_last_active_update_merged);
634  type_after_merge = log::kLogCodeMasstreeInsert;
635  cur = last_active_insert;
636  if (is_last_active_insert_merged) {
637  type_before_merge = log::kLogCodeMasstreeUpdate;
638  } else {
639  type_before_merge = log::kLogCodeMasstreeInsert;
640  }
641  } else {
642  ASSERT_ND(!is_last_active_insert_merged);
643  type_after_merge = log::kLogCodeMasstreeUpdate;
644  cur = last_active_update;
645  if (is_last_active_update_merged) {
646  type_before_merge = log::kLogCodeMasstreeInsert;
647  } else {
648  type_before_merge = log::kLogCodeMasstreeUpdate;
649  }
650  }
651 
652  ASSERT_ND(type_before_merge == merge_sort_->get_log_type_from_sort_position(cur));
653  if (type_after_merge != type_before_merge) {
654  DVLOG(2) << "Disguising log type for merge. cur=" << cur
655  << ", type_before_merge=" << type_before_merge << ", type_after_merge=" << type_after_merge;
656  // Change log type. Insert and Update have exactly same log layout, so it's safe.
657  merge_sort_->change_log_type_at(cur, type_before_merge, type_after_merge);
658  }
659 
660  // Process the I/U as usual. This also makes sure that the tail-record is the key.
661  } else {
663  // All logs are overwrites.
664  // Even in this case, we must process the first log as usual so that
665  // the tail-record in the tail page points to the record.
666  }
667  ASSERT_ND(cur < to);
668  CHECK_ERROR(execute_a_log(cur));
669  ++cur;
670  if (cur == to) {
671  return kRetOk;
672  }
673 
674  // All the following logs are overwrites.
675  // Process the remaining overwrites in a tight loop.
676  // We made sure the tail-record in the tail page points to the record.
677  PathLevel* last = get_last_level();
678  ASSERT_ND(get_page(last->tail_)->is_border());
679  MasstreeBorderPage* page = as_border(get_page(last->tail_));
680  SlotIndex key_count = page->get_key_count();
681  ASSERT_ND(key_count > 0);
682  SlotIndex index = key_count - 1;
683  ASSERT_ND(!page->does_point_to_layer(index));
684  char* record = page->get_record(index);
685 
686  for (uint32_t i = cur; i < to; ++i) {
687  const MasstreeOverwriteLogType* casted =
688  reinterpret_cast<const MasstreeOverwriteLogType*>(merge_sort_->resolve_sort_position(i));
689  ASSERT_ND(casted->header_.get_type() == log::kLogCodeMasstreeOverwrite);
690  ASSERT_ND(page->equal_key(index, casted->get_key(), casted->key_length_));
691 
692  // Also, we look for a chance to ignore redundant overwrites.
693  // If next overwrite log covers the same or more data range, we can skip the log.
694  // Ideally, we should have removed such logs back in mappers.
695  if (i + 1U < to) {
696  const MasstreeOverwriteLogType* next =
697  reinterpret_cast<const MasstreeOverwriteLogType*>(
698  merge_sort_->resolve_sort_position(i + 1U));
699  if ((next->payload_offset_ <= casted->payload_offset_)
700  && (next->payload_offset_ + next->payload_count_
701  >= casted->payload_offset_ + casted->payload_count_)) {
702  DVLOG(3) << "Skipped redundant overwrites";
703  continue;
704  }
705  }
706 
707  casted->apply_record(nullptr, id_, page->get_owner_id(index), record);
708  }
709  return kRetOk;
710 }
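// Net effect of execute_same_key_group(): after the nullification/merge rules, at most one
// insert-or-update is applied via execute_a_log() (which also positions the tail record),
// and the surviving overwrites are applied directly to that record in the loop above.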
711 
712 ErrorStack MasstreeComposeContext::execute_insert_group(uint32_t from, uint32_t to) {
713  uint32_t cur = from;
714  while (cur < to) {
715  CHECK_ERROR(flush_if_nearly_full()); // so that the append loop doesn't have to check
716 
717  // let's see if the remaining logs are smaller than existing records or the page boundary.
718  // if so, we can simply create new pages for a (hopefully large) number of logs.
719  // in the worst case, it's like a marble (newrec, oldrec, newrec, oldrec, ...) in a page
720  // or even worse a newrec in a page, a newrec in another page, ... If that is the case,
721  // we insert as usual. but, in many cases a group of inserts create a new region.
722  KeySlice min_slice;
723  KeySlice max_slice;
724  uint32_t run_count = execute_insert_group_get_cur_run(cur, to, &min_slice, &max_slice);
725  if (run_count <= 1U) {
727  CHECK_ERROR(execute_a_log(cur));
728  ++cur;
729  continue;
730  }
731 
732  DVLOG(2) << "Great, " << run_count << " insert-logs to process in the current context.";
733 
734  PathLevel* last = get_last_level();
735  ASSERT_ND(get_page(last->tail_)->is_border());
736  MasstreeBorderPage* page = as_border(get_page(last->tail_));
737  // now these logs are guaranteed to be:
738  // 1) fence-wise fit in the current B-tree page (of course might need page splits).
739  // 2) can be stored in the current B-trie layer.
740  // 3) no overlap with existing records.
741  // thus, we just create a sequence of new B-tree pages next to the current page.
742 
743  // first, let's get hints about new page boundaries to align well with volatile pages.
744  ASSERT_ND(min_slice >= page->get_low_fence());
745  ASSERT_ND(max_slice <= page->get_high_fence());
746  uint32_t found_boundaries_count = 0;
747  MasstreeStorage::PeekBoundariesArguments args = {
748  cur_prefix_slices_,
749  last->layer_,
750  kTmpBoundaryArraySize,
751  min_slice,
752  max_slice,
753  tmp_boundary_array_,
754  &found_boundaries_count };
755  WRAP_ERROR_CODE(storage_.peek_volatile_page_boundaries(engine_, args));
756  ASSERT_ND(found_boundaries_count <= kTmpBoundaryArraySize);
757  DVLOG(2) << found_boundaries_count << " boundary hints for " << run_count << " insert-logs.";
758  if (found_boundaries_count >= kTmpBoundaryArraySize) {
759  LOG(INFO) << "holy crap, we get more than " << kTmpBoundaryArraySize << " page boundaries"
760  << " as hints for " << run_count << " insert-logs!!";
761  // as this is just a hint, we can go on. if this often happens, reduce kMaxLogGroupSize.
762  // or increase kTmpBoundaryArraySize.
763  }
764 
765  // then, append all of them in a tight loop
766  WRAP_ERROR_CODE(execute_insert_group_append_loop(cur, cur + run_count, found_boundaries_count));
767  cur += run_count;
768  }
769 
770  ASSERT_ND(cur == to);
771  return kRetOk;
772 }
773 
774 ErrorCode MasstreeComposeContext::execute_insert_group_append_loop(
775  uint32_t from,
776  uint32_t to,
777  uint32_t hint_count) {
778  const uint16_t kFetch = 8; // parallel fetch to amortize L1 cachemiss cost.
779  const KeySlice* hints = tmp_boundary_array_;
780  const log::RecordLogType* logs[kFetch];
781  uint32_t cur = from;
782  uint32_t next_hint = 0;
783  uint32_t dubious_hints = 0; // # of previous hints that resulted in too-empty pages
784  PathLevel* last = get_last_level();
785  ASSERT_ND(get_page(last->tail_)->is_border());
786  MasstreeBorderPage* page = as_border(get_page(last->tail_));
787  uint8_t cur_layer = page->get_layer();
788 
789  while (cur < to) {
790  uint16_t fetch_count = std::min<uint32_t>(kFetch, to - cur);
791  uint16_t actual_count = merge_sort_->fetch_logs(cur, fetch_count, logs);
792  ASSERT_ND(actual_count == fetch_count);
793 
794  for (uint32_t i = 0; i < actual_count; ++i) {
795  // hints[next_hint] tells when we should close the current page. check if we overlooked it.
796  // whether we followed the hints or not, we advance next_hint, so the following assertions always hold.
797  ASSERT_ND(next_hint >= hint_count || page->get_low_fence() < hints[next_hint]);
798  ASSERT_ND(next_hint >= hint_count
799  || page->get_key_count() == 0
800  || page->get_slice(page->get_key_count() - 1) < hints[next_hint]);
801 
802  const MasstreeInsertLogType* entry
803  = reinterpret_cast<const MasstreeInsertLogType*>(ASSUME_ALIGNED(logs[i], 8));
804  ASSERT_ND(entry->header_.get_type() == log::kLogCodeMasstreeInsert);
805  const char* key = entry->get_key();
806  KeyLength key_length = entry->key_length_;
807 
808 #ifndef NDEBUG
809  ASSERT_ND(key_length > cur_layer * sizeof(KeySlice));
810  for (uint8_t layer = 0; layer < cur_layer; ++layer) {
811  KeySlice prefix_slice = normalize_be_bytes_full_aligned(key + layer * sizeof(KeySlice));
812  ASSERT_ND(prefix_slice == cur_prefix_slices_[layer]);
813  }
814 #endif // NDEBUG
815 
816  KeyLength remainder_length = key_length - cur_layer * sizeof(KeySlice);
817  const char* suffix = key + (cur_layer + 1) * sizeof(KeySlice);
818  KeySlice slice = normalize_be_bytes_full_aligned(key + cur_layer * sizeof(KeySlice));
819  ASSERT_ND(page->within_fences(slice));
820  SlotIndex key_count = page->get_key_count();
821  ASSERT_ND(key_count == 0 || slice > page->get_slice(key_count - 1));
822  PayloadLength payload_count = entry->payload_count_;
823  const char* payload = entry->get_payload();
824  xct::XctId xct_id = entry->header_.xct_id_;
825  ASSERT_ND(!merge_sort_->get_base_epoch().is_valid()
826  || xct_id.get_epoch() >= merge_sort_->get_base_epoch());
827 
828  // If the hint says the new record should be in a new page, we close the current page
829  // EXCEPT when we are getting too many cases that result in too-empty pages.
830  // It can happen when the volatile world is way ahead of the snapshotted epochs.
831  // We still have to tolerate some too-empty pages, otherwise:
832  // hint tells: 0-10, 10-20, 20-30, ...
833  // the first page for some reason became full at 0-8. when processing 10, the new page is
834  // still almost empty, we ignore the hint. close at 10-18. same thing happens at 20, ...
835  // Hence, we ignore hints if kDubiousHintsThreshold previous hints caused too empty pages.
836  const uint32_t kDubiousHintsThreshold = 2; // to tolerate more, increase this.
837  const DataOffset kTooEmptySize = kBorderPageDataPartSize * 2 / 10;
838  bool page_switch_hinted = false;
839  KeySlice hint_suggested_slice = kInfimumSlice;
840  if (UNLIKELY(next_hint < hint_count && slice >= hints[next_hint])) {
841  DVLOG(3) << "the hint tells that we should close the page now.";
842  bool too_empty = key_count == 0 || page->available_space() <= kTooEmptySize;
843  if (too_empty) {
844  DVLOG(1) << "however, the page is still quite empty!";
845  if (dubious_hints >= kDubiousHintsThreshold) {
846  DVLOG(1) << "um, let's ignore the hints.";
847  } else {
848  DVLOG(1) << "nevertheless, let's trust the hints for now.";
849  ++dubious_hints;
850  page_switch_hinted = true;
851  hint_suggested_slice = hints[next_hint];
852  }
853  }
854  // consume all hints up to this slice.
855  ++next_hint;
856  while (next_hint < hint_count && slice >= hints[next_hint]) {
857  ++next_hint;
858  }
859  }
860 
861  if (UNLIKELY(page_switch_hinted)
862  || UNLIKELY(!page->can_accomodate_snapshot(remainder_length, payload_count))) {
863  // unlike append_border_newpage(), which is used in the per-log method, this does no
864  // page migration. much simpler and faster.
865  memory::PagePoolOffset new_offset = allocate_page();
866  SnapshotPagePointer new_page_id = page_id_base_ + new_offset;
867  MasstreeBorderPage* new_page = reinterpret_cast<MasstreeBorderPage*>(get_page(new_offset));
868 
869  KeySlice middle;
870  if (page_switch_hinted) {
871  ASSERT_ND(hint_suggested_slice > page->get_low_fence());
872  ASSERT_ND(hint_suggested_slice <= slice);
873  ASSERT_ND(hint_suggested_slice < page->get_high_fence());
874  middle = hint_suggested_slice;
875  } else {
876  middle = slice;
877  }
878 
879  KeySlice high_fence = page->get_high_fence();
880  new_page->initialize_snapshot_page(id_, new_page_id, cur_layer, middle, high_fence);
881  page->set_foster_major_offset_unsafe(new_offset); // set next link
882  page->set_high_fence_unsafe(middle);
883  last->tail_ = new_offset;
884  ++last->page_count_;
885  page = new_page;
886  key_count = 0;
887  }
888 
889  ASSERT_ND(page->can_accomodate_snapshot(remainder_length, payload_count));
890  page->reserve_record_space(key_count, xct_id, slice, suffix, remainder_length, payload_count);
891  page->increment_key_count();
892  fill_payload_padded(page->get_record_payload(key_count), payload, payload_count);
893  }
894 
895  cur += actual_count;
896  }
897  ASSERT_ND(cur == to);
898  return kErrorCodeOk;
899 }
900 
901 uint32_t MasstreeComposeContext::execute_insert_group_get_cur_run(
902  uint32_t cur,
903  uint32_t to,
904  KeySlice* min_slice,
905  KeySlice* max_slice) {
906  ASSERT_ND(cur < to);
908  *min_slice = kInfimumSlice;
909  *max_slice = kInfimumSlice;
910  if (cur_path_levels_ == 0) {
911  // initial call. even get_last_level() doesn't work. process it as usual
912  return 0;
913  }
914 
915  PathLevel* last = get_last_level();
916  MasstreePage* tail_page = get_page(last->tail_);
917  if (!tail_page->is_border()) {
918  LOG(WARNING) << "um, why this can happen? anyway, let's process it per log";
919  return 0;
920  }
921 
922  const MasstreeBorderPage* page = as_border(tail_page);
923  const KeySlice low_fence = page->get_low_fence();
924  const KeySlice high_fence = page->get_high_fence();
925  const SlotIndex existing_keys = page->get_key_count();
926  const KeySlice existing_slice
927  = existing_keys == 0 ? kInfimumSlice : page->get_slice(existing_keys - 1);
928  ASSERT_ND(last->layer_ == page->get_layer());
929  const uint8_t prefix_count = last->layer_;
930  const snapshot::MergeSort::SortEntry* sort_entries = merge_sort_->get_sort_entries();
931  uint32_t run_count;
932  KeySlice prev_slice = existing_slice;
933  if (prefix_count == 0) {
934  // optimized impl for this common case. sort_entries provides enough information.
935  for (run_count = 0; cur + run_count < to; ++run_count) {
936  KeySlice slice = sort_entries[cur + run_count].get_key(); // sole key of interest.
937  ASSERT_ND(slice >= low_fence); // because logs are sorted
938  if (slice >= high_fence) { // page boundary
939  return run_count;
940  }
941  if (slice == prev_slice) { // next-layer
942  // if both happen to be 0 (infimum) or key_length is not a multiply of 8, this check is
943  // conservative. but, it's rare, so okay to process them per log.
944  return run_count;
945  }
946  if (last->has_next_original() && slice >= last->next_original_slice_) { // original record
947  // this is a conservative break because we don't check the key length and it's ">=".
948  // it's fine, if that happens, we just process them per log.
949  return run_count;
950  }
951 
952  prev_slice = slice;
953  if (run_count == 0) {
954  *min_slice = slice;
955  } else {
956  ASSERT_ND(slice > *min_slice);
957  }
958  *max_slice = slice;
959  }
960  } else {
961  // then, a bit more expensive. we might have to grab the log itself, which causes L1 cachemiss.
962  for (run_count = 0; cur + run_count < to; ++run_count) {
963  // a low-cost check first. match the first prefix layer
964  KeySlice first_slice = sort_entries[cur + run_count].get_key();
965  ASSERT_ND(first_slice >= cur_prefix_slices_[0]); // because logs are sorted
966  if (first_slice != cur_prefix_slices_[0]) {
967  return run_count;
968  }
969 
970  // ah, we have to grab the log.
971  const MasstreeCommonLogType* entry =
972  reinterpret_cast<const MasstreeCommonLogType*>(
973  merge_sort_->resolve_sort_position(cur + run_count));
974  const char* key = entry->get_key();
975  KeyLength key_length = entry->key_length_;
976  ASSERT_ND(is_key_aligned_and_zero_padded(key, key_length));
977  if (key_length <= prefix_count * sizeof(KeySlice)) {
978  return run_count;
979  }
980 
981  // match remaining prefix layers
982  for (uint8_t layer = 1; layer < prefix_count; ++layer) {
983  KeySlice prefix_slice = normalize_be_bytes_full_aligned(key + layer * sizeof(KeySlice));
984  ASSERT_ND(prefix_slice >= cur_prefix_slices_[layer]); // because logs are sorted
985  if (prefix_slice != cur_prefix_slices_[layer]) {
986  return run_count;
987  }
988  }
989 
990  // then same checks
991  KeySlice slice = normalize_be_bytes_full_aligned(key + prefix_count * sizeof(KeySlice));
992  ASSERT_ND(slice >= low_fence); // because logs are sorted
993  if (slice >= high_fence
994  || slice == prev_slice
995  || (last->has_next_original() && slice >= last->next_original_slice_)) {
996  return run_count;
997  }
998 
999  prev_slice = slice;
1000  if (run_count == 0) {
1001  *min_slice = slice;
1002  } else {
1003  ASSERT_ND(slice > *min_slice);
1004  }
1005  *max_slice = slice;
1006  }
1007  }
1008  ASSERT_ND(cur + run_count == to); // because above code returns rather than breaks.
1009  return run_count;
1010 }
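// A "run" ends conservatively at the first log whose slice crosses the tail page's high
// fence, repeats the previous slice (possible next-layer case), or reaches the next
// not-yet-consumed original record; such logs fall back to the per-log path.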
1011 
1012 ErrorStack MasstreeComposeContext::execute_delete_group(uint32_t from, uint32_t to) {
1013  // TASK(Hideaki) batched impl. but this case is not common, so later, later..
1014  for (uint32_t i = from; i < to; ++i) {
1015  CHECK_ERROR(execute_a_log(i));
1016  }
1017  return kRetOk;
1018 }
1019 
1020 ErrorStack MasstreeComposeContext::execute_update_group(uint32_t from, uint32_t to) {
1021  // TASK(Hideaki) batched impl. but this case is not common, so later, later..
1022  for (uint32_t i = from; i < to; ++i) {
1023  CHECK_ERROR(execute_a_log(i));
1024  }
1025  return kRetOk;
1026 }
1027 
1028 ErrorStack MasstreeComposeContext::execute_overwrite_group(uint32_t from, uint32_t to) {
1029  // TASK(Hideaki) batched impl
1030  for (uint32_t i = from; i < to; ++i) {
1031  CHECK_ERROR(execute_a_log(i));
1032  }
1033  return kRetOk;
1034 }
1035 
1036 inline ErrorStack MasstreeComposeContext::execute_a_log(uint32_t cur) {
1037  ASSERT_ND(cur < merge_sort_->get_current_count());
1038 
1039  const MasstreeCommonLogType* entry =
1040  reinterpret_cast<const MasstreeCommonLogType*>(merge_sort_->resolve_sort_position(cur));
1041  const char* key = entry->get_key();
1042  KeyLength key_length = entry->key_length_;
1043  ASSERT_ND(key_length > 0);
1044  ASSERT_ND(cur_path_levels_ == 0 || get_page(get_last_level()->tail_)->is_border());
1045  ASSERT_ND(is_key_aligned_and_zero_padded(key, key_length));
1046 
1047  if (UNLIKELY(does_need_to_adjust_path(key, key_length))) {
1048  CHECK_ERROR(adjust_path(key, key_length));
1049  }
1050 
1051  PathLevel* last = get_last_level();
1052  ASSERT_ND(get_page(last->tail_)->is_border());
1053  KeySlice slice = normalize_be_bytes_full_aligned(key + last->layer_ * kSliceLen);
1054  ASSERT_ND(last->contains_slice(slice));
1055 
1056  // we might have to consume original records before processing this log.
1057  if (last->needs_to_consume_original(slice, key_length)) {
1058  CHECK_ERROR(consume_original_upto_border(slice, key_length, last));
1059  ASSERT_ND(!last->needs_to_consume_original(slice, key_length));
1060  }
1061 
1062  MasstreeBorderPage* page = as_border(get_page(last->tail_));
1063  SlotIndex key_count = page->get_key_count();
1064 
1065  // we might have to follow next-layer pointers. Check it.
1066  // notice it's "while", not "if", because we might follow more than one next-layer pointer
1067  while (key_count > 0
1068  && key_length > (last->layer_ + 1U) * kSliceLen
1069  && page->get_slice(key_count - 1) == slice
1070  && page->get_remainder_length(key_count - 1) > kSliceLen) {
1071  // then we have to either follow or newly create the next layer.
1072  if (page->does_point_to_layer(key_count - 1)) {
1073  // the next layer already exists. just follow it.
1074  ASSERT_ND(page->get_next_layer(key_count - 1)->volatile_pointer_.is_null());
1075  SnapshotPagePointer* next_pointer = &page->get_next_layer(key_count - 1)->snapshot_pointer_;
1076  CHECK_ERROR(open_next_level(key, key_length, page, slice, next_pointer));
1077  } else {
1078  // then we have to newly create a layer.
1079  ASSERT_ND(page->will_conflict(key_count - 1, slice, key_length - last->layer_ * kSliceLen));
1080  open_next_level_create_layer(key, key_length, page, slice, key_count - 1);
1081  ASSERT_ND(page->does_point_to_layer(key_count - 1));
1082  }
1083 
1084  // in either case, we went deeper. retrieve these again
1085  last = get_last_level();
1086  ASSERT_ND(get_page(last->tail_)->is_border());
1087  slice = normalize_be_bytes_full_aligned(key + last->layer_ * kSliceLen);
1088  ASSERT_ND(last->contains_slice(slice));
1089  page = as_border(get_page(last->tail_));
1090  key_count = page->get_key_count();
1091  }
1092 
1093  // Now we are sure the tail of the last level is the only relevant record. process the log.
1094  if (entry->header_.get_type() == log::kLogCodeMasstreeOverwrite) {
1095  // [Overwrite] simply reuse log.apply
1096  SlotIndex index = key_count - 1;
1097  ASSERT_ND(!page->does_point_to_layer(index));
1098  ASSERT_ND(page->equal_key(index, key, key_length));
1099  char* record = page->get_record(index);
1100  const MasstreeOverwriteLogType* casted
1101  = reinterpret_cast<const MasstreeOverwriteLogType*>(entry);
1102  casted->apply_record(nullptr, id_, page->get_owner_id(index), record);
1103  } else {
1104  // DELETE/INSERT/UPDATE
1105  ASSERT_ND(
1106  entry->header_.get_type() == log::kLogCodeMasstreeDelete
1107  || entry->header_.get_type() == log::kLogCodeMasstreeInsert
1108  || entry->header_.get_type() == log::kLogCodeMasstreeUpdate);
1109 
1110  if (entry->header_.get_type() == log::kLogCodeMasstreeDelete
1111  || entry->header_.get_type() == log::kLogCodeMasstreeUpdate) {
1112  // [Delete/Update] Physically deletes the last record in this page.
1113  SlotIndex index = key_count - 1;
1114  ASSERT_ND(!page->does_point_to_layer(index));
1115  ASSERT_ND(page->equal_key(index, key, key_length));
1116  page->set_key_count(index);
1117  }
1118  // Notice that this is "if", not "else if". UPDATE = DELETE + INSERT.
1119  if (entry->header_.get_type() == log::kLogCodeMasstreeInsert
1120  || entry->header_.get_type() == log::kLogCodeMasstreeUpdate) {
1121  // [Insert/Update] next-layer is already handled above, so just append it.
1122  ASSERT_ND(entry->header_.get_type() == log::kLogCodeMasstreeInsert);
1123  ASSERT_ND(key_count == 0 || !page->equal_key(key_count - 1, key, key_length)); // no dup
1124  KeyLength skip = last->layer_ * kSliceLen;
1125  append_border(
1126  slice,
1127  entry->header_.xct_id_,
1128  key_length - skip,
1129  key + skip + kSliceLen,
1130  entry->payload_count_,
1131  entry->get_payload(),
1132  last);
1133  }
1134  }
1135 
1136  CHECK_ERROR(flush_if_nearly_full());
1137 
1138  return kRetOk;
1139 }
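// Per-log path summary: adjust cur_path_ if the key leaves the current page, consume any
// original records that sort before the key, follow or create next-layer pointers while the
// slice collides with a longer existing key, then apply the log to the tail record
// (overwrite in place, delete by trimming the key count, insert/update by appending).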
1140 
1146 ErrorStack MasstreeComposeContext::init_root() {
1147  SnapshotPagePointer snapshot_page_id
1148  = storage_.get_control_block()->root_page_pointer_.snapshot_pointer_;
1149  if (snapshot_page_id != 0) {
1150  // based on previous snapshot page.
1151  ASSERT_ND(page_boundary_info_cur_pos_ == 0 && page_boundary_elements_ == 0);
1152  WRAP_ERROR_CODE(args_.previous_snapshot_files_->read_page(snapshot_page_id, root_));
1153  } else {
1154  // if this is the initial snapshot, we create a dummy image of the root page based on
1155  // partitioning. all composers in all reducers follow the same protocol so that
1156  // we can easily merge their outputs.
1157  PartitionerMetadata* metadata = PartitionerMetadata::get_metadata(engine_, id_);
1158  ASSERT_ND(metadata->valid_);
1159  MasstreePartitionerData* data
1160  = reinterpret_cast<MasstreePartitionerData*>(metadata->locate_data(engine_));
1161  ASSERT_ND(data->partition_count_ > 0);
1162  ASSERT_ND(data->partition_count_ < kMaxIntermediatePointers);
1163 
1164  root_->initialize_snapshot_page(id_, 0, 0, 1, kInfimumSlice, kSupremumSlice);
1165  ASSERT_ND(data->low_keys_[0] == kInfimumSlice);
1166  root_->get_minipage(0).pointers_[0].snapshot_pointer_ = 0;
1167  for (uint16_t i = 1; i < data->partition_count_; ++i) {
1168  root_->append_pointer_snapshot(data->low_keys_[i], 0);
1169  }
1170  }
1171 
1172  return kRetOk;
1173 }
1174 
1175 ErrorStack MasstreeComposeContext::open_first_level(const char* key, KeyLength key_length) {
1176  ASSERT_ND(cur_path_levels_ == 0);
1177  ASSERT_ND(key_length > 0);
1178  ASSERT_ND(is_key_aligned_and_zero_padded(key, key_length));
1179  PathLevel* first = cur_path_;
1180 
1182  uint8_t index = root_->find_minipage(slice);
1183  MasstreeIntermediatePage::MiniPage& minipage = root_->get_minipage(index);
1184  uint8_t index_mini = minipage.find_pointer(slice);
1185  SnapshotPagePointer* pointer_address = &minipage.pointers_[index_mini].snapshot_pointer_;
1186  SnapshotPagePointer old_pointer = *pointer_address;
1187  root_index_ = index;
1188  root_index_mini_ = index_mini;
1189 
1190  KeySlice low_fence;
1191  KeySlice high_fence;
1192  root_->extract_separators_snapshot(index, index_mini, &low_fence, &high_fence);
1193 
1194  if (old_pointer != 0) {
1195  // When we are opening an existing page, we dig further until we find a border page
1196  // to update/insert/delete the given key. this might recurse
1197  CHECK_ERROR(open_next_level(key, key_length, root_, kInfimumSlice, pointer_address));
1198  } else {
1199  // this should happen only while an initial snapshot for this storage.
1200  ASSERT_ND(storage_.get_control_block()->root_page_pointer_.snapshot_pointer_ == 0);
1201  // newly creating a whole subtree. Start with a border page.
1202  first->layer_ = 0;
1203  first->head_ = allocate_page();
1204  first->page_count_ = 1;
1205  first->tail_ = first->head_;
1206  first->set_no_more_next_original();
1207  first->low_fence_ = low_fence;
1208  first->high_fence_ = high_fence;
1209  MasstreePage* page = get_page(first->head_);
1210  SnapshotPagePointer page_id = page_id_base_ + first->head_;
1211  *pointer_address = page_id;
1212  MasstreeBorderPage* casted = reinterpret_cast<MasstreeBorderPage*>(page);
1213  casted->initialize_snapshot_page(id_, page_id, 0, low_fence, high_fence);
1214  page->header().page_id_ = page_id;
1215  ++cur_path_levels_;
1216  // this is it. this is an easier case. no recurse.
1217  }
1218 
1219  ASSERT_ND(cur_path_levels_ > 0);
1220  ASSERT_ND(first->low_fence_ == low_fence);
1221  ASSERT_ND(first->high_fence_ == high_fence);
1222  ASSERT_ND(minipage.pointers_[index_mini].snapshot_pointer_ != old_pointer);
1223  return kRetOk;
1224 }
1225 
1226 ErrorStack MasstreeComposeContext::open_next_level(
1227  const char* key,
1228  KeyLength key_length,
1229  MasstreePage* parent,
1230  KeySlice prefix_slice,
1231  SnapshotPagePointer* next_page_id) {
1232  ASSERT_ND(next_page_id);
1233  ASSERT_ND(key_length > 0);
1234  ASSERT_ND(is_key_aligned_and_zero_padded(key, key_length));
1235  SnapshotPagePointer old_page_id = *next_page_id;
1236  ASSERT_ND(old_page_id != 0);
1237  uint8_t this_level = cur_path_levels_;
1238  PathLevel* level = cur_path_ + this_level;
1239  ++cur_path_levels_;
1240 
1241  if (parent->is_border()) {
1242  store_cur_prefix(parent->get_layer(), prefix_slice);
1243  level->layer_ = parent->get_layer() + 1;
1244  } else {
1245  level->layer_ = parent->get_layer();
1246  }
1247  level->head_ = allocate_page();
1248  level->page_count_ = 1;
1249  level->tail_ = level->head_;
1250  level->set_no_more_next_original();
1251 
1252  SnapshotPagePointer page_id = page_id_base_ + level->head_;
1253  *next_page_id = page_id;
1254  KeyLength skip = level->layer_ * kSliceLen;
1255  KeySlice slice = normalize_be_bytes_full_aligned(key + skip);
1256 
1257  MasstreePage* target = get_page(level->head_);
1258  MasstreePage* original = get_original(this_level);
1259  WRAP_ERROR_CODE(args_.previous_snapshot_files_->read_page(old_page_id, original));
1260  level->low_fence_ = original->get_low_fence();
1261  level->high_fence_ = original->get_high_fence();
1262 
1263  snapshot::SnapshotId old_snapshot_id = extract_snapshot_id_from_snapshot_pointer(old_page_id);
1264  ASSERT_ND(old_snapshot_id != snapshot::kNullSnapshotId);
1265  if (UNLIKELY(old_snapshot_id == snapshot_id_)) {
1266  // if we are re-opening a page we have created in this execution,
1267  // we have to remove the old entry in page_boundary_info we created when we closed it.
1268  remove_old_page_boundary_info(old_page_id, original);
1269  }
1270 
1271  // We need to determine up to which record the existing entries are before/equal to the key.
1272  // target page is initialized with the original page, but we will reduce key_count
1273  // to omit entries after the key.
1274  std::memcpy(target, original, kPageSize);
1275  target->header().page_id_ = page_id;
1276 
1277  if (original->is_border()) {
1278  // When this is a border page we might or might not have to go down.
1279  const MasstreeBorderPage* casted = as_border(original);
1280  SlotIndex key_count = original->get_key_count();
1281  auto result = casted->find_key_for_snapshot(slice, key + skip, key_length - skip);
1282 
1283  SlotIndex copy_count;
1284  bool go_down = false;
1285  bool create_layer = false;
1286  switch (result.match_type_) {
1288  copy_count = result.index_; // do not copy the next key yet. it's larger than key
1289  break;
1291  ASSERT_ND(result.index_ < key_count);
1292  copy_count = result.index_ + 1U; // copy the matched key too
1293  break;
1295  go_down = true;
1296  create_layer = true;
1297  ASSERT_ND(result.index_ < key_count);
1298  copy_count = result.index_ + 1U; // copy the matched key too
1299  break;
1300  default:
1302  go_down = true;
1303  ASSERT_ND(result.index_ < key_count);
1304  copy_count = result.index_ + 1U; // copy the matched key too
1305  break;
1306  }
1307 
1308  MasstreeBorderPage* target_casted = as_border(target);
1309  ASSERT_ND(copy_count <= key_count);
1310  target_casted->set_key_count(copy_count);
1311  level->next_original_ = copy_count + 1U;
1312  if (level->next_original_ >= key_count) {
1313  level->set_no_more_next_original();
1314  } else {
1315  level->next_original_slice_ = casted->get_slice(level->next_original_);
1316  level->next_original_remainder_ = casted->get_remainder_length(level->next_original_);
1317  }
1318 
1319  // now, do we have to go down further?
1320  if (!go_down) {
1321  return kRetOk; // we are done
1322  }
1323 
1324  ASSERT_ND(key_length > kSliceLen + skip); // otherwise we can't go down to the next layer
1325 
1326  // go down, we don't have to worry about moving more original records to next layer.
1327  // if there are such records, the snapshot page has already done so.
1328  ASSERT_ND(!level->has_next_original() || level->next_original_slice_ > slice); // check it
1329 
1330  SlotIndex next_layer_index = target_casted->get_key_count() - 1;
1331  ASSERT_ND(target_casted->get_slice(next_layer_index) == slice);
1332  if (create_layer) {
1333  // this is more complicated. we have to migrate the current record to next layer
1334  open_next_level_create_layer(key, key_length, target_casted, slice, next_layer_index);
1335  ASSERT_ND(target_casted->does_point_to_layer(next_layer_index));
1336  } else {
1337  // then we follow the existing next layer pointer.
1338  ASSERT_ND(target_casted->does_point_to_layer(next_layer_index));
1339  ASSERT_ND(target_casted->get_next_layer(next_layer_index)->volatile_pointer_.is_null());
1340  SnapshotPagePointer* next_layer
1341  = &target_casted->get_next_layer(next_layer_index)->snapshot_pointer_;
1342  CHECK_ERROR(open_next_level(key, key_length, target_casted, slice, next_layer));
1343  }
1344  ASSERT_ND(cur_path_levels_ > this_level + 1U);
1345  } else {
1346  const MasstreeIntermediatePage* casted = as_intermdiate(original);
1347  uint8_t index = casted->find_minipage(slice);
1348  const MasstreeIntermediatePage::MiniPage& minipage = casted->get_minipage(index);
1349  uint8_t index_mini = minipage.find_pointer(slice);
1350 
1351  // inclusively up to index-index_mini can be copied. note that this also means
1352  // index=0/index_mini=0 is always guaranteed.
1353  MasstreeIntermediatePage* target_casted = as_intermdiate(target);
1354  target_casted->set_key_count(index);
1355  target_casted->get_minipage(index).key_count_ = index_mini;
1356  KeySlice this_fence;
1357  KeySlice next_fence;
1358  target_casted->extract_separators_snapshot(index, index_mini, &this_fence, &next_fence);
1359  level->next_original_ = index;
1360  level->next_original_mini_ = index_mini + 1U;
1361  level->next_original_slice_ = next_fence;
1362  if (level->next_original_mini_ > minipage.key_count_) {
1363  ++level->next_original_;
1364  level->next_original_mini_ = 0;
1365  if (level->next_original_ > casted->get_key_count()) {
1366  level->set_no_more_next_original();
1367  }
1368  }
1369 
1370  // as this is an intermediate page, we have to further go down.
1371  SnapshotPagePointer* next_address
1372  = &target_casted->get_minipage(index).pointers_[index_mini].snapshot_pointer_;
1373  CHECK_ERROR(open_next_level(key, key_length, target_casted, slice, next_address));
1374  ASSERT_ND(cur_path_levels_ > this_level + 1U);
1375  }
1376  return kRetOk;
1377 }
1378 
1379 void MasstreeComposeContext::open_next_level_create_layer(
1380  const char* key,
1381  KeyLength key_length,
1382  MasstreeBorderPage* parent,
1383  KeySlice prefix_slice,
1384  SlotIndex parent_index) {
1385  ASSERT_ND(!parent->does_point_to_layer(parent_index)); // not yet a next-layer pointer
1386  ASSERT_ND(parent->get_slice(parent_index) == prefix_slice);
1387  ASSERT_ND(key_length > 0);
1388  ASSERT_ND(is_key_aligned_and_zero_padded(key, key_length));
1390  uint8_t this_level = cur_path_levels_;
1391  PathLevel* level = cur_path_ + this_level;
1392  ++cur_path_levels_;
1393 
1394  store_cur_prefix(parent->get_layer(), prefix_slice);
1395  level->layer_ = parent->get_layer() + 1;
1396  level->head_ = allocate_page();
1397  level->page_count_ = 1;
1398  level->tail_ = level->head_;
1399  level->set_no_more_next_original();
1400  level->low_fence_ = kInfimumSlice;
1401  level->high_fence_ = kSupremumSlice;
1402 
1404  ASSERT_ND(key_length > level->layer_ * kSliceLen);
1405 
1406  SnapshotPagePointer page_id = page_id_base_ + level->head_;
1407  MasstreeBorderPage* target = reinterpret_cast<MasstreeBorderPage*>(get_page(level->head_));
1408  target->initialize_snapshot_page(id_, page_id, level->layer_, kInfimumSlice, kSupremumSlice);
1409 
1412  // migrate the existing record from the parent.
1413  KeyLength remainder_length = parent->get_remainder_length(parent_index) - kSliceLen;
1414  const char* parent_suffix = parent->get_record(parent_index);
1415  ASSERT_ND(is_key_aligned_and_zero_padded(parent_suffix, remainder_length));
1416  KeySlice slice = normalize_be_bytes_full_aligned(parent_suffix);
1417  const char* suffix = parent_suffix + kSliceLen;
1418  const char* payload = parent->get_record_payload(parent_index);
1419  PayloadLength payload_count = parent->get_payload_length(parent_index);
1420  xct::XctId xct_id = parent->get_owner_id(parent_index)->xct_id_;
1421 
1424  // see the comment in PathLevel. If the existing key in parent is larger than searching key,
1425  // we move the existing record to a dummy original page here.
1426  MasstreeBorderPage* original = reinterpret_cast<MasstreeBorderPage*>(get_original(this_level));
1427  original->initialize_snapshot_page(id_, 0, level->layer_, kInfimumSlice, kSupremumSlice);
1428  ASSERT_ND(original->can_accomodate(0, remainder_length, payload_count));
1429  original->reserve_record_space(0, xct_id, slice, suffix, remainder_length, payload_count);
1430  original->increment_key_count();
1431  fill_payload_padded(original->get_record_payload(0), payload, payload_count);
1432 
1433  ASSERT_ND(original->ltgt_key(0, key, key_length) != 0); // if exactly same key, why here
1434  if (original->ltgt_key(0, key, key_length) < 0) { // the original record is larger than key
1435  VLOG(1) << "Interesting, moving an original record to a dummy page in next layer";
1436  // as the record is still not supposed to be added, we keep it in the original page
1437  level->next_original_ = 0;
1438  level->next_original_slice_ = slice;
1439  level->next_original_remainder_ = remainder_length;
1440  } else {
1441  // okay, insert it now.
1442  target->reserve_record_space(0, xct_id, slice, suffix, remainder_length, payload_count);
1443  target->increment_key_count();
1444  fill_payload_padded(target->get_record_payload(0), payload, payload_count);
1445  ASSERT_ND(target->ltgt_key(0, key, key_length) > 0);
1446  }
1449 
1450  // and modify the record to be a next layer pointer
1451  parent->replace_next_layer_snapshot(page_id);
1453 }
1454 
1455 inline bool MasstreeComposeContext::does_need_to_adjust_path(
1456  const char* key,
1457  KeyLength key_length) const {
1458  if (cur_path_levels_ == 0) {
1459  return true; // the path is empty. need to initialize path
1460  } else if (!cur_path_[0].contains_key(key, key_length)) {
1461  // first slice does not match
1462  return true;
1463  } else if (get_last_level()->layer_ == 0) {
1464  return false; // first slice check already done.
1465  }
1466 
1467  // Do the slices of the next key match the current path? If not, we have to close layers.
1468  uint8_t cur_path_layer = get_last_layer();
1469  if (cur_path_layer > 0) {
1470  uint16_t max_key_layer = (key_length - 1U) / kSliceLen;
1471  if (max_key_layer < cur_path_layer) {
1472  return true; // definitely not in this layer
1473  }
1474  uint16_t common_layers = count_common_slices(cur_prefix_be_, key, cur_path_layer);
1475  if (common_layers < cur_path_layer) {
1476  return true; // prefix doesn't match. we have to close the layer(s)
1477  }
1478  }
1479 
1480  // Does the in-layer slice of the key match the current page? If not, we have to close levels.
1481  // key is 8-bytes padded, so we can simply use normalize_be_bytes_full_aligned
1482  KeyLength skip = cur_path_layer * kSliceLen;
1483  KeySlice next_slice = normalize_be_bytes_full_aligned(key + skip);
1484  return !get_last_level()->contains_slice(next_slice);
1485 }
1486 
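// The path-adjustment check above boils down to 8-byte slice arithmetic: a key is cut into
// 8-byte slices, each slice is compared as a big-endian 64-bit integer, and two keys stay in
// the same layer only while their leading slices are identical. Below is a minimal,
// self-contained sketch of that arithmetic; to_slice/common_slice_count are hypothetical
// helpers for illustration, not FOEDUS API.
#include <cstdint>
#include <cstring>

namespace example {
using ExampleSlice = uint64_t;
constexpr unsigned kExampleSliceLen = 8;

// Interpret 8 zero-padded bytes as a big-endian slice so that integer order == memcmp order.
inline ExampleSlice to_slice(const char* bytes) {
  ExampleSlice slice = 0;
  for (unsigned i = 0; i < kExampleSliceLen; ++i) {
    slice = (slice << 8) | static_cast<uint8_t>(bytes[i]);
  }
  return slice;
}

// How many leading slices (layers) two zero-padded keys share, up to max_layers.
inline unsigned common_slice_count(const char* a, const char* b, unsigned max_layers) {
  unsigned layers = 0;
  while (layers < max_layers
    && std::memcmp(a + layers * kExampleSliceLen, b + layers * kExampleSliceLen, kExampleSliceLen) == 0) {
    ++layers;
  }
  return layers;
}
}  // namespace example
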
1487 ErrorStack MasstreeComposeContext::adjust_path(const char* key, KeyLength key_length) {
1488  // Close levels/layers until prefix slice matches
1489  bool closed_something = false;
1490  if (cur_path_levels_ > 0) {
1491  PathLevel* last = get_last_level();
1492  if (last->layer_ > 0) {
1493  // Close layers until prefix slice matches
1494  uint16_t max_key_layer = (key_length - 1U) / kSliceLen;
1495  uint16_t cmp_layer = std::min<uint16_t>(last->layer_, max_key_layer);
1496  uint16_t common_layers = count_common_slices(cur_prefix_be_, key, cmp_layer);
1497  if (common_layers < last->layer_) {
1498  CHECK_ERROR(close_path_layer(common_layers));
1499  ASSERT_ND(common_layers == get_last_layer());
1500  closed_something = true;
1501  }
1502  }
1503 
1504  last = get_last_level();
1505  // Close levels until the current level's fences contain the next slice
1506  KeyLength skip = last->layer_ * kSliceLen;
1507  KeySlice next_slice = normalize_be_bytes_full_aligned(key + skip);
1508  while (cur_path_levels_ > 0 && !last->contains_slice(next_slice)) {
1509  // the next key doesn't fit in the current path. need to close.
1510  if (cur_path_levels_ == 1U) {
1511  CHECK_ERROR(close_first_level());
1512  ASSERT_ND(cur_path_levels_ == 0);
1513  } else {
1514  CHECK_ERROR(close_last_level());
1515  closed_something = true;
1516  last = get_last_level();
1517  ASSERT_ND(get_last_level()->layer_ * kSliceLen == skip); // this shouldn't affect layer.
1518  }
1519  }
1520  }
1521 
1522  if (cur_path_levels_ == 0) {
1523  CHECK_ERROR(open_first_level(key, key_length));
1524  }
1525  ASSERT_ND(cur_path_levels_ > 0);
1526 
1527  // now, if the last level is an intermediate page, we definitely have to go deeper.
1528  // if it's a border page, we may have to go down, but at this point we are not sure.
1529  // so, just handle intermediate page case. after doing this, last level should be a border page.
1530  PathLevel* last = get_last_level();
1531  KeyLength skip = last->layer_ * kSliceLen;
1532  KeySlice next_slice = normalize_be_bytes_full_aligned(key + skip);
1533  ASSERT_ND(last->contains_slice(next_slice));
1534  MasstreePage* page = get_page(last->tail_);
1535  if (!page->is_border()) {
1536  ASSERT_ND(closed_something); // otherwise the current path should already be a border page
1537  MasstreeIntermediatePage* casted = as_intermdiate(page);
1538 
1539  // as searching keys are sorted, the slice should be always larger than everything in the page
1540  ASSERT_ND(casted->find_minipage(next_slice) == casted->get_key_count());
1541  ASSERT_ND(casted->get_minipage(casted->get_key_count()).find_pointer(next_slice)
1542  == casted->get_minipage(casted->get_key_count()).key_count_);
1543  if (last->has_next_original() && last->next_original_slice_ <= next_slice) {
1544  ASSERT_ND(last->next_original_slice_ != next_slice);
1545  CHECK_ERROR(consume_original_upto_intermediate(next_slice, last));
1546  }
1547  ASSERT_ND(casted->find_minipage(next_slice) == casted->get_key_count());
1548  ASSERT_ND(casted->get_minipage(casted->get_key_count()).find_pointer(next_slice)
1549  == casted->get_minipage(casted->get_key_count()).key_count_);
1550  SlotIndex index = casted->get_key_count();
1551  MasstreeIntermediatePage::MiniPage& minipage = casted->get_minipage(index);
1552  SlotIndex index_mini = minipage.key_count_;
1553 #ifndef NDEBUG
1554  KeySlice existing_low;
1555  KeySlice existing_high;
1556  casted->extract_separators_snapshot(index, index_mini, &existing_low, &existing_high);
1557  ASSERT_ND(existing_low <= next_slice);
1558  ASSERT_ND(existing_high == casted->get_high_fence());
1559 #endif // NDEBUG
1560  SnapshotPagePointer* next_pointer = &minipage.pointers_[index_mini].snapshot_pointer_;
1561  CHECK_ERROR(open_next_level(key, key_length, page, 0, next_pointer));
1562  }
1563  ASSERT_ND(get_last_level()->contains_slice(next_slice));
1564  ASSERT_ND(get_page(get_last_level()->tail_)->is_border());
1565  return kRetOk;
1566 }
1567 
1568 inline void MasstreeComposeContext::append_border(
1569  KeySlice slice,
1570  xct::XctId xct_id,
1571  KeyLength remainder_length,
1572  const char* suffix,
1573  PayloadLength payload_count,
1574  const char* payload,
1575  PathLevel* level) {
1576  ASSERT_ND(remainder_length != kInitiallyNextLayer);
1577  ASSERT_ND(remainder_length < kMaxKeyLength);
1578  MasstreeBorderPage* target = as_border(get_page(level->tail_));
1579  SlotIndex key_count = target->get_key_count();
1580  SlotIndex new_index = key_count;
1581  ASSERT_ND(key_count == 0 || !target->will_conflict(key_count - 1, slice, remainder_length));
1582  ASSERT_ND(key_count == 0
1583  || !target->will_contain_next_layer(key_count - 1, slice, remainder_length));
1584  ASSERT_ND(key_count == 0 || target->ltgt_key(key_count - 1, slice, suffix, remainder_length) > 0);
1585 
1586  // This check is slightly more conservative than can_accomodate() when the page is almost full.
1587  const bool spacious = target->can_accomodate_snapshot(remainder_length, payload_count);
1588  if (UNLIKELY(!spacious)) {
1589  append_border_newpage(slice, level);
1590  MasstreeBorderPage* new_target = as_border(get_page(level->tail_));
1591  ASSERT_ND(target != new_target);
1592  target = new_target;
1593  new_index = 0;
1594  }
1595 
1596  target->reserve_record_space(new_index, xct_id, slice, suffix, remainder_length, payload_count);
1597  target->increment_key_count();
1598  fill_payload_padded(target->get_record_payload(new_index), payload, payload_count);
1599 }
1600 
1601 inline void MasstreeComposeContext::append_border_next_layer(
1602  KeySlice slice,
1603  xct::XctId xct_id,
1604  SnapshotPagePointer pointer,
1605  PathLevel* level) {
1606  MasstreeBorderPage* target = as_border(get_page(level->tail_));
1607  SlotIndex key_count = target->get_key_count();
1608  SlotIndex new_index = key_count;
1609  ASSERT_ND(key_count == 0 || !target->will_conflict(key_count - 1, slice, 0xFF));
1610  ASSERT_ND(key_count == 0 || target->ltgt_key(key_count - 1, slice, nullptr, kSliceLen) > 0);
1611  if (UNLIKELY(!target->can_accomodate(new_index, kInitiallyNextLayer, sizeof(DualPagePointer)))) {
1612  append_border_newpage(slice, level);
1613  MasstreeBorderPage* new_target = as_border(get_page(level->tail_));
1614  ASSERT_ND(target != new_target);
1615  target = new_target;
1616  new_index = 0;
1617  }
1618 
1619  target->append_next_layer_snapshot(xct_id, slice, pointer);
1620 }
1621 
1622 void MasstreeComposeContext::append_border_newpage(KeySlice slice, PathLevel* level) {
1623  MasstreeBorderPage* target = as_border(get_page(level->tail_));
1624  SlotIndex key_count = target->get_key_count();
1625 
1626  memory::PagePoolOffset new_offset = allocate_page();
1627  SnapshotPagePointer new_page_id = page_id_base_ + new_offset;
1628  MasstreeBorderPage* new_target = reinterpret_cast<MasstreeBorderPage*>(get_page(new_offset));
1629 
1630  KeySlice middle = slice;
1631  KeySlice high_fence = target->get_high_fence();
1632  new_target->initialize_snapshot_page(id_, new_page_id, level->layer_, middle, high_fence);
1633  target->set_foster_major_offset_unsafe(new_offset); // set next link
1634  target->set_high_fence_unsafe(middle);
1635  level->tail_ = new_offset;
1636  ++level->page_count_;
1637 
1638  // We have to make sure the same slice does not span two pages with only the key length
1639  // differing (masstree protocol). So, we might have to migrate a few more records in rare cases.
1640  SlotIndex migrate_from;
1641  for (migrate_from = key_count; target->get_slice(migrate_from - 1U) == slice; --migrate_from) {
1642  continue;
1643  }
1644  if (migrate_from != key_count) {
1645  LOG(INFO) << "Interesting, migrate records to avoid key slice spanning two pages";
1646  for (SlotIndex old_index = migrate_from; old_index < key_count; ++old_index) {
1647  ASSERT_ND(target->get_slice(old_index) == slice);
1648  xct::XctId xct_id = target->get_owner_id(old_index)->xct_id_;
1649  SlotIndex new_index = new_target->get_key_count();
1650  if (target->does_point_to_layer(old_index)) {
1651  const DualPagePointer* pointer = target->get_next_layer(old_index);
1652  ASSERT_ND(pointer->volatile_pointer_.is_null());
1653  new_target->append_next_layer_snapshot(xct_id, slice, pointer->snapshot_pointer_);
1654  } else {
1655  PayloadLength payload_count = target->get_payload_length(old_index);
1656  KeyLength remainder = target->get_remainder_length(old_index);
1657  new_target->reserve_record_space(
1658  new_index,
1659  xct_id,
1660  slice,
1661  target->get_record(old_index),
1662  remainder,
1663  payload_count);
1664  new_target->increment_key_count();
1665  fill_payload_padded(
1666  new_target->get_record_payload(new_index),
1667  target->get_record_payload(old_index),
1668  payload_count);
1669  }
1670  }
1671 
1672  target->header().set_key_count(migrate_from);
1673  ASSERT_ND(new_target->get_key_count() == key_count - migrate_from);
1674  }
1675 }
1676 
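// A minimal sketch (hypothetical types, not FOEDUS API) of the rule append_border_newpage
// enforces above: when a border page is split at `split_slice`, every trailing record whose
// slice equals `split_slice` must move to the new page, so that one slice never spans two
// pages with only the key length differing.
#include <cstdint>
#include <vector>

namespace example {
// Returns the first index whose records must migrate; records [result, slices.size()) move.
inline size_t find_migrate_from(const std::vector<uint64_t>& slices, uint64_t split_slice) {
  size_t migrate_from = slices.size();
  while (migrate_from > 0 && slices[migrate_from - 1] == split_slice) {
    --migrate_from;
  }
  return migrate_from;
}
}  // namespace example
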
1677 
1678 inline void MasstreeComposeContext::append_intermediate(
1679  KeySlice low_fence,
1680  SnapshotPagePointer pointer,
1681  PathLevel* level) {
1682  ASSERT_ND(low_fence != kInfimumSlice);
1683  ASSERT_ND(low_fence != kSupremumSlice);
1684  MasstreeIntermediatePage* target = as_intermdiate(get_page(level->tail_));
1685  if (UNLIKELY(target->is_full_snapshot())) {
1686  append_intermediate_newpage_and_pointer(low_fence, pointer, level);
1687  } else {
1688  target->append_pointer_snapshot(low_fence, pointer);
1689  }
1690 }
1691 
1692 void MasstreeComposeContext::append_intermediate_newpage_and_pointer(
1693  KeySlice low_fence,
1694  SnapshotPagePointer pointer,
1695  PathLevel* level) {
1696  MasstreeIntermediatePage* target = as_intermdiate(get_page(level->tail_));
1697 
1698  memory::PagePoolOffset new_offset = allocate_page();
1699  SnapshotPagePointer new_page_id = page_id_base_ + new_offset;
1700  MasstreeIntermediatePage* new_target
1701  = reinterpret_cast<MasstreeIntermediatePage*>(get_page(new_offset));
1702 
1703  KeySlice middle = low_fence;
1704  KeySlice high_fence = target->get_high_fence();
1705  uint8_t btree_level = target->get_btree_level();
1706  uint8_t layer = level->layer_;
1707  new_target->initialize_snapshot_page(id_, new_page_id, layer, btree_level, middle, high_fence);
1708  target->set_foster_major_offset_unsafe(new_offset); // set next link
1709  target->set_high_fence_unsafe(middle);
1710  level->tail_ = new_offset;
1711  ++level->page_count_;
1712 
1713  // Unlike border pages, slices are unique in an intermediate page, so there is nothing to migrate.
1714  // Instead, we also complete setting the installed pointer because the first pointer
1715  // is a bit special in intermediate pages.
1716  new_target->get_minipage(0).pointers_[0].volatile_pointer_.clear();
1717  new_target->get_minipage(0).pointers_[0].snapshot_pointer_ = pointer;
1718 }
1719 
1720 ErrorStack MasstreeComposeContext::consume_original_upto_border(
1721  KeySlice slice,
1722  KeyLength key_length,
1723  PathLevel* level) {
1724  ASSERT_ND(level == get_last_level()); // so far this is the only usecase
1725  ASSERT_ND(key_length > level->layer_ * kSliceLen);
1726  ASSERT_ND(level->needs_to_consume_original(slice, key_length));
1727  uint16_t level_index = level - cur_path_;
1728  MasstreeBorderPage* original = as_border(get_original(level_index));
1729  while (level->needs_to_consume_original(slice, key_length)) {
1730  SlotIndex index = level->next_original_;
1731  ASSERT_ND(level->next_original_slice_ == original->get_slice(index));
1732  ASSERT_ND(level->next_original_remainder_ == original->get_remainder_length(index));
1733  ASSERT_ND(level->has_next_original());
1734  ASSERT_ND(level->next_original_slice_ <= slice);
1735  KeySlice original_slice = original->get_slice(index);
1736  KeyLength original_remainder = original->get_remainder_length(index);
1737  xct::XctId xct_id = original->get_owner_id(index)->xct_id_;
1738  if (original->does_point_to_layer(index)) {
1739  const DualPagePointer* pointer = original->get_next_layer(index);
1740  ASSERT_ND(pointer->volatile_pointer_.is_null());
1741  append_border_next_layer(original_slice, xct_id, pointer->snapshot_pointer_, level);
1742  } else {
1743  append_border(
1744  original_slice,
1745  xct_id,
1746  original_remainder,
1747  original->get_record(index),
1748  original->get_payload_length(index),
1749  original->get_record_payload(index),
1750  level);
1751  }
1752  ++level->next_original_;
1753  if (level->next_original_ >= original->get_key_count()) {
1754  level->set_no_more_next_original();
1755  } else {
1756  level->next_original_slice_ = original->get_slice(level->next_original_);
1757  level->next_original_remainder_ = original->get_remainder_length(level->next_original_);
1758  }
1759  }
1760  return kRetOk;
1761 }
1762 
1763 ErrorStack MasstreeComposeContext::consume_original_upto_intermediate(
1764  KeySlice slice,
1765  PathLevel* level) {
1766  ASSERT_ND(level->has_next_original());
1767  ASSERT_ND(level->next_original_slice_ < slice);
1768  uint16_t level_index = level - cur_path_;
1769  MasstreeIntermediatePage* original = as_intermdiate(get_original(level_index));
1770  while (level->has_next_original() && level->next_original_slice_ < slice) {
1771  MasstreeIntermediatePointerIterator it(original);
1772  it.index_ = level->next_original_;
1773  it.index_mini_ = level->next_original_mini_;
1774  ASSERT_ND(it.is_valid());
1775  append_intermediate(it.get_low_key(), it.get_pointer().snapshot_pointer_, level);
1776  it.next();
1777  if (it.is_valid()) {
1778  level->next_original_ = it.index_;
1779  level->next_original_mini_ = it.index_mini_;
1780  level->next_original_slice_ = it.get_low_key();
1781  } else {
1782  level->set_no_more_next_original();
1783  }
1784  }
1785  return kRetOk;
1786 }
1787 
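// The consume_original_* methods above implement one step of a merge: before emitting a newly
// composed entry, every remaining entry of the pre-existing ("original") page with a smaller
// key must be copied out first so the output stays sorted. A minimal sketch with hypothetical
// containers (not FOEDUS API):
#include <cstdint>
#include <queue>
#include <vector>

namespace example {
inline void consume_original_upto(std::queue<uint64_t>* original,
                                  uint64_t next,
                                  std::vector<uint64_t>* out) {
  while (!original->empty() && original->front() < next) {
    out->push_back(original->front());  // pre-existing entry comes first
    original->pop();
  }
  out->push_back(next);  // then the newly composed entry
}
}  // namespace example
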
1788 ErrorStack MasstreeComposeContext::consume_original_all() {
1789  PathLevel* last = get_last_level();
1790  if (!last->has_next_original()) {
1791  return kRetOk; // nothing to consume
1792  }
1793 
1794  MasstreePage* page = get_original(get_last_level_index());
1795  if (page->is_border()) {
1796  MasstreeBorderPage* original = as_border(page);
1797  while (last->has_next_original()) {
1798  SlotIndex index = last->next_original_;
1799  KeySlice slice = original->get_slice(index);
1800  xct::XctId xct_id = original->get_owner_id(index)->xct_id_;
1801  if (original->does_point_to_layer(index)) {
1802  const DualPagePointer* pointer = original->get_next_layer(index);
1803  ASSERT_ND(pointer->volatile_pointer_.is_null());
1804  append_border_next_layer(slice, xct_id, pointer->snapshot_pointer_, last);
1805  } else {
1806  append_border(
1807  slice,
1808  xct_id,
1809  original->get_remainder_length(index),
1810  original->get_record(index),
1811  original->get_payload_length(index),
1812  original->get_record_payload(index),
1813  last);
1814  }
1815  ++last->next_original_;
1816  if (last->next_original_ >= original->get_key_count()) {
1817  break;
1818  }
1819  }
1820  } else {
1821  MasstreeIntermediatePage* original = as_intermdiate(page);
1822  MasstreeIntermediatePointerIterator it(original);
1823  it.index_ = last->next_original_;
1824  it.index_mini_ = last->next_original_mini_;
1825  for (; it.is_valid(); it.next()) {
1826  append_intermediate(it.get_low_key(), it.get_pointer().snapshot_pointer_, last);
1827  }
1828  }
1829  last->set_no_more_next_original();
1830  return kRetOk;
1831 }
1832 
1833 ErrorStack MasstreeComposeContext::close_level_grow_subtree(
1834  SnapshotPagePointer* root_pointer,
1835  KeySlice subtree_low,
1836  KeySlice subtree_high) {
1837  PathLevel* last = get_last_level();
1838  ASSERT_ND(last->low_fence_ == subtree_low);
1839  ASSERT_ND(last->high_fence_ == subtree_high);
1840  ASSERT_ND(last->page_count_ > 1U);
1841  ASSERT_ND(!last->has_next_original());
1842 
1843  std::vector<FenceAndPointer> children;
1844  children.reserve(last->page_count_);
1845  uint8_t cur_btree_level = 0;
1846  for (memory::PagePoolOffset cur = last->head_; cur != 0;) {
1847  MasstreePage* page = get_page(cur);
1848  ASSERT_ND(page->is_foster_minor_null());
1849  FenceAndPointer child;
1850  child.pointer_ = page_id_base_ + cur;
1851  child.low_fence_ = page->get_low_fence();
1852  children.emplace_back(child);
1853  cur = page->get_foster_major().get_offset();
1854  page->set_foster_major_offset_unsafe(0); // no longer needed
1855  cur_btree_level = page->get_btree_level();
1856  }
1857  ASSERT_ND(children.size() == last->page_count_);
1858 
1859  // The level is now nullified
1860  // Now, replace the last level with a newly created level.
1861  memory::PagePoolOffset new_offset = allocate_page();
1862  SnapshotPagePointer new_page_id = page_id_base_ + new_offset;
1863  MasstreeIntermediatePage* new_page
1864  = reinterpret_cast<MasstreeIntermediatePage*>(get_page(new_offset));
1865  new_page->initialize_snapshot_page(
1866  id_,
1867  new_page_id,
1868  last->layer_,
1869  cur_btree_level + 1U, // add a new B-tree level
1870  last->low_fence_,
1871  last->high_fence_);
1872  last->head_ = new_offset;
1873  last->tail_ = new_offset;
1874  last->page_count_ = 1;
1875  ASSERT_ND(children.size() > 0);
1876  ASSERT_ND(children[0].low_fence_ == subtree_low);
1877  new_page->get_minipage(0).pointers_[0].volatile_pointer_.clear();
1878  new_page->get_minipage(0).pointers_[0].snapshot_pointer_ = children[0].pointer_;
1879  for (uint32_t i = 1; i < children.size(); ++i) {
1880  const FenceAndPointer& child = children[i];
1881  append_intermediate(child.low_fence_, child.pointer_, last);
1882  }
1883 
1884 
1885  // also register the pages in the newly created level.
1886  WRAP_ERROR_CODE(close_level_register_page_boundaries());
1887 
1888  if (last->page_count_ > 1U) {
1889  // recurse until we get just one page. this also means, when there are skewed inserts,
1890  // we might have an unbalanced masstree (some subtrees are deeper). not a big issue.
1891  CHECK_ERROR(close_level_grow_subtree(root_pointer, subtree_low, subtree_high));
1892  } else {
1893  *root_pointer = new_page_id;
1894  }
1895  return kRetOk;
1896 }
1897 
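// A minimal sketch (hypothetical, fixed fan-out, not FOEDUS API) of the grow-subtree step
// above: a chain of same-level pages is packed into newly created parent pages, and the
// process repeats until a single page remains, which becomes the new subtree root. In the
// real code the fan-out is dictated by intermediate-page capacity rather than a constant.
#include <algorithm>
#include <cstdint>
#include <vector>

namespace example {
// Each inner vector is one new parent page holding the ids of its children.
inline std::vector<uint64_t> pack_one_level(const std::vector<uint64_t>& children,
                                            size_t fanout,
                                            std::vector<std::vector<uint64_t>>* new_pages,
                                            uint64_t* next_page_id) {
  std::vector<uint64_t> parents;
  for (size_t i = 0; i < children.size(); i += fanout) {
    const size_t end = std::min(children.size(), i + fanout);
    new_pages->emplace_back(children.begin() + i, children.begin() + end);
    parents.push_back((*next_page_id)++);  // id of the newly allocated parent page
  }
  return parents;
}

inline uint64_t grow_subtree(std::vector<uint64_t> pages,
                             size_t fanout,
                             std::vector<std::vector<uint64_t>>* new_pages,
                             uint64_t* next_page_id) {
  while (pages.size() > 1) {  // each iteration adds one B-tree level
    pages = pack_one_level(pages, fanout, new_pages, next_page_id);
  }
  return pages.front();  // the single remaining page is the subtree root
}
}  // namespace example
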
1898 ErrorStack MasstreeComposeContext::pushup_non_root() {
1899  ASSERT_ND(cur_path_levels_ > 1U); // there is a parent level
1900  PathLevel* last = get_last_level();
1901  PathLevel* parent = get_second_last_level();
1902  ASSERT_ND(last->low_fence_ != kInfimumSlice || last->high_fence_ != kSupremumSlice);
1903  ASSERT_ND(last->page_count_ > 1U);
1904  ASSERT_ND(!last->has_next_original());
1905 
1906  bool is_head = true;
1907  for (memory::PagePoolOffset cur = last->head_; cur != 0;) {
1908  MasstreePage* page = get_page(cur);
1909  ASSERT_ND(page->is_foster_minor_null());
1910  SnapshotPagePointer pointer = page_id_base_ + cur;
1911  KeySlice low_fence = page->get_low_fence();
1912  ASSERT_ND(cur != last->head_ || low_fence == last->low_fence_); // first entry's low_fence
1913  ASSERT_ND(cur != last->tail_ || page->get_high_fence() == last->high_fence_); // tail's high
1914  ASSERT_ND(cur != last->tail_ || page->is_foster_major_null()); // iff tail, has no next
1915  ASSERT_ND(cur == last->tail_ || !page->is_foster_major_null());
1916  cur = page->get_foster_major().get_offset();
1917  page->set_foster_major_offset_unsafe(0); // no longer needed
1918 
1919  // before pushing up the pointer, we might have to consume original pointers
1920  if (parent->has_next_original() && parent->next_original_slice_ <= low_fence) {
1921  ASSERT_ND(parent->next_original_slice_ != low_fence);
1922  CHECK_ERROR(consume_original_upto_intermediate(low_fence, parent));
1923  }
1924 
1925  // if this is a re-opened existing page (which is always the head), we already have
1926  // a pointer to this page in the parent page, so appending it will cause a duplicate.
1927  // let's check if that's the case.
1928  bool already_appended = false;
1929  if (is_head) {
1930  const MasstreeIntermediatePage* parent_page = as_intermdiate(get_page(parent->tail_));
1931  SlotIndex index = parent_page->get_key_count();
1932  const MasstreeIntermediatePage::MiniPage& mini = parent_page->get_minipage(index);
1933  SlotIndex index_mini = mini.key_count_;
1934  if (mini.pointers_[index_mini].snapshot_pointer_ == pointer) {
1935  already_appended = true;
1936 #ifndef NDEBUG
1937  // If that's the case, the low-fence should match. let's check.
1938  KeySlice check_low, check_high;
1939  parent_page->extract_separators_snapshot(index, index_mini, &check_low, &check_high);
1940  ASSERT_ND(check_low == low_fence);
1941  // high-fence should be same as tail's high if this level has split. let's check it, too.
1942  MasstreePage* tail = get_page(last->tail_);
1943  ASSERT_ND(check_high == tail->get_high_fence());
1944 #endif // NDEBUG
1945  }
1946  }
1947  if (!already_appended) {
1948  append_intermediate(low_fence, pointer, parent);
1949  }
1950  is_head = false;
1951  }
1952 
1953  return kRetOk;
1954 }
1955 
1956 ErrorStack MasstreeComposeContext::close_path_layer(uint16_t max_layer) {
1957  DVLOG(1) << "Closing path up to layer-" << max_layer;
1958  ASSERT_ND(get_last_layer() > max_layer);
1959  while (true) {
1960  PathLevel* last = get_last_level();
1961  if (last->layer_ <= max_layer) {
1962  break;
1963  }
1964  CHECK_ERROR(close_last_level());
1965  }
1966  ASSERT_ND(get_last_layer() == max_layer);
1967  return kRetOk;
1968 }
1969 
1970 ErrorStack MasstreeComposeContext::close_last_level() {
1971  ASSERT_ND(cur_path_levels_ > 1U); // do not use this method to close the first one.
1972  DVLOG(1) << "Closing the last level " << *get_last_level();
1973  PathLevel* last = get_last_level();
1974  PathLevel* parent = get_second_last_level();
1975 
1976  // before closing, consume all records in original page.
1977  CHECK_ERROR(consume_original_all());
1978  WRAP_ERROR_CODE(close_level_register_page_boundaries());
1979 
1980 #ifndef NDEBUG
1981  {
1982  // some sanity checks.
1983  KeySlice prev = last->low_fence_;
1984  uint32_t counted = 0;
1985  for (memory::PagePoolOffset cur = last->head_; cur != 0;) {
1986  const MasstreePage* page = get_page(cur);
1987  ++counted;
1988  ASSERT_ND(page->get_low_fence() == prev);
1989  ASSERT_ND(page->get_high_fence() > prev);
1990  ASSERT_ND(page->get_layer() == last->layer_);
1991  prev = page->get_high_fence();
1992  cur = page->get_foster_major().get_offset();
1993  if (page->is_border()) {
1994  ASSERT_ND(page->get_key_count() > 0);
1995  }
1996  }
1997  ASSERT_ND(prev == last->high_fence_);
1998  ASSERT_ND(counted == last->page_count_);
1999  }
2000 #endif // NDEBUG
2001 
2002  // Closing this level means we might have to push up the last level's chain to the previous level.
2003  if (last->page_count_ > 1U) {
2004  ASSERT_ND(parent->layer_ <= last->layer_);
2005  bool last_is_root = (parent->layer_ != last->layer_);
2006  if (!last_is_root) {
2007  // 1) last is not layer-root, then we append child pointers to parent. simpler
2008  CHECK_ERROR(pushup_non_root());
2009  } else {
2010  // 2) last is layer-root, then we add another level (intermediate page) as a new root
2011  MasstreeBorderPage* parent_tail = as_border(get_page(parent->tail_));
2012  ASSERT_ND(parent_tail->get_key_count() > 0);
2013  ASSERT_ND(parent_tail->does_point_to_layer(parent_tail->get_key_count() - 1));
2014  SnapshotPagePointer* parent_pointer
2015  = &parent_tail->get_next_layer(parent_tail->get_key_count() - 1)->snapshot_pointer_;
2016  CHECK_ERROR(close_level_grow_subtree(parent_pointer, kInfimumSlice, kSupremumSlice));
2017  }
2018  }
2019 
2020  ASSERT_ND(!last->has_next_original());
2021  --cur_path_levels_;
2022  ASSERT_ND(cur_path_levels_ >= 1U);
2023  return kRetOk;
2024 }
2025 ErrorStack MasstreeComposeContext::close_first_level() {
2026  ASSERT_ND(cur_path_levels_ == 1U);
2027  ASSERT_ND(get_last_layer() == 0U);
2028  PathLevel* first = cur_path_;
2029 
2030  // Do we have to make another level?
2031  CHECK_ERROR(consume_original_all());
2032  WRAP_ERROR_CODE(close_level_register_page_boundaries());
2033 
2034  SnapshotPagePointer current_pointer = first->head_ + page_id_base_;
2035  DualPagePointer& root_pointer = root_->get_minipage(root_index_).pointers_[root_index_mini_];
2036  ASSERT_ND(root_pointer.volatile_pointer_.is_null());
2037  ASSERT_ND(root_pointer.snapshot_pointer_ == current_pointer);
2038  KeySlice root_low, root_high;
2039  root_->extract_separators_snapshot(root_index_, root_index_mini_, &root_low, &root_high);
2040  if (first->page_count_ > 1U) {
2041  CHECK_ERROR(close_level_grow_subtree(&root_pointer.snapshot_pointer_, root_low, root_high));
2042  }
2043 
2044  // adjust root page's btree-level if needed. as an exceptional rule, btree-level of 1st layer root
2045  // is max(child's level)+1.
2046  ASSERT_ND(first->page_count_ == 1U);
2047  MasstreePage* child = get_page(first->head_);
2048  if (child->get_btree_level() + 1U > root_->get_btree_level()) {
2049  VLOG(0) << "Masstree-" << id_ << " subtree of first layer root has grown.";
2050  root_->header().masstree_in_layer_level_ = child->get_btree_level() + 1U;
2051  }
2052 
2053  cur_path_levels_ = 0;
2054  root_index_ = 0xDA; // just some
2055  root_index_mini_= 0xDA; // random number
2056  return kRetOk;
2057 }
2058 
2059 ErrorStack MasstreeComposeContext::close_all_levels() {
2060  LOG(INFO) << "Closing all levels except root_";
2061  while (cur_path_levels_ > 1U) {
2062  CHECK_ERROR(close_last_level());
2063  }
2064  ASSERT_ND(cur_path_levels_ <= 1U);
2065  if (cur_path_levels_ > 0) {
2066  CHECK_ERROR(close_first_level());
2067  }
2068  ASSERT_ND(cur_path_levels_ == 0);
2069  LOG(INFO) << "Closed all levels. now buffered pages=" << allocated_pages_;
2070  return kRetOk;
2071 }
2072 
2073 ErrorStack MasstreeComposeContext::flush_buffer() {
2074  LOG(INFO) << "Flushing buffer. buffered pages=" << allocated_pages_;
2075  // 1) Close all levels. Otherwise we might still have many 'active' pages.
2076  close_all_levels();
2077 
2078  ASSERT_ND(allocated_pages_ <= max_pages_);
2079 
2080  WRAP_ERROR_CODE(get_writer()->dump_pages(0, allocated_pages_));
2081  allocated_pages_ = 0;
2082  page_id_base_ = get_writer()->get_next_page_id();
2083  write_dummy_page_zero();
2084  return kRetOk;
2085 }
2086 
2087 void MasstreeComposeContext::store_cur_prefix(uint8_t layer, KeySlice prefix_slice) {
2088  assorted::write_bigendian<KeySlice>(prefix_slice, cur_prefix_be_ + layer * kSliceLen);
2089  cur_prefix_slices_[layer] = prefix_slice;
2090 }
2091 
2097 void MasstreeComposeContext::remove_old_page_boundary_info(
2098  SnapshotPagePointer page_id,
2099  MasstreePage* page) {
2100  snapshot::SnapshotId snapshot_id = extract_snapshot_id_from_snapshot_pointer(page_id);
2101  ASSERT_ND(snapshot_id == snapshot_id_);
2102  ASSERT_ND(extract_numa_node_from_snapshot_pointer(page_id) == numa_node_);
2103  ASSERT_ND(page->header().page_id_ == page_id);
2104  ASSERT_ND(page_boundary_info_cur_pos_ > 0);
2105  ASSERT_ND(page_boundary_elements_ > 0);
2106  ASSERT_ND(cur_path_levels_ > 0);
2107 
2108  // this should happen very rarely (after flush_buffer), so the performance doesn't matter.
2109  // hence we do a somewhat sloppy thing here for simpler code.
2110  LOG(INFO) << "Removing a page_boundary_info entry for a re-opened page. This should happen"
2111  << " very infrequently, or the buffer size is too small.";
2112  const uint8_t btree_level = page->get_btree_level();
2113  const uint8_t layer = page->get_layer();
2114  const KeySlice* prefixes = cur_prefix_slices_;
2115  const KeySlice low = page->get_low_fence();
2116  const KeySlice high = page->get_high_fence();
2117  const uint32_t hash = PageBoundaryInfo::calculate_hash(btree_level, layer, prefixes, low, high);
2118 
2119  // check them all! again, this method is called very occasionally
2120  bool found = false;
2121  for (uint32_t i = 0; LIKELY(i < page_boundary_elements_); ++i) {
2122  if (LIKELY(page_boundary_sort_[i].hash_ != hash)) {
2123  continue;
2124  }
2125  ASSERT_ND(page_boundary_sort_[i].info_pos_ < page_boundary_info_cur_pos_);
2126  PageBoundaryInfo* info = get_page_boundary_info(page_boundary_sort_[i].info_pos_);
2127  if (info->exact_match(btree_level, layer, prefixes, low, high)) {
2128  ASSERT_ND(!info->removed_);
2129  // mark the entry as removed. later exact_match() will ignore this entry, although
2130  // the hash value is still there; it only causes a few more false positives.
2131  info->removed_ = true;
2132  found = true;
2133  break;
2134  }
2135  }
2136 
2137  if (!found) {
2138  LOG(ERROR) << "WTF. The old page_boundary_info entry was not found. page_id="
2139  << assorted::Hex(page_id);
2140  }
2141 }
2142 
2143 inline void MasstreeComposeContext::assert_page_boundary_not_exists(
2144  uint8_t btree_level,
2145  uint8_t layer,
2146  const KeySlice* prefixes,
2147  KeySlice low,
2148  KeySlice high) const {
2149 #ifndef NDEBUG
2150  uint32_t hash = PageBoundaryInfo::calculate_hash(btree_level, layer, prefixes, low, high);
2151  for (uint32_t i = 0; LIKELY(i < page_boundary_elements_); ++i) {
2152  if (LIKELY(page_boundary_sort_[i].hash_ != hash)) {
2153  continue;
2154  }
2155  ASSERT_ND(page_boundary_sort_[i].info_pos_ < page_boundary_info_cur_pos_);
2156  const PageBoundaryInfo* info = get_page_boundary_info(page_boundary_sort_[i].info_pos_);
2157  bool exists = info->exact_match(btree_level, layer, prefixes, low, high);
2158  if (exists) { // just for debugging.
2159  // I think I figured this out. See comment of PageBoundaryInfo::btree_level_.
2160  info->exact_match(btree_level, layer, prefixes, low, high);
2161  }
2162  ASSERT_ND(!exists);
2163  }
2164 #else // NDEBUG
2165  UNUSED_ND(btree_level);
2166  UNUSED_ND(layer);
2167  UNUSED_ND(prefixes);
2168  UNUSED_ND(low);
2169  UNUSED_ND(high);
2170 #endif // NDEBUG
2171 }
2172 
2173 ErrorCode MasstreeComposeContext::close_level_register_page_boundaries() {
2174  // this method must be called *after* the pages in the last level are finalized because
2175  // we need to know the page fences. this is thus called at the end of close_xxx_level
2176  ASSERT_ND(cur_path_levels_ > 0U);
2177  PathLevel* last = get_last_level();
2178  ASSERT_ND(!last->has_next_original()); // otherwise tail's page boundary is not finalized yet
2179 
2180 #ifndef NDEBUG
2181  // let's check that the last level is finalized.
2182  KeySlice prev = last->low_fence_;
2183  uint32_t counted = 0;
2184  for (memory::PagePoolOffset cur = last->head_; cur != 0;) {
2185  const MasstreePage* page = get_page(cur);
2186  ++counted;
2187  ASSERT_ND(page->get_low_fence() == prev);
2188  ASSERT_ND(page->get_high_fence() > prev);
2189  ASSERT_ND(page->get_layer() == last->layer_);
2190  prev = page->get_high_fence();
2191  cur = page->get_foster_major().get_offset();
2192  }
2193  ASSERT_ND(prev == last->high_fence_);
2194  ASSERT_ND(counted == last->page_count_);
2195 #endif // NDEBUG
2196 
2197 
2198  KeySlice prev_high = 0; // only for assertion
2199  for (memory::PagePoolOffset cur = last->head_; cur != 0;) {
2200  const MasstreePage* page = get_page(cur);
2201  const SnapshotPagePointer page_id = page->header().page_id_;
2202  const uint8_t btree_level = page->get_btree_level();
2203  const uint8_t layer = last->layer_;
2204  const KeySlice low = page->get_low_fence();
2205  const KeySlice high = page->get_high_fence();
2206  ASSERT_ND(low < high);
2207  ASSERT_ND(cur == last->head_ || low == prev_high);
2208  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(page_id) == snapshot_id_);
2209 
2210  // we might have to expand memory for page_boundary_info.
2211  uint16_t info_size = (layer + 3U) * sizeof(KeySlice); // see dynamic_sizeof()
2212  if (UNLIKELY(page_boundary_elements_ == max_page_boundary_elements_
2213  || page_boundary_info_cur_pos_ + (info_size / 8U) >= page_boundary_info_capacity_)) {
2214  LOG(INFO) << "Automatically expanding memory for page_boundary_info. This is super-rare";
2215  CHECK_ERROR_CODE(get_writer()->expand_intermediate_memory(
2216  get_writer()->get_intermediate_size() * 2U,
2217  true)); // we need to retain the current content
2218  refresh_page_boundary_info_variables();
2219  }
2220  ASSERT_ND(page_boundary_elements_ < max_page_boundary_elements_);
2221  ASSERT_ND(page_boundary_info_cur_pos_ + (info_size / 8U) < page_boundary_info_capacity_);
2222  assert_page_boundary_not_exists(btree_level, layer, cur_prefix_slices_, low, high);
2223 
2224  PageBoundaryInfo* info = get_page_boundary_info(page_boundary_info_cur_pos_);
2225  info->btree_level_ = btree_level;
2226  info->removed_ = false;
2227  info->layer_ = layer;
2228  ASSERT_ND(info->dynamic_sizeof() == info_size);
2229  SnapshotLocalPageId local_id = extract_local_page_id_from_snapshot_pointer(page_id);
2230  ASSERT_ND(local_id > 0);
2231  ASSERT_ND(local_id < (1ULL << 40));
2232  info->local_snapshot_pointer_high_ = local_id >> 32;
2233  info->local_snapshot_pointer_low_ = static_cast<uint32_t>(local_id);
2234  for (uint8_t i = 0; i < layer; ++i) {
2235  info->slices_[i] = cur_prefix_slices_[i];
2236  }
2237  info->slices_[layer] = low;
2238  info->slices_[layer + 1] = high;
2239 
2240  page_boundary_sort_[page_boundary_elements_].hash_
2241  = PageBoundaryInfo::calculate_hash(btree_level, layer, cur_prefix_slices_, low, high);
2242  page_boundary_sort_[page_boundary_elements_].info_pos_ = page_boundary_info_cur_pos_;
2243  ++page_boundary_elements_;
2244  page_boundary_info_cur_pos_ += info->dynamic_sizeof() / 8U;
2245  ASSERT_ND(!page->get_foster_major().is_null() || cur == last->tail_);
2246  cur = page->get_foster_major().get_offset();
2247  prev_high = high;
2248  }
2249 
2250  return kErrorCodeOk;
2251 }
2252 
2253 void MasstreeComposeContext::sort_page_boundary_info() {
2254  ASSERT_ND(page_boundary_info_cur_pos_ <= page_boundary_info_capacity_);
2255  ASSERT_ND(page_boundary_elements_ <= max_page_boundary_elements_);
2256  debugging::StopWatch watch;
2257  std::sort(page_boundary_sort_, page_boundary_sort_ + page_boundary_elements_);
2258  watch.stop();
2259  VLOG(0) << "MasstreeStorage-" << id_ << " sorted " << page_boundary_elements_ << " entries"
2260  << " to track page_boundary_info in " << watch.elapsed_ms() << "ms";
2261 }
2262 
2263 inline SnapshotPagePointer MasstreeComposeContext::lookup_page_boundary_info(
2264  uint8_t btree_level,
2265  uint8_t layer,
2266  const KeySlice* prefixes,
2267  KeySlice low,
2268  KeySlice high) const {
2269  PageBoundarySort dummy;
2270  dummy.hash_ = PageBoundaryInfo::calculate_hash(btree_level, layer, prefixes, low, high);
2271  const PageBoundarySort* begin_entry = std::lower_bound(
2272  page_boundary_sort_,
2273  page_boundary_sort_ + page_boundary_elements_,
2274  dummy,
2275  PageBoundarySort::static_less_than);
2276  uint32_t begin = begin_entry - page_boundary_sort_;
2277  for (uint32_t i = begin; i < page_boundary_elements_; ++i) {
2278  if (page_boundary_sort_[i].hash_ != dummy.hash_) {
2279  break;
2280  }
2281  const PageBoundaryInfo* info = get_page_boundary_info(page_boundary_sort_[i].info_pos_);
2282  if (!info->exact_match(btree_level, layer, prefixes, low, high)) {
2283  continue;
2284  }
2285  SnapshotLocalPageId local_page_id = info->get_local_page_id();
2286  ASSERT_ND(local_page_id > 0);
2287  SnapshotPagePointer page_id = to_snapshot_page_pointer(snapshot_id_, numa_node_, local_page_id);
2288  ASSERT_ND(page_id < get_writer()->get_next_page_id()); // otherwise not yet flushed
2289  return page_id;
2290  }
2291  DVLOG(1) << "exactly corresponding page boundaries not found. cannot install a snapshot"
2292  << " pointer";
2293  return 0;
2294 }
2295 
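// A minimal sketch (hypothetical structs, not FOEDUS API) of the boundary-lookup scheme above:
// each registered page boundary is addressed by a hash of (btree_level, layer, prefixes, low,
// high); a sorted array of (hash, position) entries is binary-searched with std::lower_bound,
// and every hash match is then verified by an exact comparison to weed out collisions.
#include <algorithm>
#include <cstdint>
#include <vector>

namespace example {
struct BoundaryEntry {
  uint64_t hash;
  uint32_t pos;  // where the full boundary record lives
};
inline bool operator<(const BoundaryEntry& a, const BoundaryEntry& b) { return a.hash < b.hash; }

// exact_match re-checks the full boundary stored at `pos`; returns UINT32_MAX if none matches.
template <typename ExactMatch>
uint32_t lookup_boundary(const std::vector<BoundaryEntry>& sorted,
                         uint64_t hash,
                         ExactMatch exact_match) {
  BoundaryEntry dummy{hash, 0};
  auto it = std::lower_bound(sorted.begin(), sorted.end(), dummy);
  for (; it != sorted.end() && it->hash == hash; ++it) {
    if (exact_match(it->pos)) {
      return it->pos;  // exactly matching boundary found
    }
  }
  return UINT32_MAX;  // caller skips snapshot-pointer installation
}
}  // namespace example
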
2296 ErrorStack MasstreeComposeContext::install_snapshot_pointers(uint64_t* installed_count) const {
2297  ASSERT_ND(allocated_pages_ == 1U); // this method must be called at the end, after flush_buffer
2298  *installed_count = 0;
2299  VolatilePagePointer pointer = storage_.get_control_block()->root_page_pointer_.volatile_pointer_;
2300  if (pointer.is_null()) {
2301  VLOG(0) << "No volatile pages.. maybe while restart?";
2302  return kRetOk;
2303  }
2304 
2305  const memory::GlobalVolatilePageResolver& resolver
2306  = engine_->get_memory_manager()->get_global_volatile_page_resolver();
2307  MasstreeIntermediatePage* volatile_root
2308  = reinterpret_cast<MasstreeIntermediatePage*>(resolver.resolve_offset(pointer));
2309 
2310  KeySlice prefixes[kMaxLayers];
2311  std::memset(prefixes, 0, sizeof(prefixes));
2312 
2313  // if all logs are single-layer, this snapshot contains no snapshot page in next layers, so we
2314  // don't have to check any border pages (although the volatile world might now contain next layers).
2315  const bool no_next_layer = merge_sort_->are_all_single_layer_logs();
2316 
2317  debugging::StopWatch watch;
2318  // Install pointers. same as install_snapshot_pointers_recurse EXCEPT we skip other partition.
2319  // Because the volatile world only splits pages (no merge), no child spans two partitions.
2320  // So, it's easy to determine whether this composer should follow the pointer.
2321  // If there is such a pointer as a rare case for whatever reason, we skip it in this snapshot.
2322 
2323  // We don't even need partitioning information to figure this out. We just check the
2324  // new pointers in the root page and find a volatile pointer that equals or contains the range.
2325  struct InstallationInfo {
2326  KeySlice low_;
2327  KeySlice high_;
2328  SnapshotPagePointer pointer_;
2329  };
2330  std::vector<InstallationInfo> infos; // only one root page, so vector wouldn't hurt
2331  infos.reserve(kMaxIntermediatePointers);
2332  for (MasstreeIntermediatePointerIterator it(root_); it.is_valid(); it.next()) {
2333  SnapshotPagePointer pointer = it.get_pointer().snapshot_pointer_;
2334  if (pointer == 0) {
2335  continue;
2336  }
2337 
2338  snapshot::SnapshotId snapshot_id = extract_snapshot_id_from_snapshot_pointer(pointer);
2339  ASSERT_ND(snapshot_id != snapshot::kNullSnapshotId);
2340  if (snapshot_id == snapshot_id_) {
2341  ASSERT_ND(extract_numa_node_from_snapshot_pointer(pointer) == numa_node_);
2342  }
2343  InstallationInfo info = {it.get_low_key(), it.get_high_key(), pointer};
2344  infos.emplace_back(info);
2345  }
2346 
2347  // vector, same above. in the recursed functions, we use stack array to avoid frequent heap alloc.
2348  std::vector<VolatilePagePointer> recursion_targets;
2349  recursion_targets.reserve(kMaxIntermediatePointers);
2350  {
2351  // first, install pointers. this step is quickly and safely done after taking a page lock.
2352  // recursion is done after releasing this lock because it might take long time.
2353  xct::McsOwnerlessLockScope lock_scope(volatile_root->get_lock_address());
2354  uint16_t cur = 0; // infos[cur] is the first entry that might fully contain the range
2355  for (MasstreeIntermediatePointerIterator it(volatile_root); it.is_valid(); it.next()) {
2356  KeySlice low = it.get_low_key();
2357  KeySlice high = it.get_high_key();
2358  // the snapshot's page range is more coarse or same,
2359  // 1) page range exactly matches -> install pointer and recurse
2360  // 2) volatile range is subset of snapshot's -> just recurse
2361  // in either case, we are interested only when
2362  // snapshot's low <= volatile's low && snapshot's high >= volatile's high
2363  while (cur < infos.size() && (infos[cur].low_ > low || infos[cur].high_ < high)) {
2364  ++cur;
2365  }
2366  if (cur == infos.size()) {
2367  break;
2368  }
2369  ASSERT_ND(infos[cur].low_ <= low && infos[cur].high_ >= high);
2370  DualPagePointer& pointer = volatile_root->get_minipage(it.index_).pointers_[it.index_mini_];
2371  if (infos[cur].low_ == low && infos[cur].high_ == high) {
2372  pointer.snapshot_pointer_ = infos[cur].pointer_;
2373  ++cur; // in this case we know next volatile pointer is beyond this snapshot range
2374  }
2375  // will recurse on it.
2376  if (!pointer.volatile_pointer_.is_null()) {
2377  recursion_targets.emplace_back(pointer.volatile_pointer_);
2378  }
2379  }
2380  }
2381 
2382  // The recursion is not transactionally protected. That's fine; even if we skip over
2383  // something, it's just that we can't drop the page this time.
2384  for (VolatilePagePointer pointer : recursion_targets) {
2385  MasstreePage* child = reinterpret_cast<MasstreePage*>(resolver.resolve_offset(pointer));
2386  if (no_next_layer && child->is_border()) {
2387  // unlike install_snapshot_pointers_recurse_intermediate, we check it here.
2388  // because btree-levels might be unbalanced in first layer's root, we have to follow
2389  // the pointer to figure this out.
2390  continue;
2391  }
2392  WRAP_ERROR_CODE(install_snapshot_pointers_recurse(
2393  resolver,
2394  0,
2395  prefixes,
2396  child,
2397  installed_count));
2398  }
2399  watch.stop();
2400  LOG(INFO) << "MasstreeStorage-" << id_ << " installed " << *installed_count << " pointers"
2401  << " in " << watch.elapsed_ms() << "ms";
2402  return kRetOk;
2403 }
2404 
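// A minimal sketch (hypothetical types, not FOEDUS API) of the range-matching loop above:
// both the snapshot's child ranges and the volatile root's child ranges are sorted, and the
// snapshot ranges are as coarse as or coarser than the volatile ones, so one forward cursor
// suffices. A snapshot pointer is installed only when the two ranges match exactly; a strict
// subset only triggers recursion in the real code.
#include <cstdint>
#include <vector>

namespace example {
struct Range {
  uint64_t low;
  uint64_t high;
  uint64_t snapshot_pointer;  // 0 means "not installed"
};

inline void install_matching(const std::vector<Range>& snap, std::vector<Range>* volatile_children) {
  size_t cur = 0;
  for (Range& child : *volatile_children) {
    // advance until snap[cur] covers this child's range, if any entry does
    while (cur < snap.size() && (snap[cur].low > child.low || snap[cur].high < child.high)) {
      ++cur;
    }
    if (cur == snap.size()) {
      break;  // no remaining snapshot range can cover later children either
    }
    if (snap[cur].low == child.low && snap[cur].high == child.high) {
      child.snapshot_pointer = snap[cur].snapshot_pointer;  // exact match: install
      ++cur;  // the next child is beyond this snapshot range
    }
  }
}
}  // namespace example
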
2405 inline ErrorCode MasstreeComposeContext::install_snapshot_pointers_recurse(
2406  const memory::GlobalVolatilePageResolver& resolver,
2407  uint8_t layer,
2408  KeySlice* prefixes,
2409  MasstreePage* volatile_page,
2410  uint64_t* installed_count) const {
2411  if (volatile_page->is_border()) {
2412  return install_snapshot_pointers_recurse_border(
2413  resolver,
2414  layer,
2415  prefixes,
2416  reinterpret_cast<MasstreeBorderPage*>(volatile_page),
2417  installed_count);
2418  } else {
2419  return install_snapshot_pointers_recurse_intermediate(
2420  resolver,
2421  layer,
2422  prefixes,
2423  reinterpret_cast<MasstreeIntermediatePage*>(volatile_page),
2424  installed_count);
2425  }
2426 }
2427 
2428 ErrorCode MasstreeComposeContext::install_snapshot_pointers_recurse_intermediate(
2429  const memory::GlobalVolatilePageResolver& resolver,
2430  uint8_t layer,
2431  KeySlice* prefixes,
2432  MasstreeIntermediatePage* volatile_page,
2433  uint64_t* installed_count) const {
2434  ASSERT_ND(volatile_page->get_layer() == layer);
2435 
2436  // we do not track down to foster-children. most pages are reachable without it anyway.
2437  // we have to recurse to children, but we _might_ know that we can ignore next layers (borders)
2438  const bool no_next_layer = (merge_sort_->get_longest_key_length() <= (layer + 1U) * 8U);
2439  const bool is_child_intermediate = volatile_page->get_btree_level() >= 2U;
2440  const bool follow_children = !no_next_layer || is_child_intermediate;
2441  const uint8_t btree_level = volatile_page->get_btree_level() - 1U;
2442 
2443  // at this point we don't have to worry about partitioning. this subtree is solely ours.
2444  // we just follow every volatile pointer. this might mean a wasted recursion if this snapshot
2445  // modified only a part of the storage. However, the checks in root level have already cut most
2446  // un-touched subtrees. So, it won't be a big issue.
2447 
2448  // 160*8 bytes in each stack frame... hopefully not too much. nowadays default stack size is 2MB.
2449  VolatilePagePointer recursion_targets[kMaxIntermediatePointers];
2450  uint16_t recursion_target_count = 0;
2451 
2452  {
2453  // look for exactly matching page boundaries. we install pointer only in exact match case
2454  // while we recurse on every volatile page.
2455  xct::McsOwnerlessLockScope lock_scope(volatile_page->get_lock_address());
2456  for (MasstreeIntermediatePointerIterator it(volatile_page); it.is_valid(); it.next()) {
2457  KeySlice low = it.get_low_key();
2458  KeySlice high = it.get_high_key();
2459  DualPagePointer& pointer = volatile_page->get_minipage(it.index_).pointers_[it.index_mini_];
2460  SnapshotPagePointer snapshot_pointer
2461  = lookup_page_boundary_info(btree_level, layer, prefixes, low, high);
2462  if (snapshot_pointer != 0) {
2463  pointer.snapshot_pointer_ = snapshot_pointer;
2464  ++(*installed_count);
2465  } else {
2466  DVLOG(3) << "Oops, no matching boundary. low=" << low << ", high=" << high;
2467  }
2468  if (follow_children && !pointer.volatile_pointer_.is_null()) {
2469  ASSERT_ND(recursion_target_count < kMaxIntermediatePointers);
2470  recursion_targets[recursion_target_count] = pointer.volatile_pointer_;
2471  ++recursion_target_count;
2472  }
2473  }
2474  }
2475 
2476  for (uint16_t i = 0; i < recursion_target_count; ++i) {
2477  MasstreePage* child
2478  = reinterpret_cast<MasstreePage*>(resolver.resolve_offset(recursion_targets[i]));
2479  if (no_next_layer && child->is_border()) {
2480  continue;
2481  }
2482  CHECK_ERROR_CODE(install_snapshot_pointers_recurse(
2483  resolver,
2484  layer,
2485  prefixes,
2486  child,
2487  installed_count));
2488  }
2489 
2490  return kErrorCodeOk;
2491 }
2492 
2493 ErrorCode MasstreeComposeContext::install_snapshot_pointers_recurse_border(
2494  const memory::GlobalVolatilePageResolver& resolver,
2495  uint8_t layer,
2496  KeySlice* prefixes,
2497  MasstreeBorderPage* volatile_page,
2498  uint64_t* installed_count) const {
2499  ASSERT_ND(volatile_page->get_layer() == layer);
2500  const bool no_next_layer = (merge_sort_->get_longest_key_length() <= (layer + 1U) * 8U);
2501  ASSERT_ND(!no_next_layer);
2502  const bool no_next_next_layer = (merge_sort_->get_longest_key_length() <= (layer + 2U) * 8U);
2503 
2504  // unlike intermediate pages, we don't need a page lock.
2505  // a record in a border page is never physically deleted.
2506  // also, once a record becomes a next-layer pointer, it never gets reverted to a usual record.
2507  // just make sure we put a consume fence (no need for acquire) in a few places.
2508  SlotIndex key_count = volatile_page->get_key_count();
2509  assorted::memory_fence_consume();
2510  for (SlotIndex i = 0; i < key_count; ++i) {
2511  if (!volatile_page->does_point_to_layer(i)) {
2512  continue;
2513  }
2514  assorted::memory_fence_consume();
2515  DualPagePointer* next_pointer = volatile_page->get_next_layer(i);
2516  if (next_pointer->volatile_pointer_.is_null()) {
2517  // this also means that there was no change since the last snapshot.
2518  continue;
2519  }
2520 
2521  MasstreePage* child
2522  = reinterpret_cast<MasstreePage*>(resolver.resolve_offset(next_pointer->volatile_pointer_));
2523  ASSERT_ND(child->get_low_fence() == kInfimumSlice);
2524  ASSERT_ND(child->is_high_fence_supremum());
2525  uint8_t btree_level = child->get_btree_level();
2526  KeySlice slice = volatile_page->get_slice(i);
2527  prefixes[layer] = slice;
2528  SnapshotPagePointer snapshot_pointer
2529  = lookup_page_boundary_info(btree_level, layer + 1U, prefixes, kInfimumSlice, kSupremumSlice);
2530  if (snapshot_pointer != 0) {
2531  // okay, exactly corresponding snapshot page found
2532  next_pointer->snapshot_pointer_ = snapshot_pointer;
2533  ++(*installed_count);
2534  } else {
2535  DVLOG(2) << "Oops, no matching boundary. border next layer";
2536  }
2537 
2538  if (child->is_border() && no_next_next_layer) {
2539  // the root page of the next layer is a border page and we don't have keys that long.
2540  continue;
2541  }
2542  CHECK_ERROR_CODE(install_snapshot_pointers_recurse(
2543  resolver,
2544  layer + 1U,
2545  prefixes,
2546  child,
2547  installed_count));
2548  }
2549 
2550  return kErrorCodeOk;
2551 }
2552 
2558 Composer::DropResult MasstreeComposer::drop_volatiles(
2559  const Composer::DropVolatilesArguments& args) {
2560  Composer::DropResult result(args);
2561  if (storage_.get_masstree_metadata()->keeps_all_volatile_pages()) {
2562  LOG(INFO) << "Keep-all-volatile: Storage-" << storage_.get_name()
2563  << " is configured to keep all volatile pages.";
2564  result.dropped_all_ = false;
2565  return result;
2566  }
2567 
2568  DualPagePointer* root_pointer = &storage_.get_control_block()->root_page_pointer_;
2569  MasstreeIntermediatePage* volatile_page = reinterpret_cast<MasstreeIntermediatePage*>(
2570  resolve_volatile(root_pointer->volatile_pointer_));
2571  if (volatile_page == nullptr) {
2572  LOG(INFO) << "No volatile root page. Probably while restart";
2573  return result;
2574  }
2575 
2576  // Unlike array-storage, each thread decides which partitions to follow a bit differently
2577  // because the snapshot pointer might not be installed.
2578  // We check the partitioning assignment.
2579  PartitionerMetadata* metadata = PartitionerMetadata::get_metadata(engine_, storage_id_);
2580  ASSERT_ND(metadata->valid_);
2581  const MasstreePartitionerData* data
2582  = reinterpret_cast<MasstreePartitionerData*>(metadata->locate_data(engine_));
2583  ASSERT_ND(data->partition_count_ > 0);
2585  uint16_t cur = 0;
2586  for (MasstreeIntermediatePointerIterator it(volatile_page); it.is_valid(); it.next()) {
2587  DualPagePointer* pointer = &volatile_page->get_minipage(it.index_).pointers_[it.index_mini_];
2588  if (!args.partitioned_drop_) {
2589  result.combine(drop_volatiles_recurse(args, pointer));
2590  continue;
2591  }
2592  // the page boundaries as of partitioning might be different from the volatile page's,
2593  // but they are at least more coarse. Each pointer belongs to just one partition.
2594  // same trick as install_pointers.
2595  KeySlice low = it.get_low_key();
2596  KeySlice high = it.get_high_key();
2597  while (true) {
2598  KeySlice partition_high;
2599  if (cur + 1U == data->partition_count_) {
2600  partition_high = kSupremumSlice;
2601  } else {
2602  partition_high = data->low_keys_[cur + 1U];
2603  }
2604  if (high <= partition_high) {
2605  break;
2606  }
2607  ++cur;
2608  }
2609  ASSERT_ND(cur < data->partition_count_);
2610  if (data->partitions_[cur] == args.my_partition_) {
2611  if (data->low_keys_[cur] != low) {
2612  VLOG(0) << "Not exactly matching page boundary."; // but not a big issue.
2613  }
2614  result.combine(drop_volatiles_recurse(args, pointer));
2615  }
2616  }
2617 
2618  // we so far always keep the volatile root of a masstree storage.
2619  return result;
2620 }
2621 
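// A minimal sketch (hypothetical data layout, not FOEDUS API) of the partition check above:
// partitions are described by their sorted low keys, so a child range [low, high) belongs to
// the partition p whose interval [low_keys[p], low_keys[p+1]) covers it; by construction a
// child range never straddles two partitions.
#include <cstdint>
#include <vector>

namespace example {
inline size_t find_partition(const std::vector<uint64_t>& low_keys,
                             uint64_t high,
                             uint64_t supremum) {
  size_t cur = 0;
  while (true) {
    const uint64_t partition_high =
      (cur + 1 == low_keys.size()) ? supremum : low_keys[cur + 1];
    if (high <= partition_high) {
      return cur;  // this partition's interval fully covers the child range
    }
    ++cur;
  }
}
}  // namespace example
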
2622 void MasstreeComposer::drop_root_volatile(const Composer::DropVolatilesArguments& args) {
2623  if (storage_.get_masstree_metadata()->keeps_all_volatile_pages()) {
2624  LOG(INFO) << "Oh, but keep-all-volatile is on. Storage-" << storage_.get_name()
2625  << " is configured to keep all volatile pages.";
2626  return;
2627  }
2628  DualPagePointer* root_pointer = &storage_.get_control_block()->root_page_pointer_;
2629  MasstreeIntermediatePage* volatile_page = reinterpret_cast<MasstreeIntermediatePage*>(
2630  resolve_volatile(root_pointer->volatile_pointer_));
2631  if (volatile_page == nullptr) {
2632  LOG(INFO) << "Oh, but root volatile page already null";
2633  return;
2634  }
2635 
2636  if (is_to_keep_volatile(0, volatile_page->get_btree_level())) {
2637  LOG(INFO) << "Oh, but " << storage_ << " is configured to keep the root page.";
2638  return;
2639  }
2640 
2641  // yes, we can drop ALL volatile pages!
2642  LOG(INFO) << "Okay, drop em all!!";
2643  drop_all_recurse(args, root_pointer);
2644  root_pointer->volatile_pointer_.clear();
2645 }
2646 
2647 void MasstreeComposer::drop_all_recurse(
2648  const Composer::DropVolatilesArguments& args,
2649  DualPagePointer* pointer) {
2650  MasstreePage* page = resolve_volatile(pointer->volatile_pointer_);
2651  if (page == nullptr) {
2652  return;
2653  }
2654  drop_all_recurse_page_only(args, page);
2655  args.drop(engine_, pointer->volatile_pointer_);
2656  pointer->volatile_pointer_.clear(); // not required, but to be clear.
2657 }
2658 
2659 void MasstreeComposer::drop_all_recurse_page_only(
2660  const Composer::DropVolatilesArguments& args,
2661  MasstreePage* page) {
2662  ASSERT_ND(page);
2663  if (page->has_foster_child()) {
2664  MasstreePage* minor = resolve_volatile(page->get_foster_minor());
2665  MasstreePage* major = resolve_volatile(page->get_foster_major());
2666  drop_all_recurse_page_only(args, minor);
2667  drop_all_recurse_page_only(args, major);
2668  return;
2669  }
2670 
2671  ASSERT_ND(!page->has_foster_child());
2672  if (page->is_border()) {
2673  MasstreeBorderPage* border = as_border(page);
2674  const SlotIndex key_count = page->get_key_count();
2675  for (SlotIndex i = 0; i < key_count; ++i) {
2676  if (border->does_point_to_layer(i)) {
2677  DualPagePointer* pointer = border->get_next_layer(i);
2678  drop_all_recurse(args, pointer);
2679  }
2680  }
2681  } else {
2682  MasstreeIntermediatePage* casted = as_intermediate(page);
2683  for (MasstreeIntermediatePointerIterator it(casted); it.is_valid(); it.next()) {
2684  DualPagePointer* pointer = const_cast<DualPagePointer*>(&it.get_pointer());
2685  drop_all_recurse(args, pointer);
2686  }
2687  }
2688 }
2689 
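// Note on the two helpers above: drop_all_recurse_page_only() recursively releases everything
// reachable below a page (descending through foster twins, next-layer roots in border pages,
// and child pointers in intermediate pages) without touching the page itself, while
// drop_all_recurse() then releases the page pointed to by the given DualPagePointer and clears
// the volatile pointer.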
2690 inline MasstreePage* MasstreeComposer::resolve_volatile(VolatilePagePointer pointer) {
2691  if (pointer.is_null()) {
2692  return nullptr;
2693  }
2694  const memory::GlobalVolatilePageResolver& page_resolver
2695  = engine_->get_memory_manager()->get_global_volatile_page_resolver();
2696  return reinterpret_cast<MasstreePage*>(page_resolver.resolve_offset(pointer));
2697 }
2698 
2699 inline Composer::DropResult MasstreeComposer::drop_volatiles_recurse(
2700  const Composer::DropVolatilesArguments& args,
2701  DualPagePointer* pointer) {
2702  Composer::DropResult result(args);
2703  if (pointer->volatile_pointer_.is_null()) {
2704  return result;
2705  }
2706  // Unlike array-storage, even if this pointer's snapshot pointer has not been updated,
2707  // we might have updated its descendants because of non-matching page boundaries,
2708  // so we recurse in any case.
2709  MasstreePage* page = resolve_volatile(pointer->volatile_pointer_);
2710 
2711  // Also, masstree pages might have foster children.
2712  // They are kept as long as this page is kept.
2713  if (page->has_foster_child()) {
2714  MasstreePage* minor = resolve_volatile(page->get_foster_minor());
2715  MasstreePage* major = resolve_volatile(page->get_foster_major());
2716  if (page->is_border()) {
2717  result.combine(drop_volatiles_border(args, as_border(minor)));
2718  result.combine(drop_volatiles_border(args, as_border(major)));
2719  } else {
2720  result.combine(drop_volatiles_intermediate(args, as_intermediate(minor)));
2721  result.combine(drop_volatiles_intermediate(args, as_intermediate(major)));
2722  }
2723  ASSERT_ND(result.max_observed_ >= args.snapshot_.valid_until_epoch_);
2724  } else {
2725  if (page->is_border()) {
2726  result.combine(drop_volatiles_border(args, as_border(page)));
2727  } else {
2728  result.combine(drop_volatiles_intermediate(args, as_intermediate(page)));
2729  }
2730  }
2731  if (result.max_observed_ > args.snapshot_.valid_until_epoch_ || !result.dropped_all_) {
2732  DVLOG(1) << "Couldn't drop a volatile page that has a recent modification";
2733  result.dropped_all_ = false;
2734  return result;
2735  }
2736  ASSERT_ND(result.max_observed_ == args.snapshot_.valid_until_epoch_);
2737  ASSERT_ND(result.dropped_all_);
2738  bool updated_pointer = is_updated_pointer(args, pointer->snapshot_pointer_);
2739  if (updated_pointer) {
2740  if (is_to_keep_volatile(page->get_layer(), page->get_btree_level())) {
2741  DVLOG(2) << "Exempted";
2742  result.dropped_all_ = false; // max_observed is satisfactory, but we chose not to drop.
2743  } else {
2744  if (page->has_foster_child()) {
2745  drop_foster_twins(args, page);
2746  }
2747  args.drop(engine_, pointer->volatile_pointer_);
2748  pointer->volatile_pointer_.clear();
2749  }
2750  } else {
2751  DVLOG(1) << "Couldn't drop a volatile page that has a non-matching boundary";
2752  result.dropped_all_ = false;
2753  }
2754  return result;
2755 }
2756 
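// Summary of the decision above: we always recurse into children first. A volatile page is
// dropped only when (a) its whole subtree observed no epoch newer than the snapshot's
// valid-until epoch, (b) its snapshot pointer was written by this very snapshot, and
// (c) it is not exempted by is_to_keep_volatile(). Failing any of these keeps the page and
// clears dropped_all_ so that ancestors are kept as well.
// A minimal sketch of the same condition as a single predicate (illustrative only; 'can_drop'
// is not a variable in this file):
//   bool can_drop = result.dropped_all_
//     && result.max_observed_ == args.snapshot_.valid_until_epoch_
//     && is_updated_pointer(args, pointer->snapshot_pointer_)
//     && !is_to_keep_volatile(page->get_layer(), page->get_btree_level());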
2757 void MasstreeComposer::drop_foster_twins(
2758  const Composer::DropVolatilesArguments& args,
2759  MasstreePage* page) {
2760  ASSERT_ND(page->has_foster_child());
2761  MasstreePage* minor = resolve_volatile(page->get_foster_minor());
2762  MasstreePage* major = resolve_volatile(page->get_foster_major());
2763  if (minor->has_foster_child()) {
2764  drop_foster_twins(args, minor);
2765  }
2766  if (major->has_foster_child()) {
2767  drop_foster_twins(args, major);
2768  }
2769  args.drop(engine_, page->get_foster_minor());
2770  args.drop(engine_, page->get_foster_major());
2771 }
2772 
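// Note: drop_foster_twins() releases the twins bottom-up; any grand-foster twins are released
// before the immediate minor/major twins, and the adopting page itself is released afterwards
// by the caller (drop_volatiles_recurse).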
2773 Composer::DropResult MasstreeComposer::drop_volatiles_intermediate(
2774  const Composer::DropVolatilesArguments& args,
2775  MasstreeIntermediatePage* page) {
2776  ASSERT_ND(!page->header().snapshot_);
2777  ASSERT_ND(!page->is_border());
2778 
2779  Composer::DropResult result(args);
2780  if (page->has_foster_child()) {
2781  // Iff both the minor and major foster children dropped all of their descendants,
2782  // we drop this page and its foster children altogether.
2783  // Otherwise, we keep all of them.
2784  MasstreeIntermediatePage* minor = as_intermediate(resolve_volatile(page->get_foster_minor()));
2785  MasstreeIntermediatePage* major = as_intermediate(resolve_volatile(page->get_foster_major()));
2786  result.combine(drop_volatiles_intermediate(args, minor));
2787  result.combine(drop_volatiles_intermediate(args, major));
2788  return result;
2789  }
2790 
2791  // Explore/replace children first because we need to know whether there is a new modification.
2792  // In that case, we must keep this volatile page, too.
2793  ASSERT_ND(!page->has_foster_child());
2794  for (MasstreeIntermediatePointerIterator it(page); it.is_valid(); it.next()) {
2795  DualPagePointer* pointer = &page->get_minipage(it.index_).pointers_[it.index_mini_];
2796  result.combine(drop_volatiles_recurse(args, pointer));
2797  }
2798 
2799  return result;
2800 }
2801 
2802 inline Composer::DropResult MasstreeComposer::drop_volatiles_border(
2803  const Composer::DropVolatilesArguments& args,
2804  MasstreeBorderPage* page) {
2805  ASSERT_ND(!page->header().snapshot_);
2806  ASSERT_ND(page->is_border());
2807 
2808  Composer::DropResult result(args);
2809  if (page->has_foster_child()) {
2810  MasstreeBorderPage* minor = as_border(resolve_volatile(page->get_foster_minor()));
2811  MasstreeBorderPage* major = as_border(resolve_volatile(page->get_foster_major()));
2812  result.combine(drop_volatiles_border(args, minor));
2813  result.combine(drop_volatiles_border(args, major));
2814  return result;
2815  }
2816 
2817  ASSERT_ND(!page->has_foster_child());
2818  const SlotIndex key_count = page->get_key_count();
2819  for (SlotIndex i = 0; i < key_count; ++i) {
2820  if (page->does_point_to_layer(i)) {
2821  DualPagePointer* pointer = page->get_next_layer(i);
2822  result.combine(drop_volatiles_recurse(args, pointer));
2823  } else {
2824  Epoch epoch = page->get_owner_id(i)->xct_id_.get_epoch();
2825  ASSERT_ND(epoch.is_valid());
2826  result.on_rec_observed(epoch);
2827  }
2828  }
2829  return result;
2830 }
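// Note: a border page never drops individual records here; we only observe each record's commit
// epoch via on_rec_observed() so that max_observed_ tells the callers whether this page and its
// ancestors are still receiving recent modifications.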
2831 inline bool MasstreeComposer::is_updated_pointer(
2832  const Composer::DropVolatilesArguments& args,
2833  SnapshotPagePointer pointer) const {
2834  snapshot::SnapshotId snapshot_id = extract_snapshot_id_from_snapshot_pointer(pointer);
2835  return (pointer != 0 && args.snapshot_.id_ == snapshot_id);
2836 }
2837 inline bool MasstreeComposer::is_to_keep_volatile(uint8_t layer, uint16_t btree_level) const {
2838  const MasstreeMetadata* meta = storage_.get_masstree_metadata();
2839  // snapshot_drop_volatile_pages_layer_threshold_:
2840  // Number of B-trie layers of volatile pages to keep after each snapshotting.
2841  // Ex. 0 drops all (always false).
2842  if (layer < meta->snapshot_drop_volatile_pages_layer_threshold_) {
2843  return true;
2844  }
2845  // snapshot_drop_volatile_pages_btree_levels_:
2846  // Volatile pages of this B-tree level or higher are always kept after each snapshotting.
2847  // Ex. 0 keeps all. 0xFF drops all.
2848  return (btree_level >= meta->snapshot_drop_volatile_pages_btree_levels_);
2849 }
2850 
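// Worked example for is_to_keep_volatile() (threshold values below are hypothetical, not
// defaults): with snapshot_drop_volatile_pages_layer_threshold_ = 1 and
// snapshot_drop_volatile_pages_btree_levels_ = 2,
//   is_to_keep_volatile(layer = 0, btree_level = 0) == true   // layer 0 < 1: keep all of layer 0
//   is_to_keep_volatile(layer = 1, btree_level = 0) == false  // border page in layer 1: droppable
//   is_to_keep_volatile(layer = 1, btree_level = 2) == true   // level 2 >= 2: high levels kept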
2851 std::ostream& operator<<(std::ostream& o, const MasstreeComposeContext::PathLevel& v) {
2852  o << "<PathLevel>"
2853  << "<layer_>" << static_cast<int>(v.layer_) << "</layer_>"
2854  << "<next_original_>" << static_cast<int>(v.next_original_) << "</next_original_>"
2855  << "<next_original_mini>" << static_cast<int>(v.next_original_mini_) << "</next_original_mini>"
2856  << "<head>" << v.head_ << "</head>"
2857  << "<tail_>" << v.tail_ << "</tail_>"
2858  << "<page_count_>" << v.page_count_ << "</page_count_>"
2859  << "<low_fence_>" << assorted::Hex(v.low_fence_, 16) << "</low_fence_>"
2860  << "<high_fence_>" << assorted::Hex(v.high_fence_, 16) << "</high_fence_>"
2861  << "<next_original_slice_>"
2862  << assorted::Hex(v.next_original_slice_, 16) << "</next_original_slice_>"
2863  << "</PathLevel>";
2864  return o;
2865 }
2866 
2867 } // namespace masstree
2868 } // namespace storage
2869 } // namespace foedus