libfoedus-core
FOEDUS Core Library
hash_storage_pimpl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <cstring>
23 #include <string>
24 #include <vector>
25 
26 #include "foedus/engine.hpp"
32 #include "foedus/log/log_type.hpp"
51 #include "foedus/thread/thread.hpp"
52 #include "foedus/xct/xct.hpp"
54 
55 namespace foedus {
56 namespace storage {
57 namespace hash {
58 
60  LOG(INFO) << "Uninitializing an hash-storage " << get_name();
61 
62  if (!control_block_->root_page_pointer_.volatile_pointer_.is_null()) {
63  // release volatile pages
64  const memory::GlobalVolatilePageResolver& page_resolver
66  HashIntermediatePage* root = reinterpret_cast<HashIntermediatePage*>(
67  page_resolver.resolve_offset(control_block_->root_page_pointer_.volatile_pointer_));
69  control_block_->root_page_pointer_.volatile_pointer_.word = 0;
70  }
71 
72  return kRetOk;
73 }
74 
75 
77  if (exists()) {
78  LOG(ERROR) << "This hash-storage already exists: " << get_name();
80  }
81 
82  // hash-specific check.
83  // Due to the current design of hash_partitioner, we spend hashbins bytes
84  // out of the partitioner memory.
85  uint64_t required_partitioner_bytes = metadata.get_bin_count() + 4096ULL;
86  uint64_t partitioner_bytes
88  // we don't bother checking other storages' consumption. the config might later change anyways.
89  // Instead, leave a bit of margin (25%) for others.
90  if (partitioner_bytes < required_partitioner_bytes * 1.25) {
91  std::stringstream str;
92  str << metadata << ".\n"
93  << "To accomodate this number of hash bins, partitioner_data_memory_mb_ must be"
94  << " at least " << (required_partitioner_bytes * 1.25 / (1ULL << 20));
95  return ERROR_STACK_MSG(kErrorCodeStrHashBinsTooMany, str.str().c_str());
96  }
97 
98  control_block_->meta_ = metadata;
99  LOG(INFO) << "Newly creating an hash-storage " << get_name();
100  control_block_->bin_count_ = 1ULL << get_bin_bits();
101  control_block_->levels_ = bins_to_level(control_block_->bin_count_);
102  ASSERT_ND(control_block_->levels_ >= 1U);
103  ASSERT_ND(control_block_->bin_count_ <= fanout_power(control_block_->levels_));
104  ASSERT_ND(control_block_->bin_count_ > fanout_power(control_block_->levels_ - 1U));
105  LOG(INFO) << "bin_count=" << get_bin_count() << ", levels=" << static_cast<int>(get_levels());
106 
107  // small number of root pages. we should at least have that many free pages.
108  // so far grab all of them from node 0. no round robbin
109  const uint16_t kTheNode = 0;
110  memory::PagePool* pool
112  const memory::LocalPageResolver &local_resolver = pool->get_resolver();
113 
114  // allocate only the root page
115  memory::PagePoolOffset root_offset;
116  WRAP_ERROR_CODE(pool->grab_one(&root_offset));
117  ASSERT_ND(root_offset);
118  HashIntermediatePage* root_page = reinterpret_cast<HashIntermediatePage*>(
119  local_resolver.resolve_offset_newpage(root_offset));
120  control_block_->root_page_pointer_.snapshot_pointer_ = 0;
121  control_block_->root_page_pointer_.volatile_pointer_.set(kTheNode, root_offset);
122  root_page->initialize_volatile_page(
123  get_id(),
124  control_block_->root_page_pointer_.volatile_pointer_,
125  nullptr,
126  control_block_->levels_ - 1U,
127  0);
128  root_page->assert_range();
129 
130  LOG(INFO) << "Newly created an hash-storage " << get_name();
131  control_block_->status_ = kExists;
132  return kRetOk;
133 }
134 
136  control_block_->meta_ = static_cast<const HashMetadata&>(snapshot_block.meta_);
137  const HashMetadata& meta = control_block_->meta_;
138  control_block_->bin_count_ = 1ULL << get_bin_bits();
139  control_block_->levels_ = bins_to_level(control_block_->bin_count_);
140  control_block_->root_page_pointer_.snapshot_pointer_ = meta.root_snapshot_page_id_;
141  control_block_->root_page_pointer_.volatile_pointer_.word = 0;
142 
143  // Root page always has volatile version.
144  // Construct it from snapshot version.
146  CHECK_ERROR(fileset.initialize());
148 
149  // load root page
150  VolatilePagePointer volatile_pointer;
151  HashIntermediatePage* volatile_root;
153  &fileset,
155  &volatile_pointer,
156  reinterpret_cast<Page**>(&volatile_root)));
157  control_block_->root_page_pointer_.volatile_pointer_ = volatile_pointer;
158 
159  CHECK_ERROR(fileset.uninitialize());
160 
161  LOG(INFO) << "Loaded a hash-storage " << get_name();
162  control_block_->status_ = kExists;
163  return kRetOk;
164 }
165 
167  thread::Thread* context,
168  const void* key,
169  uint16_t key_length,
170  const HashCombo& combo,
171  void* payload,
172  uint16_t* payload_capacity,
173  bool read_only) {
174  HashDataPage* bin_head;
175  CHECK_ERROR_CODE(locate_bin(context, !read_only, combo, &bin_head));
176  if (!bin_head) {
177  return kErrorCodeStrKeyNotFound; // protected by pointer set, so we are done
178  }
179  RecordLocation location;
181  context,
182  !read_only,
183  false,
184  0,
185  key,
186  key_length,
187  combo,
188  bin_head,
189  &location));
190  if (!location.is_found()) {
191  return kErrorCodeStrKeyNotFound; // protected by page version set, so we are done
192  }
193 
194  // here, we do NOT have to do another optimistic-read protocol because we already took
195  // the owner_id into read-set. If this read is corrupted, we will be aware of it at commit time.
196  uint16_t payload_length = location.cur_payload_length_;
197  if (payload_length > *payload_capacity) {
198  // buffer too small
199  DVLOG(0) << "buffer too small??" << payload_length << ":" << *payload_capacity;
200  *payload_capacity = payload_length;
202  }
203 
204  *payload_capacity = payload_length;
205  uint16_t key_offset = location.get_aligned_key_length();
206  std::memcpy(payload, location.record_ + key_offset, payload_length);
207  return kErrorCodeOk;
208 }
209 
211  thread::Thread* context,
212  const void* key,
213  uint16_t key_length,
214  const HashCombo& combo,
215  void* payload,
216  uint16_t payload_offset,
217  uint16_t payload_count,
218  bool read_only) {
219  HashDataPage* bin_head;
220  CHECK_ERROR_CODE(locate_bin(context, !read_only, combo, &bin_head));
221  if (!bin_head) {
222  return kErrorCodeStrKeyNotFound; // protected by pointer set, so we are done
223  }
224  RecordLocation location;
226  context,
227  !read_only,
228  false,
229  0,
230  key,
231  key_length,
232  combo,
233  bin_head,
234  &location));
235  if (!location.is_found()) {
236  return kErrorCodeStrKeyNotFound; // protected by page version set, so we are done
237  }
238 
239  uint16_t payload_length = location.cur_payload_length_;
240  if (payload_length < payload_offset + payload_count) {
241  LOG(WARNING) << "short record " << combo; // probably this is a rare error. so warn.
243  }
244 
245  uint16_t key_offset = location.get_aligned_key_length();
246  std::memcpy(payload, location.record_ + key_offset + payload_offset, payload_count);
247  return kErrorCodeOk;
248 }
249 
250 uint16_t adjust_payload_hint(uint16_t payload_count, uint16_t physical_payload_hint) {
251  ASSERT_ND(physical_payload_hint >= payload_count); // if not, most likely misuse.
252  if (physical_payload_hint < payload_count) {
253  physical_payload_hint = payload_count;
254  }
255  physical_payload_hint = assorted::align8(physical_payload_hint);
256  return physical_payload_hint;
257 }
258 
259 
261  thread::Thread* context,
262  const RecordLocation& location,
263  log::RecordLogType* log_entry) {
264  // If we have taken readset in locate_record, add as a related write set
265  ASSERT_ND(location.is_found());
266  auto* slot = location.page_->get_slot_address(location.index_);
267  char* record = location.record_;
268  xct::Xct* cur_xct = &context->get_current_xct();
269  if (location.readset_) {
270  return cur_xct->add_related_write_set(location.readset_, &slot->tid_, record, log_entry);
271  } else {
272  return cur_xct->add_to_write_set(get_id(), &slot->tid_, record, log_entry);
273  }
274 }
275 
276 
278  thread::Thread* context,
279  const void* key,
280  uint16_t key_length,
281  const HashCombo& combo,
282  const void* payload,
283  uint16_t payload_count,
284  uint16_t physical_payload_hint) {
285  physical_payload_hint = adjust_payload_hint(payload_count, physical_payload_hint);
286 
287  HashDataPage* bin_head;
288  CHECK_ERROR_CODE(locate_bin(context, true, combo, &bin_head));
289  ASSERT_ND(bin_head);
290 
291  while (true) { // we might retry due to migrate_record. not that often, tho.
292  RecordLocation location;
294  context,
295  true,
296  true,
297  physical_payload_hint,
298  key,
299  key_length,
300  combo,
301  bin_head,
302  &location));
303 
304  // we create if not exists, these are surely non-null
305  ASSERT_ND(location.is_found());
306  ASSERT_ND(location.record_);
307 
308  // but, that record might be not logically deleted
309  if (!location.observed_.is_deleted()) {
310  return kErrorCodeStrKeyAlreadyExists; // protected by the read set
311  }
312 
313  if (payload_count > location.get_max_payload()) {
314  // The physical record is too short. It will trigger a record expansion, which is a
315  // system transaction (logically does nothing!) to migrate this deleted record.
316  HashDataPage* cur_page = location.page_;
317  ASSERT_ND(!cur_page->header().snapshot_);
318  ASSERT_ND(cur_page->bloom_filter().contains(combo.fingerprint_));
319  DataPageSlotIndex cur_index = location.index_;
320  DVLOG(2) << "Record expansion triggered. payload_count=" << payload_count
321  << ", current max=" << location.get_max_payload()
322  << ", size hint=" << physical_payload_hint;
323 
324  ReserveRecords functor(
325  context,
326  cur_page,
327  key,
328  key_length,
329  combo,
330  payload_count,
331  physical_payload_hint,
332  cur_index);
333  CHECK_ERROR_CODE(context->run_nested_sysxct(&functor, 5U));
334  DVLOG(2) << "Expanded record!";
335  // need to re-locate. also, beacuse the above method is physical-only,
336  // the moved location might be again moved or now deleted. easise to just retry.
337  continue;
338  }
339 
340  uint16_t log_length = HashInsertLogType::calculate_log_length(key_length, payload_count);
341  HashInsertLogType* log_entry = reinterpret_cast<HashInsertLogType*>(
342  context->get_thread_log_buffer().reserve_new_log(log_length));
343  log_entry->populate(
344  get_id(),
345  key,
346  key_length,
347  get_bin_bits(),
348  combo.hash_,
349  payload,
350  payload_count);
351 
352  return register_record_write_log(context, location, log_entry);
353  }
354 }
355 
357  thread::Thread* context,
358  const void* key,
359  uint16_t key_length,
360  const HashCombo& combo) {
361  HashDataPage* bin_head;
362  CHECK_ERROR_CODE(locate_bin(context, true, combo, &bin_head));
363  ASSERT_ND(bin_head);
364  RecordLocation location;
366  context,
367  true,
368  false,
369  0,
370  key,
371  key_length,
372  combo,
373  bin_head,
374  &location));
375 
376  if (!location.is_found()) {
377  return kErrorCodeStrKeyNotFound; // protected by page version set, so we are done
378  } else if (location.observed_.is_deleted()) {
379  return kErrorCodeStrKeyNotFound; // protected by the read set
380  }
381 
382  uint16_t log_length = HashDeleteLogType::calculate_log_length(key_length, 0);
383  HashDeleteLogType* log_entry = reinterpret_cast<HashDeleteLogType*>(
384  context->get_thread_log_buffer().reserve_new_log(log_length));
385  log_entry->populate(get_id(), key, key_length, get_bin_bits(), combo.hash_);
386 
387  return register_record_write_log(context, location, log_entry);
388 }
389 
391  thread::Thread* context,
392  const void* key,
393  uint16_t key_length,
394  const HashCombo& combo,
395  const void* payload,
396  uint16_t payload_count,
397  uint16_t physical_payload_hint) {
398  physical_payload_hint = adjust_payload_hint(payload_count, physical_payload_hint);
399 
400  // Upsert is a combination of what insert does and what delete does.
401  // If there isn't an existing physical record, it's exactly same as insert.
402  // If there is, it's _basically_ a delete followed by an insert.
403  // There are a few complications, depending on the status of the record.
404 
405  HashDataPage* bin_head;
406  CHECK_ERROR_CODE(locate_bin(context, true, combo, &bin_head));
407  ASSERT_ND(bin_head);
408 
409  while (true) { // we might retry due to migrate_record. not that often, tho.
410  RecordLocation location;
412  context,
413  true,
414  true,
415  physical_payload_hint,
416  key,
417  key_length,
418  combo,
419  bin_head,
420  &location));
421 
422  // we create if not exists, these are surely non-null
423  ASSERT_ND(location.is_found());
424  ASSERT_ND(location.record_);
425 
426  // Whether currently deleted or not, migrate it to make sure the record is long enough.
427  if (payload_count > location.get_max_payload()) {
428  HashDataPage* cur_page = location.page_;
429  ASSERT_ND(!cur_page->header().snapshot_);
430  ASSERT_ND(cur_page->bloom_filter().contains(combo.fingerprint_));
431  DataPageSlotIndex cur_index = location.index_;
432  DVLOG(2) << "Record expansion triggered. payload_count=" << payload_count
433  << ", current max=" << location.get_max_payload()
434  << ", size hint=" << physical_payload_hint;
435 
436  ReserveRecords functor(
437  context,
438  cur_page,
439  key,
440  key_length,
441  combo,
442  payload_count,
443  physical_payload_hint,
444  cur_index);
445  CHECK_ERROR_CODE(context->run_nested_sysxct(&functor, 5U));
446  DVLOG(2) << "Expanded record!";
447  continue; // need to re-locate the record. retry.
448  }
449 
450  ASSERT_ND(payload_count <= location.get_max_payload());
451  HashCommonLogType* log_common;
452  if (location.observed_.is_deleted()) {
453  // If it's a deleted record, this turns to be a plain insert.
454  uint16_t log_length = HashInsertLogType::calculate_log_length(key_length, payload_count);
455  HashInsertLogType* log_entry = reinterpret_cast<HashInsertLogType*>(
456  context->get_thread_log_buffer().reserve_new_log(log_length));
457  log_entry->populate(
458  get_id(),
459  key,
460  key_length,
461  get_bin_bits(),
462  combo.hash_,
463  payload,
464  payload_count);
465  log_common = log_entry;
466  } else if (location.cur_payload_length_ == payload_count) {
467  // If it's not changing payload size of existing record, we can conver it to an overwrite,
468  // which is more efficient
469  uint16_t log_length = HashUpdateLogType::calculate_log_length(key_length, payload_count);
470  HashOverwriteLogType* log_entry = reinterpret_cast<HashOverwriteLogType*>(
471  context->get_thread_log_buffer().reserve_new_log(log_length));
472  log_entry->populate(
473  get_id(),
474  key,
475  key_length,
476  get_bin_bits(),
477  combo.hash_,
478  payload,
479  0,
480  payload_count);
481  log_common = log_entry;
482  } else {
483  // If not, this is an update operation.
484  uint16_t log_length = HashUpdateLogType::calculate_log_length(key_length, payload_count);
485  HashUpdateLogType* log_entry = reinterpret_cast<HashUpdateLogType*>(
486  context->get_thread_log_buffer().reserve_new_log(log_length));
487  log_entry->populate(
488  get_id(),
489  key,
490  key_length,
491  get_bin_bits(),
492  combo.hash_,
493  payload,
494  payload_count);
495  log_common = log_entry;
496  }
497 
498  return register_record_write_log(context, location, log_common);
499  }
500 }
501 
503  thread::Thread* context ,
504  const void* key,
505  uint16_t key_length,
506  const HashCombo& combo,
507  const void* payload,
508  uint16_t payload_offset,
509  uint16_t payload_count) {
510  HashDataPage* bin_head;
511  CHECK_ERROR_CODE(locate_bin(context, true, combo, &bin_head));
512  ASSERT_ND(bin_head);
513  RecordLocation location;
515  context,
516  true,
517  false,
518  0,
519  key,
520  key_length,
521  combo,
522  bin_head,
523  &location));
524 
525  if (!location.is_found()) {
526  return kErrorCodeStrKeyNotFound; // protected by page version set, so we are done
527  } else if (location.observed_.is_deleted()) {
528  return kErrorCodeStrKeyNotFound; // protected by the read set
529  } else if (location.cur_payload_length_ < payload_offset + payload_count) {
530  LOG(WARNING) << "short record " << combo; // probably this is a rare error. so warn.
531  return kErrorCodeStrTooShortPayload; // protected by the read set
532  }
533 
534  uint16_t log_length = HashOverwriteLogType::calculate_log_length(key_length, payload_count);
535  HashOverwriteLogType* log_entry = reinterpret_cast<HashOverwriteLogType*>(
536  context->get_thread_log_buffer().reserve_new_log(log_length));
537  log_entry->populate(
538  get_id(),
539  key,
540  key_length,
541  get_bin_bits(),
542  combo.hash_,
543  payload,
544  payload_offset,
545  payload_count);
546 
547  // overwrite_record is apparently a blind-write, but actually it's not.
548  // we depend on the fact that the record was not deleted/moved! so,
549  // this still has a related/dependent read-set
550  return register_record_write_log(context, location, log_entry);
551 }
552 
553 template <typename PAYLOAD>
555  thread::Thread* context,
556  const void* key,
557  uint16_t key_length,
558  const HashCombo& combo,
559  PAYLOAD* value,
560  uint16_t payload_offset) {
561  HashDataPage* bin_head;
562  CHECK_ERROR_CODE(locate_bin(context, true, combo, &bin_head));
563  ASSERT_ND(bin_head);
564  RecordLocation location;
566  context,
567  true,
568  false,
569  0,
570  key,
571  key_length,
572  combo,
573  bin_head,
574  &location));
575 
576  if (!location.is_found()) {
577  return kErrorCodeStrKeyNotFound; // protected by page version set, so we are done
578  } else if (location.observed_.is_deleted()) {
579  return kErrorCodeStrKeyNotFound; // protected by the read set
580  } else if (location.cur_payload_length_ < payload_offset + sizeof(PAYLOAD)) {
581  LOG(WARNING) << "short record " << combo; // probably this is a rare error. so warn.
582  return kErrorCodeStrTooShortPayload; // protected by the read set
583  }
584 
585  // value: (in) addendum, (out) value after addition.
586  PAYLOAD* current = reinterpret_cast<PAYLOAD*>(
587  location.record_ + location.get_aligned_key_length() + payload_offset);
588  *value += *current;
589 
590  uint16_t log_length
591  = HashOverwriteLogType::calculate_log_length(key_length, sizeof(PAYLOAD));
592  HashOverwriteLogType* log_entry = reinterpret_cast<HashOverwriteLogType*>(
593  context->get_thread_log_buffer().reserve_new_log(log_length));
594  log_entry->populate(
595  get_id(),
596  key,
597  key_length,
598  get_bin_bits(),
599  combo.hash_,
600  value,
601  payload_offset,
602  sizeof(PAYLOAD));
603 
604  return register_record_write_log(context, location, log_entry);
605 }
606 
608  thread::Thread* context,
609  bool for_write,
610  HashIntermediatePage** root) {
612  nullptr, // guaranteed to be non-null
613  false, // guaranteed to be non-null
614  for_write,
615  false, // guaranteed to be non-null
616  &control_block_->root_page_pointer_,
617  reinterpret_cast<Page**>(root),
618  nullptr, // no parent. it's root.
619  0));
620  ASSERT_ND((*root)->header().get_page_type() == kHashIntermediatePageType);
621  ASSERT_ND((*root)->get_level() + 1U == control_block_->levels_);
622  return kErrorCodeOk;
623 }
624 
626  thread::Thread* context,
627  bool for_write,
628  HashIntermediatePage* parent,
629  uint16_t index_in_parent,
630  Page** page) {
631  ASSERT_ND(index_in_parent < kHashIntermediatePageFanout);
632  ASSERT_ND(parent);
634  bool is_parent_snapshot = parent->header().snapshot_;
635  uint8_t parent_level = parent->get_level();
636  ASSERT_ND(!is_parent_snapshot || !for_write);
637 
638  DualPagePointer& pointer = parent->get_pointer(index_in_parent);
639  bool child_intermediate = (parent_level > 0);
640  if (is_parent_snapshot) {
641  // if we are in snapshot world, there is no choice.
642  // separating this out also handles SI level well.
643  ASSERT_ND(!for_write);
644  if (pointer.snapshot_pointer_ == 0) {
645  *page = nullptr;
646  } else {
648  ASSERT_ND((*page)->get_header().snapshot_);
649  }
650  } else if (child_intermediate) {
653  !for_write, // null page is a valid result only for reads ("not found")
654  for_write,
655  true, // if we jump to snapshot page, we need to add it to pointer set for serializability.
656  &pointer,
657  page,
658  reinterpret_cast<Page*>(parent),
659  index_in_parent));
660  } else {
661  // we are in a level-0 volatile page. so the pointee is a bin-head.
662  // we need a bit special handling in this case
663  CHECK_ERROR_CODE(follow_page_bin_head(context, for_write, parent, index_in_parent, page));
664  }
665 
666  if (*page) {
667  if (child_intermediate) {
668  ASSERT_ND((*page)->get_page_type() == kHashIntermediatePageType);
669  ASSERT_ND((*page)->get_header().get_in_layer_level() + 1U == parent_level);
670  } else {
671  ASSERT_ND((*page)->get_page_type() == kHashDataPageType);
672  ASSERT_ND(reinterpret_cast<HashDataPage*>(*page)->get_bin()
673  == parent->get_bin_range().begin_ + index_in_parent);
674  }
675  }
676  return kErrorCodeOk;
677 }
678 
680  thread::Thread* context,
681  bool for_write,
682  HashIntermediatePage* parent,
683  uint16_t index_in_parent,
684  Page** page) {
685  // do we have to newly create a volatile version of the pointed bin?
686  ASSERT_ND(!parent->header().snapshot_);
688  ASSERT_ND(parent->get_level() == 0);
689  xct::Xct& cur_xct = context->get_current_xct();
690  xct::IsolationLevel isolation = cur_xct.get_isolation_level();
691  DualPagePointer* pointer = parent->get_pointer_address(index_in_parent);
692 
693  // otherwise why in volatile page.
694  ASSERT_ND(for_write || isolation != xct::kSnapshot || pointer->snapshot_pointer_ == 0);
695  // in other words, we can safely "prefer" volatile page here.
696  if (!pointer->volatile_pointer_.is_null()) {
697  *page = context->resolve(pointer->volatile_pointer_);
698  } else {
699  SnapshotPagePointer snapshot_pointer = pointer->snapshot_pointer_;
700  if (!for_write) {
701  // reads don't have to create a new page. easy
702  if (snapshot_pointer == 0) {
703  *page = nullptr;
704  } else {
705  CHECK_ERROR_CODE(context->find_or_read_a_snapshot_page(snapshot_pointer, page));
706  }
707 
708  if (isolation == xct::kSerializable) {
709  VolatilePagePointer null_pointer;
710  null_pointer.clear();
711  cur_xct.add_to_pointer_set(&(pointer->volatile_pointer_), null_pointer);
712  }
713  } else {
714  // writes need the volatile version.
715  if (snapshot_pointer == 0) {
716  // The bin is completely empty. we just make a new empty page.
719  false,
720  true,
721  true,
722  pointer,
723  page,
724  reinterpret_cast<Page*>(parent),
725  index_in_parent));
726  } else {
727  // Otherwise, we must create a volatile version of the existing page.
728  // a special rule for hash storage in this case: we create/drop volatile versions
729  // in the granularity of hash bin. all or nothing.
730  // thus, not just the head page of the bin, we have to volatilize the entire bin.
731  memory::NumaCoreMemory* core_memory = context->get_thread_memory();
732  const memory::LocalPageResolver& local_resolver
734  VolatilePagePointer head_page_id = core_memory->grab_free_volatile_page_pointer();
735  const auto offset = head_page_id.get_offset();
736  if (UNLIKELY(head_page_id.is_null())) {
738  }
739 
740  HashDataPage* head_page
741  = reinterpret_cast<HashDataPage*>(local_resolver.resolve_offset_newpage(offset));
742  storage::Page* snapshot_head;
743  ErrorCode code = context->find_or_read_a_snapshot_page(snapshot_pointer, &snapshot_head);
744  if (code != kErrorCodeOk) {
745  core_memory->release_free_volatile_page(offset);
746  return code;
747  }
748 
749  std::memcpy(head_page, snapshot_head, kPageSize);
750  ASSERT_ND(head_page->header().snapshot_);
751  head_page->header().snapshot_ = false;
752  head_page->header().page_id_ = head_page_id.word;
753 
754  // load following pages. hopefully this is a rare case.
755  ErrorCode last_error = kErrorCodeOk;
756  if (UNLIKELY(head_page->next_page().snapshot_pointer_)) {
757  HashDataPage* cur_page = head_page;
758  while (true) {
759  ASSERT_ND(last_error == kErrorCodeOk);
761  if (next == 0) {
762  break;
763  }
764 
765  DVLOG(1) << "Following next-link in hash data pages. Hopefully it's not that long..";
766  VolatilePagePointer next_page_id = core_memory->grab_free_volatile_page_pointer();
767  memory::PagePoolOffset next_offset = next_page_id.get_offset();
768  if (UNLIKELY(next_page_id.is_null())) {
769  // we have to release preceding pages too
770  last_error = kErrorCodeMemoryNoFreePages;
771  break;
772  }
773  HashDataPage* next_page
774  = reinterpret_cast<HashDataPage*>(local_resolver.resolve_offset_newpage(next_offset));
775  // immediately install because:
776  // 1) we don't have any race here, 2) we need to follow them to release on error.
777  DualPagePointer* target = cur_page->next_page_address();
779  target->volatile_pointer_ = next_page_id;
780  target->snapshot_pointer_ = 0; // will be no longer used, let's clear them
781 
782  storage::Page* snapshot_page;
783  last_error = context->find_or_read_a_snapshot_page(next, &snapshot_page);
784  if (last_error != kErrorCodeOk) {
785  break;
786  }
787  std::memcpy(next_page, snapshot_page, kPageSize);
788  ASSERT_ND(next_page->header().snapshot_);
789  ASSERT_ND(next_page->get_bin() == cur_page->get_bin());
790  next_page->header().snapshot_ = false;
791  next_page->header().page_id_ = next_page_id.word;
792  cur_page = next_page;
793  }
794  }
795 
796  // all rihgt, now atomically install the pointer to the volatile head page.
797  bool must_release_pages = false;
798  if (last_error == kErrorCodeOk) {
799  uint64_t expected = 0;
800  if (assorted::raw_atomic_compare_exchange_strong<uint64_t>(
801  &(pointer->volatile_pointer_.word),
802  &expected,
803  head_page_id.word)) {
804  // successfully installed the head pointer. fine.
805  *page = reinterpret_cast<Page*>(head_page);
806  } else {
807  ASSERT_ND(expected);
808  // someone else has installed it, which is also fine.
809  // but, we must release pages we created (which turned out to be a waste effort)
810  LOG(INFO) << "Interesting. Someone else has installed a volatile version.";
811  *page = context->resolve(pointer->volatile_pointer_);
812  must_release_pages = true;
813  }
814  } else {
815  must_release_pages = true;
816  }
817 
818  if (must_release_pages) {
819  HashDataPage* cur = head_page;
820  while (true) {
822  ASSERT_ND(cur_id.get_numa_node() == context->get_numa_node());
823  ASSERT_ND(!cur_id.is_null());
824  // retrieve next_id BEFORE releasing (revoking) cur page.
826  core_memory->release_free_volatile_page(cur_id.get_offset());
827  if (next_id.is_null()) {
828  break;
829  }
830  cur = context->resolve_cast<HashDataPage>(next_id);
831  }
832  }
833 
834  CHECK_ERROR_CODE(last_error);
835  }
836  }
837  }
838 
839  return kErrorCodeOk;
840 }
841 
843  thread::Thread* context,
844  bool for_write,
845  const HashCombo& combo,
846  HashDataPage** bin_head) {
847  HashIntermediatePage* root;
848  CHECK_ERROR_CODE(get_root_page(context, for_write, &root));
849  ASSERT_ND(root);
850  *bin_head = nullptr;
851  xct::Xct& current_xct = context->get_current_xct();
852 
853  HashIntermediatePage* parent = root;
854  while (true) {
855  ASSERT_ND(parent);
856  uint8_t parent_level = parent->get_level();
857  uint16_t index = combo.route_.route[parent_level];
858  Page* next;
859  CHECK_ERROR_CODE(follow_page(context, for_write, parent, index, &next));
860  if (!next) {
861  // if this is a read-access, it is possible that the page even doesn't exist.
862  // it is a valid result (not found). we just have to add a pointer set to protect the result.
863  ASSERT_ND(!for_write);
864  if (!parent->header().snapshot_ // then we already added a pointer set in higher level
865  && current_xct.get_isolation_level() == xct::kSerializable) {
866  VolatilePagePointer volatile_null;
867  volatile_null.clear();
869  current_xct.add_to_pointer_set(
870  &parent->get_pointer(index).volatile_pointer_,
871  volatile_null));
872  }
873  break;
874  } else {
875  if (parent_level == 0) {
876  *bin_head = reinterpret_cast<HashDataPage*>(next);
877  break;
878  } else {
879  parent = reinterpret_cast<HashIntermediatePage*>(next);
880  }
881  }
882  }
883 
884  ASSERT_ND(*bin_head != nullptr || !for_write);
885  ASSERT_ND(*bin_head == nullptr || (*bin_head)->get_bin() == combo.bin_);
886  return kErrorCodeOk;
887 }
888 
890  thread::Thread* context,
891  const void* key,
892  uint16_t key_length,
893  const HashCombo& combo,
894  HashDataPage* bin_head,
895  RecordLocation* result) {
896  ASSERT_ND(bin_head);
897  ASSERT_ND(bin_head->get_bin() == combo.bin_);
898  ASSERT_ND(bin_head->header().snapshot_);
899  result->clear();
900  // Snapshot version doesn't need any of the concerns above. Easy!
901  // Just physically search and follow to next page.
902  HashDataPage* page = bin_head;
903  while (true) {
904  const uint16_t record_count = page->get_record_count();
905  const DataPageSlotIndex index = page->search_key_physical(
906  combo.hash_,
907  combo.fingerprint_,
908  key,
909  key_length,
910  record_count);
911  if (index != kSlotNotFound) {
912  // found! this is final in snapshot page, so physical-only suffices (no protection needed).
913  ASSERT_ND(page->compare_slot_key(index, combo.hash_, key, key_length));
914  result->populate_physical(page, index);
915  ASSERT_ND(!result->observed_.is_moved());
916  return kErrorCodeOk;
917  }
918 
919  // Definitely not in this page, now on to next page.
920  DualPagePointer* next_page = page->next_page_address();
921  // then we are in snapshot world. no race.
922  ASSERT_ND(next_page->volatile_pointer_.is_null());
923  SnapshotPagePointer pointer = next_page->snapshot_pointer_;
924  if (pointer) {
925  Page* next;
926  CHECK_ERROR_CODE(context->find_or_read_a_snapshot_page(pointer, &next));
927  ASSERT_ND(next->get_header().snapshot_);
928  page = reinterpret_cast<HashDataPage*>(next);
929  } else {
930  // it's snapshot world. the result is final, we are done.
931  return kErrorCodeOk;
932  }
933  }
934 }
935 
// Body of HashStoragePimpl::locate_record() (see the index below for the full
// signature; the linkified name line 936 was dropped by the Doxygen-to-text
// extraction, as were lines 1008, 1015, 1026, and 1061 — consult the repository
// for those statements). Walks the volatile data-page chain of a bin searching
// for the key; optionally creates the record (create_if_notfound) and optionally
// takes logical read-set protection (when physical_only is false).
937  thread::Thread* context,
938  bool for_write,
939  bool physical_only,
940  bool create_if_notfound,
941  uint16_t create_payload_length,
942  const void* key,
943  uint16_t key_length,
944  const HashCombo& combo,
945  HashDataPage* bin_head,
946  RecordLocation* result) {
947  ASSERT_ND(bin_head);
948  ASSERT_ND(bin_head->get_bin() == combo.bin_);
949  ASSERT_ND(for_write || !create_if_notfound); // create_if_notfound implies for_write
950 
951  // Snapshot case is way easier. Separately handle that case.
952  if (bin_head->header().snapshot_) {
953  ASSERT_ND(!for_write);
954  ASSERT_ND(!create_if_notfound);
955  return locate_record_in_snapshot(context, key, key_length, combo, bin_head, result);
956  }
957 
// The current user transaction; needed only for the logical (non-physical_only) paths.
958  xct::Xct* cur_xct = &context->get_current_xct();
959 
960  // in hash storage, we maintain only bin_head's stat. it's enough
961  if (for_write) {
962  bin_head->header().stat_last_updater_node_ = context->get_numa_node();
963  }
964 
// Walk the chain of data pages of this bin, starting from its head.
965  HashDataPage* page = bin_head;
966  while (true) {
967  // Start with a physical-only search. We will re-check our observation below.
968  const uint16_t record_count = page->get_record_count();
969  const DataPageSlotIndex index = page->search_key_physical(
970  combo.hash_,
971  combo.fingerprint_,
972  key,
973  key_length,
974  record_count);
975  if (index != kSlotNotFound) {
976  // found! but it might be now being logically moved/deleted.
977  ASSERT_ND(page->compare_slot_key(index, combo.hash_, key, key_length));
978  if (physical_only) {
979  // we don't care. the caller is responsible to do logical operation
980  result->populate_physical(page, index);
981  return kErrorCodeOk;
982  } else {
983  CHECK_ERROR_CODE(result->populate_logical(cur_xct, page, index, for_write));
984  if (UNLIKELY(result->observed_.is_moved())) {
985  LOG(INFO) << "Interesting. The record has been just moved";
986  continue;
987  }
988  return kErrorCodeOk;
989  }
990  }
991 
992  // Apparently not in this page, now on to next page. We have to be a bit careful
993  // in this case. Non-null next page means this page is already static, but we
994  // have to make sure we confirmed it in a right order.
995  DualPagePointer* next_page = page->next_page_address();
996 
997  // we are in volatile page, there might be a race!
998  PageVersionStatus page_status = page->header().page_version_.status_;
999  assorted::memory_fence_acquire(); // from now on, page_status is the ground truth here.
1000  // check a few things after the fence.
1001  // invariant: we never move on to next page without guaranteeing that this page does not
1002  // contain a physical non-moved record with the key.
1003 
1004  // did someone insert a new record at this moment?
1005  uint16_t record_count_again = page->get_record_count();
1006  if (UNLIKELY(record_count != record_count_again)) {
1007  LOG(INFO) << "Interesting. concurrent insertion just happend to the page";
1009  continue; // just retry to make it sure. this is rare.
1010  }
1011 
1012  // did someone install a new page at this moment?
1013  if (UNLIKELY(!page_status.has_next_page() && !next_page->volatile_pointer_.is_null())) {
1014  LOG(INFO) << "Interesting. concurrent next-page installation just happend to the page";
1016  continue; // just retry to make it sure. this is rare.
1017  }
1018 
1019  if (next_page->volatile_pointer_.is_null()) {
1020  // no next page.
1021  if (create_if_notfound) {
1022  // this is the tail page, so let's insert it here.
1023  // we do that as a system transaction.
1024  ASSERT_ND(for_write);
1025  DataPageSlotIndex new_location;
// (line 1026, the CHECK_ERROR_CODE(locate_record_reserve_physical( call opening,
// was linkified and is absent from this dump)
1027  context,
1028  key,
1029  key_length,
1030  combo,
1031  create_payload_length,
1032  &page,
1033  record_count,
1034  &new_location));
1035  ASSERT_ND(new_location != kSlotNotFound); // contract of the above method
1036  // The returned location is not logically protected... yet.
1037  ASSERT_ND(page->compare_slot_key(new_location, combo.hash_, key, key_length));
1038  if (physical_only) {
1039  // we don't care. the caller is responsible to do logical operation
1040  result->populate_physical(page, new_location);
1041  return kErrorCodeOk;
1042  } else {
1043  CHECK_ERROR_CODE(result->populate_logical(cur_xct, page, new_location, for_write));
1044  if (UNLIKELY(result->observed_.is_moved())) {
1045  LOG(INFO) << "Interesting. The record has been just moved after creation!";
1046  continue;
1047  }
1048  return kErrorCodeOk;
1049  }
1050  } else {
1051  // "NotFound" result. To finalize it,
1052  // we have to take version set because someone might
1053  // insert a new record/next-page later.
1054  result->clear();
1055  if (physical_only) {
1056  // we don't care. the caller is responsible to do logical operation
1057  return kErrorCodeOk;
1058  } else {
1059  // [Logical check]: Remember the page_status we observed as of checking record count
1060  // and verify it at commit time.
// (line 1061, presumably the add_to_page_version_set call opening, is absent
// from this dump — TODO confirm against the repository)
1062  &page->header().page_version_,
1063  page_status));
1064  return kErrorCodeOk;
1065  }
1066  }
1067  } else {
// Follow the already-installed next page; volatile chain pages are never snapshots.
1068  page = context->resolve_cast<HashDataPage>(next_page->volatile_pointer_);
1069  ASSERT_ND(!page->header().snapshot_);
1070  }
1071  }
1072 }
1073 
// Body of HashStoragePimpl::locate_record_reserve_physical() (linkified name line
// 1074 absent from this dump). Subroutine of locate_record(): creates/migrates a
// physical record for the key in *page_in_out or a page appended to its chain, by
// running the ReserveRecords system transaction. On success, *page_in_out and
// *new_location identify the reserved slot.
1075  thread::Thread* context,
1076  const void* key,
1077  uint16_t key_length,
1078  const HashCombo& combo,
1079  uint16_t payload_length,
1080  HashDataPage** page_in_out,
1081  uint16_t examined_records,
1082  DataPageSlotIndex* new_location) {
1083  ASSERT_ND(new_location);
1084  ReserveRecords functor(
1085  context,
1086  *page_in_out,
1087  key,
1088  key_length,
1089  combo,
1090  payload_length,
1091  payload_length, // passed twice: presumably (payload_count, physical_payload_hint) — TODO confirm against ReserveRecords ctor
1092  examined_records);
// Execute the reservation as a nested system transaction, retrying up to 5 times.
1093  CHECK_ERROR_CODE(context->run_nested_sysxct(&functor, 5U));
// The functor reports where the record was placed (possibly a newly added tail page).
1094  *page_in_out = functor.out_page_;
1095  *new_location = functor.out_slot_;
1096  return kErrorCodeOk;
1097 }
1098 
// Body of HashStoragePimpl::track_moved_record() (linkified name line 1099 absent
// from this dump). Given the TID of a record whose moved-bit is set, extracts the
// key from the old slot and re-locates the record's new address via
// track_moved_record_search(). write_set is optional and used only for debug
// sanity checks.
1100  xct::RwLockableXctId* old_address,
1101  xct::WriteXctAccess* write_set) {
1102  ASSERT_ND(old_address);
1103  ASSERT_ND(old_address->is_moved());
1104  // We use moved bit only for volatile data pages
1105  HashDataPage* page = reinterpret_cast<HashDataPage*>(to_page(old_address));
1106  ASSERT_ND(!page->header().snapshot_);
1107  ASSERT_ND(page->header().get_page_type() == kHashDataPageType);
1108 
1109  // TID is the first member in slot, so this ugly cast works.
1110  HashDataPage::Slot* old_slot = reinterpret_cast<HashDataPage::Slot*>(old_address);
1111  ASSERT_ND(&old_slot->tid_ == old_address);
1112 
1113  // for tracking, we need the full key and hash. let's extract them.
1114  const char* key = page->record_from_offset(old_slot->offset_);
1115  uint16_t key_length = old_slot->key_length_;
1116  HashCombo combo(key, key_length, control_block_->meta_);
1117 
1118  // we need write_set only for sanity check. It's easier in hash storage!
1119  if (write_set) {
1120 #ifndef NDEBUG
1121  ASSERT_ND(write_set->storage_id_ == page->header().storage_id_);
1122  ASSERT_ND(write_set->payload_address_ == page->record_from_offset(old_slot->offset_));
1123  HashCommonLogType* the_log = reinterpret_cast<HashCommonLogType*>(write_set->log_entry_);
1124  the_log->assert_record_and_log_keys(old_address, page->record_from_offset(old_slot->offset_));
1125 #endif // NDEBUG
1126  }
// old_index is computed only to sanity-check the slot-address arithmetic below;
// slots grow downward from the end of the page, hence origin corresponds to "-1".
1127  HashDataPage::Slot* slot_origin = reinterpret_cast<HashDataPage::Slot*>(page + 1);
1128  ASSERT_ND(slot_origin > old_slot); // because origin corresponds to "-1".
1129  DataPageSlotIndex old_index = slot_origin - old_slot - 1;
1130  ASSERT_ND(page->get_slot_address(old_index) == old_slot);
1131 
1132  return track_moved_record_search(page, key, key_length, combo);
1133 }
1134 
// Body of HashStoragePimpl::track_moved_record_search() (linkified name line 1135
// absent from this dump, as are several linkified statements, e.g. the resolver
// initializer on line 1141 and fence calls — consult the repository). Walks the
// volatile page chain from 'page' doing a physical-only key search in each page to
// find the new location of a moved record. Returns an empty result only in the
// "shouldn't happen" case where the chain ends without finding the record.
1136  HashDataPage* page,
1137  const void* key,
1138  uint16_t key_length,
1139  const HashCombo& combo) {
1140  const memory::GlobalVolatilePageResolver& resolver
1142  RecordLocation result;
1143  while (true) {
1144  ASSERT_ND(!page->header().snapshot_);
1146  const uint16_t record_count = page->get_record_count();
1147 
1148  // Tracking happens in commit phase, so we don't need further logical readset/lock things.
1149  // This is just to locate the new address. So, physical_only search.
1150  // (which might miss a new record being inserted, but it's just a bit conservative abort)
1151  DataPageSlotIndex index = page->search_key_physical(
1152  combo.hash_,
1153  combo.fingerprint_,
1154  key,
1155  key_length,
1156  record_count);
1157  if (index != kSlotNotFound) {
// Found the record's new home; report its TID and payload address.
1158  HashDataPage::Slot* slot = page->get_slot_address(index);
1159  char* payload = page->record_from_offset(slot->offset_);
1160  return xct::TrackMovedRecordResult(&slot->tid_, payload);
1161  }
1162 
1163  // we must meet the same invariant as usual case. a bit simpler, though
1165  DualPagePointer* next_page = page->next_page_address();
1167 
// Re-check the record count to rule out a concurrent insert between our search
// and reading the next-page pointer.
1168  uint16_t record_count_again = page->get_record_count();
1169  if (UNLIKELY(record_count != record_count_again)) {
1170  LOG(INFO) << "Interesting. concurrent insertion just happend to the page";
1172  continue;
1173  }
1174  if (next_page->volatile_pointer_.is_null()) {
1175  // This shouldn't happen as far as we flip moved bit after installing the new record
1176  LOG(WARNING) << "no next page?? but we didn't find the moved record in this page";
// Re-read once more; if the pointer is still null, give up and return an empty result.
1178  if (next_page->volatile_pointer_.is_null()) {
1179  LOG(ERROR) << "Unexpected error, failed to track moved record in hash storage."
1180  << " This should not happen. hash combo=" << combo;
1181  return xct::TrackMovedRecordResult();
1182  }
1183  continue;
1184  }
1185  page = reinterpret_cast<HashDataPage*>(resolver.resolve_offset(next_page->volatile_pointer_));
1186  }
1187 }
1188 
1189 // Explicit instantiations for each payload type
1190 // @cond DOXYGEN_IGNORE
1191 
// EXPIN_5I(x) expands to an explicit instantiation of increment_record<x>.
// It is presumably invoked via INSTANTIATE_ALL_NUMERIC_TYPES on line 1199
// (linkified and dropped by the extraction) to cover every numeric payload type
// — TODO confirm against the repository.
1192 #define EXPIN_5I(x) template ErrorCode HashStoragePimpl::increment_record< x > \
1193  (thread::Thread* context, \
1194  const void* key, \
1195  uint16_t key_length, \
1196  const HashCombo& combo, \
1197  x* value, \
1198  uint16_t payload_offset)
1200 // @endcond
1201 
1202 } // namespace hash
1203 } // namespace storage
1204 } // namespace foedus
0x080D : "STORAGE: HASH: Number of hash-bins too large compared to storage.partitioner_data_memory_mb...
Definition: error_code.hpp:179
log::RecordLogType * log_entry_
Pointer to the log entry in private log buffer for this write operation.
Definition: xct_access.hpp:175
0x080A : "STORAGE: The record's payload is smaller than requested" .
Definition: error_code.hpp:176
ErrorCode increment_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, PAYLOAD *value, uint16_t payload_offset)
Metadata meta_
common part of the metadata.
Definition: storage.hpp:84
xct::Xct & get_current_xct()
Returns the transaction that is currently running on this thread.
Definition: thread.cpp:75
storage::Page * resolve_offset(uint8_t numa_node, PagePoolOffset offset) const __attribute__((always_inline))
Resolves offset plus NUMA node ID to storage::Page*.
uint16_t adjust_payload_hint(uint16_t payload_count, uint16_t physical_payload_hint)
ErrorCode find_or_read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page **out)
Find the given page in snapshot cache, reading it if not found.
Definition: thread.cpp:95
T align8(T value)
8-alignment.
void release_free_volatile_page(PagePoolOffset offset)
Returns one free volatile page to local page pool.
Represents a pointer to another page (usually a child page).
Definition: storage_id.hpp:271
const DataPageBloomFilter & bloom_filter() const __attribute__((always_inline))
memory::NumaCoreMemory * get_thread_memory() const
Returns the private memory repository of this thread.
Definition: thread.cpp:57
0x080C : "STORAGE: This key is not found in this storage" .
Definition: error_code.hpp:178
void populate(StorageId storage_id, const void *key, uint16_t key_length, uint8_t bin_bits, HashValue hash, const void *payload, uint16_t payload_offset, uint16_t payload_count) __attribute__((always_inline))
Automatically calls if uninitialize() wasn't called when it gets out of scope, and just complains whe...
Page * to_page(const void *address)
super-dirty way to obtain Page the address belongs to.
Definition: page.hpp:395
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
Page pool for volatile read/write store (VolatilePage) and the read-only bufferpool (SnapshotPage)...
Definition: page_pool.hpp:173
const HashBinRange & get_bin_range() const
xct::RwLockableXctId tid_
TID of the record.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
Represents a record of write-access during a transaction.
Definition: xct_access.hpp:168
ErrorCode grab_one(PagePoolOffset *offset)
Grab only one page.
Definition: page_pool.cpp:132
ErrorCode overwrite_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, const void *payload, uint16_t payload_offset, uint16_t payload_count)
Represents one thread running on one NUMA core.
Definition: thread.hpp:48
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
Definition: memory_id.hpp:44
DataPageSlotIndex out_slot_
[Out] The slot of the record that is found or created.
ErrorCode get_record_part(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, void *payload, uint16_t payload_offset, uint16_t payload_count, bool read_only)
ErrorCode locate_record_logical(thread::Thread *context, bool for_write, bool create_if_notfound, uint16_t create_payload_length, const void *key, uint16_t key_length, const HashCombo &combo, HashDataPage *bin_head, RecordLocation *result)
locate_record()'s logical+physical version.
const GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert volatile page ID to page pointer.
Result of track_moved_record().
Definition: xct_id.hpp:1180
const DataPageSlotIndex kSlotNotFound
Definition: hash_id.hpp:197
Represents a pointer to a volatile page with modification count for preventing ABA.
Definition: storage_id.hpp:194
Represents a user transaction.
Definition: xct.hpp:58
ErrorCode follow_page_bin_head(thread::Thread *context, bool for_write, HashIntermediatePage *parent, uint16_t index_in_parent, Page **page)
subroutine to follow a pointer to head of bin from a volatile parent
ErrorCode get_root_page(thread::Thread *context, bool for_write, HashIntermediatePage **root)
Retrieves the root page of this storage.
storage::Page * resolve_offset_newpage(PagePoolOffset offset) const __attribute__((always_inline))
As the name suggests, this version is for new pages, which don't have sanity checks.
bool contains(const BloomFilterFingerprint &fingerprint) const __attribute__((always_inline))
ErrorCode follow_page(thread::Thread *context, bool for_write, HashIntermediatePage *parent, uint16_t index_in_parent, Page **page)
for non-root
ErrorStack load_one_volatile_page(cache::SnapshotFileSet *fileset, storage::SnapshotPagePointer snapshot_pointer, storage::VolatilePagePointer *pointer, storage::Page **page)
Another convenience method that also reads an existing snapshot page to the volatile page...
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
bool compare_slot_key(DataPageSlotIndex index, HashValue hash, const void *key, uint16_t key_length) const
returns whether the slot contains the exact key specified
Snapshot isolation (SI), meaning the transaction reads a consistent and complete image of the databas...
Definition: xct_id.hpp:78
ErrorCode populate_logical(xct::Xct *cur_xct, HashDataPage *page, DataPageSlotIndex index, bool intended_for_write)
Populates the result with XID and possibly readset.
ErrorCode insert_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, const void *payload, uint16_t payload_count, uint16_t physical_payload_hint)
Engine * engine_
Most attachable object stores an engine pointer (local engine), so we define it here.
Definition: attachable.hpp:107
The storage has been created and ready for use.
Definition: storage_id.hpp:158
ErrorCode add_to_write_set(storage::StorageId storage_id, RwLockableXctId *owner_id_address, char *payload_address, log::RecordLogType *log_entry)
Add the given record to the write set of this transaction.
Definition: xct.cpp:444
Definitions of IDs in this package and a few related constant values.
Holds a set of read-only file objects for snapshot files.
ErrorCode locate_record_reserve_physical(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, uint16_t payload_length, HashDataPage **page_in_out, uint16_t examined_records, DataPageSlotIndex *new_location)
Subroutine of locate_record() to create/migrate a physical record of the given key in the page or its...
ErrorCode follow_page_pointer(storage::VolatilePageInit page_initializer, bool tolerate_null_pointer, bool will_modify, bool take_ptr_set_snapshot, storage::DualPagePointer *pointer, storage::Page **page, const storage::Page *parent, uint16_t index_in_parent)
A general method to follow (read) a page pointer.
uint8_t bins_to_level(uint64_t bins)
Definition: hash_id.hpp:90
HashDataPage * out_page_
[Out] The page that contains the found/created record.
storage::VolatilePagePointer grab_free_volatile_page_pointer()
Wrapper for grab_free_volatile_page().
HashDataPage * page_
The data page (might not be bin-head) containing the record.
void populate(StorageId storage_id, const void *key, uint16_t key_length, uint8_t bin_bits, HashValue hash, const void *payload, uint16_t payload_count) __attribute__((always_inline))
const EngineOptions & get_options() const
Definition: engine.cpp:39
Repository of memories dynamically acquired within one CPU core (thread).
const DualPagePointer * next_page_address() const __attribute__((always_inline))
ErrorStack create(const HashMetadata &metadata)
The MCS reader-writer lock variant of LockableXctId.
Definition: xct_id.hpp:1132
const Slot * get_slot_address(DataPageSlotIndex record) const __attribute__((always_inline))
same as &get_slot(), but this is more explicit and easier to understand/maintain
storage::Page * resolve(storage::VolatilePagePointer ptr) const
Shorthand for get_global_volatile_page_resolver.resolve_offset()
Definition: thread.cpp:129
log::ThreadLogBuffer & get_thread_log_buffer()
Returns the private log buffer for this thread.
Definition: thread.cpp:78
uint32_t partitioner_data_memory_mb_
Size in MB of a shared memory buffer allocated for all partitioners during log gleaning.
VolatilePagePointer volatile_pointer_
Definition: storage_id.hpp:308
char * record_
Address of the record.
Independent utility methods/classes for hashination, or hash functions.
0 means no-error.
Definition: error_code.hpp:87
uint64_t fanout_power(uint8_t exponent)
Definition: hash_id.hpp:56
bool is_moved() const __attribute__((always_inline))
Definition: xct_id.hpp:1142
memory::PagePoolOffset get_offset() const
Definition: storage_id.hpp:202
HashStorageControlBlock * control_block_
The shared data on shared memory that has been initialized in some SOC or master engine.
Definition: attachable.hpp:111
ErrorCode add_related_write_set(ReadXctAccess *related_read_set, RwLockableXctId *tid_address, char *payload_address, log::RecordLogType *log_entry)
Registers a write-set related to an existing read-set.
Definition: xct.cpp:506
storage::StorageOptions storage_
Declares all log types used in this storage type.
uint16_t cur_payload_length_
Logical payload length as-of the observed XID.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Definition: storage_id.hpp:79
Calls Initializable::uninitialize() automatically when it gets out of scope.
Constants and methods related to CPU cacheline and its prefetching.
void populate(StorageId storage_id, const void *key, uint16_t key_length, uint8_t bin_bits, HashValue hash, const void *payload, uint16_t payload_count) __attribute__((always_inline))
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
Log type of hash-storage's update operation.
PageType get_page_type() const
Definition: page.hpp:280
0x0802 : "STORAGE: This storage already exists" .
Definition: error_code.hpp:168
A base class for HashInsertLogType/HashDeleteLogType/HashOverwriteLogType.
char * payload_address_
Pointer to the payload of the record.
Definition: xct_access.hpp:172
NumaNodeMemoryRef * get_node_memory(foedus::thread::ThreadGroupId group) const
SnapshotPagePointer snapshot_pointer_
Definition: storage_id.hpp:307
ErrorStack load(const StorageControlBlock &snapshot_block)
A system transaction to reserve a physical record(s) in a hash data page.
Fix-sized slot for each record, which is placed at the end of data region.
Just a marker to denote that the memory region represents a data page.
Definition: page.hpp:334
uint8_t route[8]
[0] means ordinal in level-0 intermediate page, [1] in its parent page, [2]...
PageVersion page_version_
Used in several storage types as concurrency control mechanism for the page.
Definition: page.hpp:272
HashBin begin_
Inclusive beginning of the range.
Definition: hash_id.hpp:191
ErrorCode locate_bin(thread::Thread *context, bool for_write, const HashCombo &combo, HashDataPage **bin_head)
Find a pointer to the bin that contains records for the hash.
uint16_t DataPageSlotIndex
Definition: hash_id.hpp:196
bool is_deleted() const __attribute__((always_inline))
Definition: xct_id.hpp:1040
xct::TrackMovedRecordResult track_moved_record(xct::RwLockableXctId *old_address, xct::WriteXctAccess *write_set)
storage::StorageId storage_id_
The storage we accessed.
Definition: xct_access.hpp:85
P * resolve_cast(storage::VolatilePagePointer ptr) const
resolve() plus reinterpret_cast
Definition: thread.hpp:110
A set of information that are used in many places, extracted from the given key.
Definition: hash_combo.hpp:48
DualPagePointer * get_pointer_address(uint16_t index)
void initialize_volatile_page(StorageId storage_id, VolatilePagePointer page_id, const HashIntermediatePage *parent, uint8_t level, HashBin start_bin)
Called only when this page is initialized.
const memory::LocalPageResolver & get_local_volatile_page_resolver() const
Returns page resolver to convert only local page ID to page pointer.
Definition: thread.cpp:80
void assert_range() const __attribute__((always_inline))
0x0301 : "MEMORY : Not enough free volatile pages. Check the config of MemoryOptions" ...
Definition: error_code.hpp:142
void populate_physical(HashDataPage *page, DataPageSlotIndex index)
Populates fields other than readset_.
void populate(StorageId storage_id, const void *key, uint16_t key_length, uint8_t bin_bits, HashValue hash) __attribute__((always_inline))
ErrorCode locate_record(thread::Thread *context, bool for_write, bool physical_only, bool create_if_notfound, uint16_t create_payload_length, const void *key, uint16_t key_length, const HashCombo &combo, HashDataPage *bin_head, RecordLocation *result)
Usually follows locate_bin to locate the exact physical record for the key, or create a new one if no...
BloomFilterFingerprint fingerprint_
Definition: hash_combo.hpp:51
Represents an intermediate page in Hashtable Storage.
ErrorCode register_record_write_log(thread::Thread *context, const RecordLocation &location, log::RecordLogType *log_entry)
Used in the following methods.
void assert_record_and_log_keys(xct::RwLockableXctId *owner_id, const char *data) const
used only for sanity check.
IsolationLevel
Specifies the level of isolation during transaction processing.
Definition: xct_id.hpp:55
void hash_intermediate_volatile_page_init(const VolatilePageInitArguments &args)
volatile page initialize callback for HashIntermediatePage.
char * record_from_offset(uint16_t offset)
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
Definition: error_code.hpp:155
Represents an individual data page in Hashtable Storage.
IsolationLevel get_isolation_level() const
Returns the level of isolation for this transaction.
Definition: xct.hpp:149
const DualPagePointer & next_page() const __attribute__((always_inline))
Log type of hash-storage's insert operation.
void hash_data_volatile_page_init(const VolatilePageInitArguments &args)
volatile page initialize callback for HashDataPage.
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
Log type of hash-storage's overwrite operation.
xct::XctId observed_
TID as of locate_record() identifying the record.
PageVersionStatus status_
Definition: page.hpp:172
0x0809 : "STORAGE: The record's payload is larger than the buffer" .
Definition: error_code.hpp:175
VolatilePagePointer construct_volatile_page_pointer(uint64_t word)
Definition: storage_id.hpp:230
const ErrorStack kRetOk
Normal return value for no-error case.
Resolves an offset in local (same NUMA node) page pool to a pointer and vice versa.
return value of locate_record().
Metadata of an hash storage.
uint64_t get_bin_count() const
Number of bins in this hash storage.
ThreadGroupId get_numa_node() const
Definition: thread.hpp:66
ErrorCode add_to_page_version_set(const storage::PageVersion *version_address, storage::PageVersionStatus observed)
Add the given page version to the page version set of this transaction.
Definition: xct.cpp:242
void memory_fence_consume()
Equivalent to std::atomic_thread_fence(std::memory_order_consume).
xct::ReadXctAccess * readset_
If this method took a read-set on the returned record, points to the corresponding read-set...
#define ERROR_STACK_MSG(e, m)
Overload of ERROR_STACK(e) to receive a custom error message.
uint8_t stat_last_updater_node_
Loosely maintained statistics for volatile pages.
Definition: page.hpp:251
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
Resolves an offset in a volatile page pool to an actual pointer and vice versa.
PageHeader & get_header()
At least the basic header exists in all pages.
Definition: page.hpp:336
const uint8_t kHashIntermediatePageFanout
Number of pointers in an intermediate page of hash storage.
Definition: hash_id.hpp:49
bool has_next_page() const __attribute__((always_inline))
Definition: page.hpp:73
#define INSTANTIATE_ALL_NUMERIC_TYPES(M)
INSTANTIATE_ALL_TYPES minus std::string.
static uint16_t calculate_log_length(uint16_t key_length, uint16_t payload_count) __attribute__((always_inline))
const LocalPageResolver & get_resolver() const
Gives an object to resolve an offset in this page pool (thus local) to an actual pointer and vice ver...
Definition: page_pool.cpp:146
Base class for log type of record-wise operation.
uint16_t get_record_count() const __attribute__((always_inline))
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
bool snapshot_
Whether this page image is of a snapshot page.
Definition: page.hpp:211
bool is_moved() const __attribute__((always_inline))
Definition: xct_id.hpp:1041
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
Definitions of IDs in this package and a few related constant values.
ErrorCode run_nested_sysxct(xct::SysxctFunctor *functor, uint32_t max_retries=0)
Methods related to System transactions (sysxct) nested under this thread.
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
Raw atomic operations that work for both C++11 and non-C++11 code.
A base layout of shared data for all storage types.
Definition: storage.hpp:53
DualPagePointer & get_pointer(uint16_t index)
char * reserve_new_log(uint16_t log_length) __attribute__((always_inline))
Reserves a space for a new (uncommitted) log entry at the tail.
ErrorCode add_to_pointer_set(const storage::VolatilePagePointer *pointer_address, storage::VolatilePagePointer observed)
Add the given page pointer to the pointer set of this transaction.
Definition: xct.cpp:198
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
Definition: engine.cpp:50
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
Definition: storage_id.hpp:45
ErrorCode delete_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo)
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
DataPageSlotIndex search_key_physical(HashValue hash, const BloomFilterFingerprint &fingerprint, const void *key, KeyLength key_length, DataPageSlotIndex record_count, DataPageSlotIndex check_from=0) const
Search for a physical slot that exactly contains the given key.
DataPageSlotIndex index_
Index of the record in the page.
ErrorCode upsert_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, const void *payload, uint16_t payload_count, uint16_t physical_payload_hint)
0x080B : "STORAGE: This key already exists in this storage" .
Definition: error_code.hpp:177
ErrorCode get_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, void *payload, uint16_t *payload_capacity, bool read_only)
Protects against all anomalies in all situations.
Definition: xct_id.hpp:86
Log type of hash-storage's delete operation.
uint16_t offset_
Byte offset in data_ where this record starts.
ErrorCode locate_record_in_snapshot(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, HashDataPage *bin_head, RecordLocation *result)
Simpler version of locate_record for when we are in snapshot world.
uint64_t page_id_
Page ID of this page.
Definition: page.hpp:191
SnapshotPagePointer root_snapshot_page_id_
Pointer to a snapshotted page this storage is rooted at.
Definition: metadata.hpp:112
xct::TrackMovedRecordResult track_moved_record_search(HashDataPage *page, const void *key, uint16_t key_length, const HashCombo &combo)