libfoedus-core
FOEDUS Core Library
masstree_storage_pimpl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/storage/masstree/masstree_storage_pimpl.hpp"

#include <glog/logging.h>

#include <string>

#include "foedus/engine.hpp"
#include "foedus/log/log_type.hpp"
#include "foedus/storage/page.hpp"
#include "foedus/thread/thread.hpp"
#include "foedus/xct/xct.hpp"

namespace foedus {
namespace storage {
namespace masstree {

ErrorCode MasstreeStoragePimpl::get_first_root(
  thread::Thread* context,
  bool for_write,
  MasstreeIntermediatePage** root) {
  DualPagePointer* root_pointer = &control_block_->root_page_pointer_;
  MasstreeIntermediatePage* page = nullptr;
  CHECK_ERROR_CODE(context->follow_page_pointer(
    nullptr,
    false,
    for_write,
    true,
    root_pointer,
    reinterpret_cast<Page**>(&page),
    nullptr,
    0));

  assert_aligned_page(page);
  ASSERT_ND(page->get_layer() == 0);
  ASSERT_ND(!for_write || !page->header().snapshot_);

  if (UNLIKELY(page->has_foster_child())) {
    ASSERT_ND(!page->header().snapshot_);
    // The root page has a foster child... time for tree growth!
    // In either case, we just follow the moved page. The Master-Tree invariant guarantees it's safe.
    if (control_block_->first_root_locked_) {
      // To avoid contention, we begin the growth only when the lock seems uncontended.
      DVLOG(0) << "Other thread seems growing the first-layer. let him do that";
    } else {
      GrowFirstLayerRoot functor(context, get_id());
      CHECK_ERROR_CODE(context->run_nested_sysxct(&functor, 2));
    }
  }

  ASSERT_ND(page->get_layer() == 0);
  *root = page;
  return kErrorCodeOk;
}

ErrorStack MasstreeStoragePimpl::drop() {
  LOG(INFO) << "Uninitializing a masstree-storage " << get_name();

  if (!control_block_->root_page_pointer_.volatile_pointer_.is_null()) {
    // release volatile pages
    const memory::GlobalVolatilePageResolver& page_resolver
      = engine_->get_memory_manager()->get_global_volatile_page_resolver();
    // the first root is guaranteed to be an intermediate page
    MasstreeIntermediatePage* first_root = reinterpret_cast<MasstreeIntermediatePage*>(
      page_resolver.resolve_offset(control_block_->root_page_pointer_.volatile_pointer_));
    first_root->release_pages_recursive_parallel(engine_);
    control_block_->root_page_pointer_.volatile_pointer_.word = 0;
  }

  return kRetOk;
}

ErrorStack MasstreeStoragePimpl::load_empty() {
  control_block_->root_page_pointer_.snapshot_pointer_ = 0;
  control_block_->root_page_pointer_.volatile_pointer_.word = 0;
  control_block_->meta_.root_snapshot_page_id_ = 0;

  const uint16_t kDummyNode = 0;  // whatever. just pick from the first node
  memory::PagePool* pool
    = engine_->get_memory_manager()->get_node_memory(kDummyNode)->get_volatile_pool();
  const memory::LocalPageResolver &local_resolver = pool->get_resolver();

  // The root of the first layer is always an intermediate page.
  // This is a special rule only for the first layer to simplify partitioning and the composer.
  memory::PagePoolOffset root_offset;
  WRAP_ERROR_CODE(pool->grab_one(&root_offset));
  ASSERT_ND(root_offset);
  MasstreeIntermediatePage* root_page = reinterpret_cast<MasstreeIntermediatePage*>(
    local_resolver.resolve_offset_newpage(root_offset));
  control_block_->first_root_locked_ = false;
  control_block_->root_page_pointer_.snapshot_pointer_ = 0;
  control_block_->root_page_pointer_.volatile_pointer_.set(kDummyNode, root_offset);
  root_page->initialize_volatile_page(
    get_id(),
    control_block_->root_page_pointer_.volatile_pointer_,
    0,
    1,
    kInfimumSlice,
    kSupremumSlice);

  // Also allocate the only child.
  memory::PagePoolOffset child_offset;
  WRAP_ERROR_CODE(pool->grab_one(&child_offset));
  ASSERT_ND(child_offset);
  MasstreeBorderPage* child_page = reinterpret_cast<MasstreeBorderPage*>(
    local_resolver.resolve_offset_newpage(child_offset));
  VolatilePagePointer child_pointer;
  child_pointer.set(kDummyNode, child_offset);
  child_page->initialize_volatile_page(get_id(), child_pointer, 0, kInfimumSlice, kSupremumSlice);
  root_page->get_minipage(0).pointers_[0].snapshot_pointer_ = 0;
  root_page->get_minipage(0).pointers_[0].volatile_pointer_ = child_pointer;
  return kRetOk;
}

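// A quick sketch (not in the original file) of the tree shape that load_empty() above
// produces, expressed with getters that appear elsewhere in this file. The empty masstree
// is a single layer-0 intermediate root whose first minipage points to one empty border
// page covering the whole slice range:
//
//   ASSERT_ND(root_page->get_layer() == 0 && !root_page->is_border());
//   ASSERT_ND(child_page->is_border() && child_page->get_key_count() == 0);
//   ASSERT_ND(child_page->get_low_fence() == kInfimumSlice);
//   ASSERT_ND(child_page->is_high_fence_supremum());
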
ErrorStack MasstreeStoragePimpl::create(const MasstreeMetadata& metadata) {
  if (exists()) {
    LOG(ERROR) << "This masstree-storage already exists: " << get_name();
    return ERROR_STACK(kErrorCodeStrAlreadyExists);
  }

  control_block_->meta_ = metadata;
  CHECK_ERROR(load_empty());
  control_block_->status_ = kExists;
  LOG(INFO) << "Newly created a masstree-storage " << get_name();
  return kRetOk;
}
ErrorStack MasstreeStoragePimpl::load(const StorageControlBlock& snapshot_block) {
  control_block_->meta_ = static_cast<const MasstreeMetadata&>(snapshot_block.meta_);
  const MasstreeMetadata& meta = control_block_->meta_;
  control_block_->root_page_pointer_.snapshot_pointer_ = meta.root_snapshot_page_id_;
  control_block_->root_page_pointer_.volatile_pointer_.word = 0;
  control_block_->first_root_locked_ = false;

  // So far we assume the root page always has a volatile version.
  // Create it now.
  if (meta.root_snapshot_page_id_ != 0) {
    cache::SnapshotFileSet fileset(engine_);
    CHECK_ERROR(fileset.initialize());
    VolatilePagePointer volatile_pointer;
    MasstreeIntermediatePage* volatile_root;
    CHECK_ERROR(engine_->get_memory_manager()->load_one_volatile_page(
      &fileset,
      meta.root_snapshot_page_id_,
      &volatile_pointer,
      reinterpret_cast<Page**>(&volatile_root)));
    CHECK_ERROR(fileset.uninitialize());
    control_block_->root_page_pointer_.volatile_pointer_ = volatile_pointer;
  } else {
    LOG(INFO) << "This is an empty masstree: " << get_meta();
    CHECK_ERROR(load_empty());
  }

  control_block_->status_ = kExists;
  LOG(INFO) << "Loaded a masstree-storage " << get_meta();
  return kRetOk;
}

ErrorCode MasstreeStoragePimpl::find_border_physical(
  thread::Thread* context,
  MasstreePage* layer_root,
  uint8_t current_layer,
  bool for_writes,
  KeySlice slice,
  MasstreeBorderPage** border) {
  assert_aligned_page(layer_root);
  ASSERT_ND(layer_root->is_high_fence_supremum());
  ASSERT_ND(layer_root->get_low_fence() == kInfimumSlice);
  MasstreePage* cur = layer_root;
  cur->prefetch_general();
  while (true) {
    assert_aligned_page(cur);
    ASSERT_ND(cur->get_layer() == current_layer);
    ASSERT_ND(cur->within_fences(slice));
    if (cur->is_border()) {
      // We follow foster-twins only in border pages.
      // In intermediate pages, the Master-Tree invariant tells us that we don't have to.
      // Furthermore, if we did, we would need to handle the case of empty-range intermediate pages.
      // Rather, we just do this only in border pages.
      if (UNLIKELY(cur->has_foster_child())) {
        // follow one of the foster twins.
        if (cur->within_foster_minor(slice)) {
          cur = reinterpret_cast<MasstreePage*>(context->resolve(cur->get_foster_minor()));
        } else {
          cur = reinterpret_cast<MasstreePage*>(context->resolve(cur->get_foster_major()));
        }
        ASSERT_ND(cur->within_fences(slice));
        continue;
      }
      *border = reinterpret_cast<MasstreeBorderPage*>(cur);
      return kErrorCodeOk;
    } else {
      MasstreeIntermediatePage* page = reinterpret_cast<MasstreeIntermediatePage*>(cur);
      uint8_t minipage_index = page->find_minipage(slice);
      MasstreeIntermediatePage::MiniPage& minipage = page->get_minipage(minipage_index);

      minipage.prefetch();
      uint8_t pointer_index = minipage.find_pointer(slice);
      DualPagePointer& pointer = minipage.pointers_[pointer_index];
      MasstreePage* next;
      CHECK_ERROR_CODE(follow_page(context, for_writes, &pointer, &next));
      next->prefetch_general();
      if (LIKELY(next->within_fences(slice))) {
        if (next->has_foster_child() && !cur->is_moved()) {
          // The page has a foster child, so we should adopt it.
          // Whether Adopt actually adopted it or not,
          // we follow the "old" next page. The Master-Tree invariant guarantees that it's safe.
          // This is beneficial when the sysxct lazily gives up adoption, e.g., when other
          // threads hold locks in the intermediate page.
          if (!next->is_locked() && !cur->is_locked()) {
            // Let's try adopting. No need to try many times. Adopt can be delayed.
            Adopt functor(context, page, next);
            CHECK_ERROR_CODE(context->run_nested_sysxct(&functor, 2));
          } else {
            // We don't have to adopt it right away. Do it when it's not contended.
            DVLOG(1) << "Someone else seems doing something there.. already adopting? skip it";
          }
        }
        cur = next;
      } else {
        // even in this case, a local retry suffices thanks to foster-twins
        DVLOG(0) << "Interesting. concurrent thread affected the search. local retry";
      }
    }
  }
}

ErrorCode MasstreeStoragePimpl::locate_record(
  thread::Thread* context,
  const void* key,
  KeyLength key_length,
  bool for_writes,
  RecordLocation* result) {
  ASSERT_ND(result);
  ASSERT_ND(key_length <= kMaxKeyLength);
  result->clear();
  xct::Xct* cur_xct = &context->get_current_xct();
  MasstreePage* layer_root;
  CHECK_ERROR_CODE(get_first_root(
    context,
    for_writes,
    reinterpret_cast<MasstreeIntermediatePage**>(&layer_root)));
  for (uint16_t current_layer = 0;; ++current_layer) {
    KeyLength remainder_length = key_length - current_layer * 8;
    KeySlice slice = slice_layer(key, key_length, current_layer);
    const void* suffix = reinterpret_cast<const char*>(key) + (current_layer + 1) * 8;
    MasstreeBorderPage* border;
    CHECK_ERROR_CODE(find_border_physical(
      context,
      layer_root,
      current_layer,
      for_writes,
      slice,
      &border));
    PageVersionStatus border_version = border->get_version().status_;
    assorted::memory_fence_consume();
    SlotIndex index = border->find_key(slice, suffix, remainder_length);

    if (index == kBorderPageMaxSlots) {
      // this means not-found. add it to the page-version set to protect the lack of a record
      if (!border->header().snapshot_) {
        CHECK_ERROR_CODE(cur_xct->add_to_page_version_set(
          border->get_version_address(),
          border_version));
      }
      // So far we just use the page-version set to protect this non-existence.
      // TASK(Hideaki) range lock rather than page-set to improve concurrency?
      result->clear();
      return kErrorCodeStrKeyNotFound;
    }
    if (border->does_point_to_layer(index)) {
      CHECK_ERROR_CODE(follow_layer(context, for_writes, border, index, &layer_root));
      continue;
    } else {
      CHECK_ERROR_CODE(result->populate_logical(cur_xct, border, index, for_writes));
      return kErrorCodeOk;
    }
  }
}

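// A minimal usage sketch (not in the original file) of how a point-read could combine
// locate_record() with check_next_layer_bit() and retrieve_general(). The wrapper name is
// hypothetical; the three Pimpl calls and their signatures are the ones defined in this file:
//
//   ErrorCode get_record_sketch(
//     thread::Thread* context,
//     MasstreeStoragePimpl* pimpl,
//     const void* key,
//     KeyLength key_length,
//     void* payload,
//     PayloadLength* payload_capacity) {
//     RecordLocation location;
//     // physical search down the layers + logical read-set registration
//     CHECK_ERROR_CODE(pimpl->locate_record(context, key, key_length, false, &location));
//     // a next-layer TID observed here is treated as a race abort
//     CHECK_ERROR_CODE(MasstreeStoragePimpl::check_next_layer_bit(location.observed_));
//     // copies the payload out, or returns an error if *payload_capacity is too small
//     return pimpl->retrieve_general(context, location, payload, payload_capacity);
//   }
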
ErrorCode MasstreeStoragePimpl::locate_record_normalized(
  thread::Thread* context,
  KeySlice key,
  bool for_writes,
  RecordLocation* result) {
  ASSERT_ND(result);
  result->clear();
  xct::Xct* cur_xct = &context->get_current_xct();
  MasstreeBorderPage* border;
  MasstreeIntermediatePage* layer_root;
  CHECK_ERROR_CODE(get_first_root(context, for_writes, &layer_root));
  CHECK_ERROR_CODE(find_border_physical(context, layer_root, 0, for_writes, key, &border));
  SlotIndex index = border->find_key_normalized(0, border->get_key_count(), key);
  PageVersionStatus border_version = border->get_version().status_;
  if (index == kBorderPageMaxSlots) {
    // this means not-found
    if (!border->header().snapshot_) {
      CHECK_ERROR_CODE(cur_xct->add_to_page_version_set(
        border->get_version_address(),
        border_version));
    }
    // TASK(Hideaki) range lock
    result->clear();
    return kErrorCodeStrKeyNotFound;
  }
  // because this is just one slice, we never go to the second layer
  ASSERT_ND(!border->does_point_to_layer(index));
  CHECK_ERROR_CODE(result->populate_logical(cur_xct, border, index, for_writes));
  return kErrorCodeOk;
}

ErrorCode MasstreeStoragePimpl::follow_page(
  thread::Thread* context,
  bool for_writes,
  storage::DualPagePointer* pointer,
  MasstreePage** page) {
  ASSERT_ND(!pointer->is_both_null());
  return context->follow_page_pointer(
    nullptr,  // masstree doesn't create a new page except in splits.
    false,  // so, there is no null page possible
    for_writes,  // always get volatile pages for writes
    true,
    pointer,
    reinterpret_cast<Page**>(page),
    nullptr,  // only used for new page creation, so nothing to pass
    -1);  // same as above
}

ErrorCode MasstreeStoragePimpl::follow_layer(
  thread::Thread* context,
  bool for_writes,
  MasstreeBorderPage* parent,
  SlotIndex record_index,
  MasstreePage** page) {
  ASSERT_ND(record_index < kBorderPageMaxSlots);
  ASSERT_ND(parent->does_point_to_layer(record_index));
  DualPagePointer* pointer = parent->get_next_layer(record_index);
  xct::RwLockableXctId* owner = parent->get_owner_id(record_index);
  ASSERT_ND(owner->xct_id_.is_next_layer() || owner->xct_id_.is_moved());
  ASSERT_ND(!pointer->is_both_null());
  MasstreePage* next_root;
  CHECK_ERROR_CODE(follow_page(context, for_writes, pointer, &next_root));

  // the next layer's root page has a foster child... time for tree growth!
  if (UNLIKELY(next_root->has_foster_child())) {
    ASSERT_ND(next_root->get_layer() > 0);
    // In either case, we follow the old page. The Master-Tree invariant guarantees it's safe.
    GrowNonFirstLayerRoot functor(context, parent, record_index);
    CHECK_ERROR_CODE(context->run_nested_sysxct(&functor, 2U));
  }

  ASSERT_ND(next_root);
  *page = next_root;
  return kErrorCodeOk;
}

ErrorCode MasstreeStoragePimpl::reserve_record(
  thread::Thread* context,
  const void* key,
  KeyLength key_length,
  PayloadLength payload_count,
  PayloadLength physical_payload_hint,
  RecordLocation* result) {
  ASSERT_ND(key_length <= kMaxKeyLength);
  ASSERT_ND(physical_payload_hint >= payload_count);
  ASSERT_ND(result);
  result->clear();
  xct::Xct* cur_xct = &context->get_current_xct();

  MasstreePage* layer_root;
  CHECK_ERROR_CODE(get_first_root(
    context,
    true,
    reinterpret_cast<MasstreeIntermediatePage**>(&layer_root)));
  for (uint16_t layer = 0;; ++layer) {
    const KeyLength remainder = key_length - layer * sizeof(KeySlice);
    const KeySlice slice = slice_layer(key, key_length, layer);
    const void* const suffix = reinterpret_cast<const char*>(key) + (layer + 1) * sizeof(KeySlice);
    MasstreeBorderPage* border;
    CHECK_ERROR_CODE(find_border_physical(context, layer_root, layer, true, slice, &border));
    while (true) {  // retry loop for following foster children and temporary failures
      // if we find out that the page was split and we should follow a foster child, do it.
      while (border->has_foster_child()) {
        if (border->within_foster_minor(slice)) {
          border = context->resolve_cast<MasstreeBorderPage>(border->get_foster_minor());
        } else {
          border = context->resolve_cast<MasstreeBorderPage>(border->get_foster_major());
        }
      }
      ASSERT_ND(border->within_fences(slice));

      const SlotIndex count = border->get_key_count();
      // As in ReserveRecords, we need a fence on BOTH sides.
      // observe the key count first, then verify the keys.
      assorted::memory_fence_consume();
      MasstreeBorderPage::FindKeyForReserveResult match = border->find_key_for_reserve(
        0,
        count,
        slice,
        suffix,
        remainder);

      if (match.match_type_ == MasstreeBorderPage::kExactMatchLayerPointer) {
        CHECK_ERROR_CODE(follow_layer(context, true, border, match.index_, &layer_root));
        break;  // next layer
      } else if (match.match_type_ == MasstreeBorderPage::kExactMatchLocalRecord) {
        // Even in this case, if the record space is too small, we must migrate it first.
        if (border->get_max_payload_length(match.index_) >= payload_count) {
          // Ok, this seems to already satisfy our need. but...
          // Up until now, all the searches/expands were physical-only.
          // Now we finalize the XID in the search result so that precommit will
          // catch any change since now. This also means we need to re-check
          // the status again, and retry if pointing to next layer or moved.
          CHECK_ERROR_CODE(result->populate_logical(cur_xct, border, match.index_, true));
          if (result->observed_.is_next_layer() || result->observed_.is_moved()) {
            // because the search is optimistic, we might now see an XctId with the next-layer
            // bit on. in this case, we retry.
            VLOG(0) << "Interesting. Next-layer-retry due to concurrent transaction";
            continue;
          }
          return kErrorCodeOk;
        }
      }

      // In all other cases, we (probably) need to do something complex.
      // The ReserveRecords sysxct does it.
      ReserveRecords reserve(
        context,
        border,
        slice,
        remainder,
        suffix,
        physical_payload_hint,  // let's allocate conservatively
        get_meta().should_aggresively_create_next_layer(layer, remainder),
        match.match_type_ == MasstreeBorderPage::kNotFound ? count : match.index_);
      CHECK_ERROR_CODE(context->run_nested_sysxct(&reserve, 2U));

      // We might need to split the page
      if (reserve.out_split_needed_) {
        SplitBorder split(
          context,
          border,
          slice,
          false,
          true,
          remainder,
          physical_payload_hint,
          suffix);
        CHECK_ERROR_CODE(context->run_nested_sysxct(&split, 2U));
      }

      // In either case, we should resume the search.
      continue;
    }
  }
}

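// A minimal usage sketch (not in the original file) of how an insert path could chain
// reserve_record() with insert_general(). The wrapper name is hypothetical; the two Pimpl
// calls are the ones defined in this file, and insert_general() itself rejects the case
// where a live (non-deleted) record with the same key already exists:
//
//   ErrorCode insert_record_sketch(
//     thread::Thread* context,
//     MasstreeStoragePimpl* pimpl,
//     const void* key,
//     KeyLength key_length,
//     const void* payload,
//     PayloadLength payload_count) {
//     RecordLocation location;
//     // physically reserve (or find) a record slot large enough for the payload
//     CHECK_ERROR_CODE(pimpl->reserve_record(
//       context, key, key_length, payload_count, payload_count, &location));
//     // logically insert: builds the log record and registers the write set
//     return pimpl->insert_general(context, location, key, key_length, payload, payload_count);
//   }
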
ErrorCode MasstreeStoragePimpl::reserve_record_normalized(
  thread::Thread* context,
  KeySlice key,
  PayloadLength payload_count,
  PayloadLength physical_payload_hint,
  RecordLocation* result) {
  ASSERT_ND(result);
  result->clear();
  xct::Xct* cur_xct = &context->get_current_xct();
  MasstreeBorderPage* border;

  MasstreeIntermediatePage* layer_root;
  CHECK_ERROR_CODE(get_first_root(context, true, &layer_root));
  CHECK_ERROR_CODE(find_border_physical(context, layer_root, 0, true, key, &border));
  while (true) {  // retry loop for following foster children
    // if we find out that the page was split and we should follow a foster child, do it.
    while (border->has_foster_child()) {
      if (border->within_foster_minor(key)) {
        border = context->resolve_cast<MasstreeBorderPage>(border->get_foster_minor());
      } else {
        border = context->resolve_cast<MasstreeBorderPage>(border->get_foster_major());
      }
    }

    ASSERT_ND(border->within_fences(key));

    // because we never go on to a second layer in this case, it's either a full match or not-found
    const SlotIndex count = border->get_key_count();
    assorted::memory_fence_consume();
    const SlotIndex index = border->find_key_normalized(0, count, key);

    if (index != kBorderPageMaxSlots) {
      // If the record space is too small, we can't use it as-is.
      if (border->get_max_payload_length(index) >= payload_count) {
        CHECK_ERROR_CODE(result->populate_logical(cur_xct, border, index, true));
        if (result->observed_.is_moved()) {
          // because the search is optimistic, we might now see an XctId with the moved bit on.
          // in this case, we retry.
          VLOG(0) << "Interesting. Moved by concurrent transaction";
          continue;
        }
        return kErrorCodeOk;
      }
    }

    // Same as the general case.
    ReserveRecords reserve(
      context,
      border,
      key,
      sizeof(KeySlice),
      nullptr,
      physical_payload_hint,
      false,
      index == kBorderPageMaxSlots ? count : index);
    CHECK_ERROR_CODE(context->run_nested_sysxct(&reserve, 2U));

    if (reserve.out_split_needed_) {
      SplitBorder split(
        context,
        border,
        key,
        false,
        true,
        sizeof(KeySlice),
        physical_payload_hint,
        nullptr);
      CHECK_ERROR_CODE(context->run_nested_sysxct(&split, 2U));
    }
    continue;
  }
}

ErrorCode MasstreeStoragePimpl::check_next_layer_bit(xct::XctId observed) {
  if (UNLIKELY(observed.is_next_layer())) {
    // this should have been checked before this method and resolved as an abort or retry,
    // but if it reaches here for some reason, we treat it as a usual contention abort.
    DLOG(INFO) << "Probably this should be caught beforehand. next_layer bit is on";
    return kErrorCodeXctRaceAbort;
  }
  return kErrorCodeOk;
}

ErrorCode MasstreeStoragePimpl::retrieve_general(
  thread::Thread* /*context*/,
  const RecordLocation& location,
  void* payload,
  PayloadLength* payload_capacity) {
  if (location.observed_.is_deleted()) {
    // This result is protected by the read-set
    return kErrorCodeStrKeyNotFound;
  }
  CHECK_ERROR_CODE(check_next_layer_bit(location.observed_));

  // here, we do NOT have to do another optimistic-read protocol because we already took
  // the owner_id into the read-set. If this read is corrupted, we will be aware of it at commit time.
  MasstreeBorderPage* border = location.page_;
  PayloadLength payload_length = border->get_payload_length(location.index_);
  if (payload_length > *payload_capacity) {
    // buffer too small
    DVLOG(0) << "buffer too small??" << payload_length << ":" << *payload_capacity;
    *payload_capacity = payload_length;
    return kErrorCodeStrTooSmallPayloadBuffer;
  }
  *payload_capacity = payload_length;
  std::memcpy(payload, border->get_record_payload(location.index_), payload_length);
  return kErrorCodeOk;
}

ErrorCode MasstreeStoragePimpl::retrieve_part_general(
  thread::Thread* /*context*/,
  const RecordLocation& location,
  void* payload,
  PayloadLength payload_offset,
  PayloadLength payload_count) {
  if (location.observed_.is_deleted()) {
    // This result is protected by the read-set
    return kErrorCodeStrKeyNotFound;
  }
  CHECK_ERROR_CODE(check_next_layer_bit(location.observed_));
  MasstreeBorderPage* border = location.page_;
  if (border->get_payload_length(location.index_) < payload_offset + payload_count) {
    LOG(WARNING) << "short record";  // probably this is a rare error. so warn.
    return kErrorCodeStrTooShortPayload;
  }
  std::memcpy(payload, border->get_record_payload(location.index_) + payload_offset, payload_count);
  return kErrorCodeOk;
}

ErrorCode MasstreeStoragePimpl::register_record_write_log(
  thread::Thread* context,
  const RecordLocation& location,
  log::RecordLogType* log_entry) {
  // If we have taken a read-set in locate_record, add this as a related write-set
  MasstreeBorderPage* border = location.page_;
  auto* tid = border->get_owner_id(location.index_);
  char* record = border->get_record(location.index_);
  xct::Xct* cur_xct = &context->get_current_xct();
  if (location.readset_) {
    return cur_xct->add_related_write_set(location.readset_, tid, record, log_entry);
  } else {
    return cur_xct->add_to_write_set(get_id(), tid, record, log_entry);
  }
}

ErrorCode MasstreeStoragePimpl::insert_general(
  thread::Thread* context,
  const RecordLocation& location,
  const void* be_key,
  KeyLength key_length,
  const void* payload,
  PayloadLength payload_count) {
  if (!location.observed_.is_deleted()) {
    return kErrorCodeStrKeyAlreadyExists;
  }
  CHECK_ERROR_CODE(check_next_layer_bit(location.observed_));

  // as of reserve_record() it was spacious enough, and this length is
  // either immutable or only increases, so this must hold.
  MasstreeBorderPage* border = location.page_;
  ASSERT_ND(border->get_max_payload_length(location.index_) >= payload_count);

  uint16_t log_length = MasstreeInsertLogType::calculate_log_length(key_length, payload_count);
  MasstreeInsertLogType* log_entry = reinterpret_cast<MasstreeInsertLogType*>(
    context->get_thread_log_buffer().reserve_new_log(log_length));
  log_entry->populate(
    get_id(),
    be_key,
    key_length,
    payload,
    payload_count);
  border->header().stat_last_updater_node_ = context->get_numa_node();
  return register_record_write_log(context, location, log_entry);
}

ErrorCode MasstreeStoragePimpl::delete_general(
  thread::Thread* context,
  const RecordLocation& location,
  const void* be_key,
  KeyLength key_length) {
  if (location.observed_.is_deleted()) {
    // in this case, we don't need a page-version set. the physical record is surely there.
    return kErrorCodeStrKeyNotFound;
  }
  CHECK_ERROR_CODE(check_next_layer_bit(location.observed_));

  MasstreeBorderPage* border = location.page_;
  uint16_t log_length = MasstreeDeleteLogType::calculate_log_length(key_length);
  MasstreeDeleteLogType* log_entry = reinterpret_cast<MasstreeDeleteLogType*>(
    context->get_thread_log_buffer().reserve_new_log(log_length));
  log_entry->populate(get_id(), be_key, key_length);
  border->header().stat_last_updater_node_ = context->get_numa_node();
  return register_record_write_log(context, location, log_entry);
}

ErrorCode MasstreeStoragePimpl::upsert_general(
  thread::Thread* context,
  const RecordLocation& location,
  const void* be_key,
  KeyLength key_length,
  const void* payload,
  PayloadLength payload_count) {
  // Upsert is a combination of what insert does and what delete does.
  // If there isn't an existing physical record, it's exactly the same as insert.
  // If there is, it's _basically_ a delete followed by an insert.
  // There are a few complications, depending on the status of the record.
  CHECK_ERROR_CODE(check_next_layer_bit(location.observed_));

  // as of reserve_record() it was spacious enough, and this length is
  // either immutable or only increases, so this must hold.
  MasstreeBorderPage* border = location.page_;
  ASSERT_ND(border->get_max_payload_length(location.index_) >= payload_count);

  MasstreeCommonLogType* common_log;
  if (location.observed_.is_deleted()) {
    // If it's a deleted record, this turns out to be a plain insert.
    uint16_t log_length = MasstreeInsertLogType::calculate_log_length(key_length, payload_count);
    MasstreeInsertLogType* log_entry = reinterpret_cast<MasstreeInsertLogType*>(
      context->get_thread_log_buffer().reserve_new_log(log_length));
    log_entry->populate(
      get_id(),
      be_key,
      key_length,
      payload,
      payload_count);
    common_log = log_entry;
  } else if (payload_count == border->get_payload_length(location.index_)) {
    // If it doesn't change the payload size of the existing record, we can convert it to an
    // overwrite, which is more efficient.
    uint16_t log_length = MasstreeOverwriteLogType::calculate_log_length(key_length, payload_count);
    MasstreeOverwriteLogType* log_entry = reinterpret_cast<MasstreeOverwriteLogType*>(
      context->get_thread_log_buffer().reserve_new_log(log_length));
    log_entry->populate(
      get_id(),
      be_key,
      key_length,
      payload,
      0,
      payload_count);
    common_log = log_entry;
  } else {
    // If not, this is an update operation.
    uint16_t log_length = MasstreeUpdateLogType::calculate_log_length(key_length, payload_count);
    MasstreeUpdateLogType* log_entry = reinterpret_cast<MasstreeUpdateLogType*>(
      context->get_thread_log_buffer().reserve_new_log(log_length));
    log_entry->populate(
      get_id(),
      be_key,
      key_length,
      payload,
      payload_count);
    common_log = log_entry;
  }
  border->header().stat_last_updater_node_ = context->get_numa_node();
  return register_record_write_log(context, location, common_log);
}
ErrorCode MasstreeStoragePimpl::overwrite_general(
  thread::Thread* context,
  const RecordLocation& location,
  const void* be_key,
  KeyLength key_length,
  const void* payload,
  PayloadLength payload_offset,
  PayloadLength payload_count) {
  if (location.observed_.is_deleted()) {
    // in this case, we don't need a page-version set. the physical record is surely there.
    return kErrorCodeStrKeyNotFound;
  }
  CHECK_ERROR_CODE(check_next_layer_bit(location.observed_));
  MasstreeBorderPage* border = location.page_;
  if (border->get_payload_length(location.index_) < payload_offset + payload_count) {
    LOG(WARNING) << "short record ";  // probably this is a rare error. so warn.
    return kErrorCodeStrTooShortPayload;
  }

  uint16_t log_length = MasstreeOverwriteLogType::calculate_log_length(key_length, payload_count);
  MasstreeOverwriteLogType* log_entry = reinterpret_cast<MasstreeOverwriteLogType*>(
    context->get_thread_log_buffer().reserve_new_log(log_length));
  log_entry->populate(
    get_id(),
    be_key,
    key_length,
    payload,
    payload_offset,
    payload_count);
  border->header().stat_last_updater_node_ = context->get_numa_node();
  return register_record_write_log(context, location, log_entry);
}

template <typename PAYLOAD>
ErrorCode MasstreeStoragePimpl::increment_general(
  thread::Thread* context,
  const RecordLocation& location,
  const void* be_key,
  KeyLength key_length,
  PAYLOAD* value,
  PayloadLength payload_offset) {
  if (location.observed_.is_deleted()) {
    // in this case, we don't need a page-version set. the physical record is surely there.
    return kErrorCodeStrKeyNotFound;
  }
  CHECK_ERROR_CODE(check_next_layer_bit(location.observed_));
  MasstreeBorderPage* border = location.page_;
  if (border->get_payload_length(location.index_) < payload_offset + sizeof(PAYLOAD)) {
    LOG(WARNING) << "short record ";  // probably this is a rare error. so warn.
    return kErrorCodeStrTooShortPayload;
  }

  char* ptr = border->get_record_payload(location.index_) + payload_offset;
  *value += *reinterpret_cast<const PAYLOAD*>(ptr);

  uint16_t log_length = MasstreeOverwriteLogType::calculate_log_length(key_length, sizeof(PAYLOAD));
  MasstreeOverwriteLogType* log_entry = reinterpret_cast<MasstreeOverwriteLogType*>(
    context->get_thread_log_buffer().reserve_new_log(log_length));
  log_entry->populate(
    get_id(),
    be_key,
    key_length,
    value,
    payload_offset,
    sizeof(PAYLOAD));
  border->header().stat_last_updater_node_ = context->get_numa_node();
  return register_record_write_log(context, location, log_entry);
}

// Defines MasstreeStorage methods here so that we can inline implementation calls
xct::TrackMovedRecordResult MasstreeStorage::track_moved_record(
  xct::RwLockableXctId* old_address,
  xct::WriteXctAccess* write_set) {
  return MasstreeStoragePimpl(this).track_moved_record(old_address, write_set);
}

xct::TrackMovedRecordResult MasstreeStoragePimpl::track_moved_record(
  xct::RwLockableXctId* old_address,
  xct::WriteXctAccess* write_set) {
  ASSERT_ND(old_address);
  // We use the moved bit only for volatile border pages
  MasstreeBorderPage* page = reinterpret_cast<MasstreeBorderPage*>(to_page(old_address));
  ASSERT_ND(page->is_border());
  ASSERT_ND(page->is_moved() || old_address->is_next_layer());
  return page->track_moved_record(engine_, old_address, write_set);
}


// Explicit instantiations for each payload type
// @cond DOXYGEN_IGNORE
#define EXPIN_5(x) template ErrorCode MasstreeStoragePimpl::increment_general< x > \
  (thread::Thread* context, const RecordLocation& location, \
  const void* be_key, KeyLength key_length, x* value, PayloadLength payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EXPIN_5);
// @endcond

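// For illustration only (not in the original file): each EXPIN_5(x) generated by the macro
// above is an explicit instantiation of increment_general for payload type x. For example,
// assuming uint64_t is among the numeric types, EXPIN_5(uint64_t) expands to:
//
//   template ErrorCode MasstreeStoragePimpl::increment_general<uint64_t>(
//     thread::Thread* context, const RecordLocation& location,
//     const void* be_key, KeyLength key_length, uint64_t* value, PayloadLength payload_offset);
//
// This keeps the template definition inside this .cpp file while still letting other
// translation units link against the instantiations for all numeric payload types.
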
}  // namespace masstree
}  // namespace storage
}  // namespace foedus