libfoedus-core (FOEDUS Core Library): array_storage_pimpl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/storage/array/array_storage_pimpl.hpp"

#include <glog/logging.h>

#include <cstring>
#include <string>
#include <vector>

#include "foedus/engine.hpp"
// NOTE: several includes were lost in extraction; the set below is inferred from
// the symbols this file uses and may differ slightly from the original list.
#include "foedus/assorted/assorted_func.hpp"    // assorted::align8, assorted::int_div_ceil
#include "foedus/assorted/atomic_fences.hpp"    // assorted::memory_fence_consume
#include "foedus/assorted/cacheline.hpp"        // assorted::prefetch_cacheline
#include "foedus/cache/snapshot_file_set.hpp"   // cache::SnapshotFileSet
#include "foedus/log/log_type.hpp"
#include "foedus/log/thread_log_buffer.hpp"     // log::ThreadLogBuffer
#include "foedus/memory/engine_memory.hpp"      // memory::EngineMemory
#include "foedus/memory/page_pool.hpp"          // memory::PageReleaseBatch
#include "foedus/savepoint/savepoint_manager.hpp"
#include "foedus/storage/array/array_log_types.hpp"
#include "foedus/storage/array/array_metadata.hpp"
#include "foedus/storage/array/array_page_impl.hpp"
#include "foedus/storage/array/array_route.hpp"
#include "foedus/storage/array/array_storage.hpp"
#include "foedus/thread/thread.hpp"
#include "foedus/xct/xct.hpp"

namespace foedus {
namespace storage {
namespace array {
// FIXME(tzwang): overwrite_record here doesn't add to read set, should we?
// Defines ArrayStorage methods so that we can inline implementation calls
uint16_t ArrayStorage::get_payload_size() const { return control_block_->meta_.payload_size_; }
ArrayOffset ArrayStorage::get_array_size() const { return control_block_->meta_.array_size_; }
uint8_t ArrayStorage::get_levels() const { return control_block_->levels_; }
const ArrayMetadata* ArrayStorage::get_array_metadata() const { return &control_block_->meta_; }

ErrorStack ArrayStorage::verify_single_thread(thread::Thread* context) {
  return ArrayStoragePimpl(this).verify_single_thread(context);
}

ErrorCode ArrayStorage::get_record(
  thread::Thread* context, ArrayOffset offset, void *payload) {
  return get_record(context, offset, payload, 0, get_payload_size());
}

ErrorCode ArrayStorage::get_record(thread::Thread* context, ArrayOffset offset,
  void *payload, uint16_t payload_offset, uint16_t payload_count) {
  return ArrayStoragePimpl(this).get_record(
    context, offset, payload, payload_offset, payload_count);
}

template <typename T>
ErrorCode ArrayStorage::get_record_primitive(thread::Thread* context, ArrayOffset offset,
  T *payload, uint16_t payload_offset) {
  return ArrayStoragePimpl(this).get_record_primitive<T>(context, offset, payload, payload_offset);
}

ErrorCode ArrayStorage::get_record_payload(
  thread::Thread* context,
  ArrayOffset offset,
  const void **payload) {
  return ArrayStoragePimpl(this).get_record_payload(context, offset, payload);
}

ErrorCode ArrayStorage::get_record_for_write(
  thread::Thread* context,
  ArrayOffset offset,
  Record** record) {
  return ArrayStoragePimpl(this).get_record_for_write(context, offset, record);
}

ErrorCode ArrayStorage::overwrite_record(thread::Thread* context, ArrayOffset offset,
  const void *payload, uint16_t payload_offset, uint16_t payload_count) {
  return ArrayStoragePimpl(this).overwrite_record(
    context,
    offset,
    payload,
    payload_offset,
    payload_count);
}

template <typename T>
ErrorCode ArrayStorage::overwrite_record_primitive(thread::Thread* context, ArrayOffset offset,
  T payload, uint16_t payload_offset) {
  return ArrayStoragePimpl(this).overwrite_record_primitive<T>(
    context,
    offset,
    payload,
    payload_offset);
}

ErrorCode ArrayStorage::overwrite_record(
  thread::Thread* context,
  ArrayOffset offset,
  Record* record,
  const void *payload,
  uint16_t payload_offset,
  uint16_t payload_count) {
  return ArrayStoragePimpl(this).overwrite_record(
    context,
    offset,
    record,
    payload,
    payload_offset,
    payload_count);
}

template <typename T>
ErrorCode ArrayStorage::overwrite_record_primitive(
  thread::Thread* context,
  ArrayOffset offset,
  Record* record,
  T payload,
  uint16_t payload_offset) {
  return ArrayStoragePimpl(this).overwrite_record_primitive<T>(
    context,
    offset,
    record,
    payload,
    payload_offset);
}

template <typename T>
ErrorCode ArrayStorage::increment_record(thread::Thread* context, ArrayOffset offset,
  T* value, uint16_t payload_offset) {
  return ArrayStoragePimpl(this).increment_record<T>(context, offset, value, payload_offset);
}

template <typename T>
ErrorCode ArrayStorage::increment_record_oneshot(
  thread::Thread* context,
  ArrayOffset offset,
  T value,
  uint16_t payload_offset) {
  return ArrayStoragePimpl(this).increment_record_oneshot<T>(
    context,
    offset,
    value,
    payload_offset);
}

/**
 * Calculate the number of leaf/interior pages we need for the given array.
 */
std::vector<uint64_t> ArrayStoragePimpl::calculate_required_pages(
  uint64_t array_size, uint16_t payload) {
  payload = assorted::align8(payload);
  uint64_t records_per_page = kDataSize / (payload + kRecordOverhead);

  // so, how many leaf pages do we need?
  uint64_t leaf_pages = assorted::int_div_ceil(array_size, records_per_page);
  LOG(INFO) << "We need " << leaf_pages << " leaf pages";

  // interior nodes
  uint64_t total_pages = leaf_pages;
  std::vector<uint64_t> pages;
  pages.push_back(leaf_pages);
  while (pages.back() != 1) {
    uint64_t next_level_pages = assorted::int_div_ceil(pages.back(), kInteriorFanout);
    LOG(INFO) << "Level-" << pages.size() << " would have " << next_level_pages << " pages";
    pages.push_back(next_level_pages);
    total_pages += next_level_pages;
  }

  LOG(INFO) << "In total, we need " << total_pages << " pages";
  return pages;
}
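
// Illustration of the math above (numbers are hypothetical, not FOEDUS constants):
// with R = kDataSize / (align8(payload) + kRecordOverhead) records per leaf and
// interior fanout F = kInteriorFanout, an array of N records needs ceil(N / R) leaf
// pages, ceil(ceil(N / R) / F) level-1 pages, and so on until one root page remains.
// E.g., N = 1,000,000, R = 100, F = 250: 10,000 leaves + 40 level-1 pages + 1 root
// = 10,041 pages in total, which is exactly what the loop above accumulates.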

uint8_t calculate_levels(const ArrayMetadata &metadata) {
  uint64_t array_size = metadata.array_size_;
  uint16_t payload = assorted::align8(metadata.payload_size_);
  uint64_t records_per_page = kDataSize / (payload + kRecordOverhead);
  uint8_t levels = 1;
  for (uint64_t pages = assorted::int_div_ceil(array_size, records_per_page);
      pages != 1;
      pages = assorted::int_div_ceil(pages, kInteriorFanout)) {
    ++levels;
  }
  return levels;
}
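
// Equivalently, levels = 1 + ceil(log_F(ceil(N / R))) for N records, R records per
// leaf, and interior fanout F: one leaf level plus however many times the page count
// must be divided by the fanout to reach a single root. The loop form above sticks
// to integer arithmetic.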

void ArrayStoragePimpl::release_pages_recursive(
  const memory::GlobalVolatilePageResolver& resolver,
  memory::PageReleaseBatch* batch,
  VolatilePagePointer volatile_page_id) {
  ASSERT_ND(!volatile_page_id.is_null());
  ArrayPage* page = reinterpret_cast<ArrayPage*>(resolver.resolve_offset(volatile_page_id));
  if (!page->is_leaf()) {
    for (uint16_t i = 0; i < kInteriorFanout; ++i) {
      DualPagePointer &child_pointer = page->get_interior_record(i);
      VolatilePagePointer child_page_id = child_pointer.volatile_pointer_;
      if (!child_page_id.is_null()) {
        // then recurse
        release_pages_recursive(resolver, batch, child_page_id);
        child_pointer.volatile_pointer_.word = 0;
      }
    }
  }
  batch->release(volatile_page_id);
}

ErrorStack ArrayStoragePimpl::drop() {
  LOG(INFO) << "Uninitializing an array-storage " << *this;
  if (!control_block_->root_page_pointer_.volatile_pointer_.is_null()) {
    memory::PageReleaseBatch release_batch(engine_);
    release_pages_recursive(
      engine_->get_memory_manager()->get_global_volatile_page_resolver(),
      &release_batch,
      control_block_->root_page_pointer_.volatile_pointer_);
    release_batch.release_all();
    control_block_->root_page_pointer_.volatile_pointer_.word = 0;
  }
  return kRetOk;
}

std::vector<uint64_t> ArrayStoragePimpl::calculate_offset_intervals(
  uint8_t levels,
  uint16_t payload) {
  const uint16_t payload_size_aligned = (assorted::align8(payload));

  std::vector<uint64_t> offset_intervals;
  offset_intervals.push_back(kDataSize / (payload_size_aligned + kRecordOverhead));
  for (uint8_t level = 1; level < levels; ++level) {
    offset_intervals.push_back(offset_intervals[level - 1] * kInteriorFanout);
  }
  return offset_intervals;
}
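
// offset_intervals[x] is how many records one level-x page covers: a leaf (level 0)
// covers R records, a level-1 page covers R * kInteriorFanout, and so on. With the
// hypothetical R = 100, F = 250 from earlier, the intervals would be 100, 25,000,
// 6,250,000, ...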

ErrorStack ArrayStoragePimpl::load_empty() {
  const uint16_t levels = calculate_levels(control_block_->meta_);
  const uint32_t payload_size = control_block_->meta_.payload_size_;
  const ArrayOffset array_size = control_block_->meta_.array_size_;
  if (array_size > kMaxArrayOffset) {
    return ERROR_STACK(kErrorCodeStrTooLargeArray);
  }
  control_block_->levels_ = levels;
  control_block_->route_finder_ = LookupRouteFinder(levels, payload_size);
  control_block_->intervals_[0] = control_block_->route_finder_.get_records_in_leaf();
  for (uint16_t level = 1; level < levels; ++level) {
    control_block_->intervals_[level] = control_block_->intervals_[level - 1] * kInteriorFanout;
  }

  VolatilePagePointer volatile_pointer;
  ArrayPage* volatile_root;
  CHECK_ERROR(engine_->get_memory_manager()->grab_one_volatile_page(
    0,
    &volatile_pointer,
    reinterpret_cast<Page**>(&volatile_root)));
  volatile_root->initialize_volatile_page(
    engine_->get_savepoint_manager()->get_initial_current_epoch(),  // lowest epoch in the system
    get_id(),
    volatile_pointer,
    payload_size,
    levels - 1U,
    ArrayRange(0, array_size));
  control_block_->root_page_pointer_.volatile_pointer_ = volatile_pointer;
  return kRetOk;
}

ErrorStack ArrayStoragePimpl::create(const Metadata& metadata) {
  if (exists()) {
    LOG(ERROR) << "This array-storage already exists-" << metadata.id_;
    return ERROR_STACK(kErrorCodeStrDuplicateStrid);
  }
  control_block_->meta_ = static_cast<const ArrayMetadata&>(metadata);
  CHECK_ERROR(load_empty());
  LOG(INFO) << "Newly created an array-storage " << metadata.id_;
  control_block_->status_ = kExists;
  return kRetOk;
}

ErrorStack ArrayStoragePimpl::load(const StorageControlBlock& snapshot_block) {
  control_block_->meta_ = static_cast<const ArrayMetadata&>(snapshot_block.meta_);
  const ArrayMetadata& meta = control_block_->meta_;
  control_block_->root_page_pointer_.snapshot_pointer_ = meta.root_snapshot_page_id_;
  const uint16_t levels = calculate_levels(meta);
  control_block_->levels_ = levels;
  control_block_->route_finder_ = LookupRouteFinder(levels, meta.payload_size_);
  control_block_->intervals_[0] = control_block_->route_finder_.get_records_in_leaf();
  for (uint16_t level = 1; level < levels; ++level) {
    control_block_->intervals_[level] = control_block_->intervals_[level - 1] * kInteriorFanout;
  }

  // So far we assume the root page always has a volatile version.
  // Create it now.
  if (meta.root_snapshot_page_id_ != 0) {
    cache::SnapshotFileSet fileset(engine_);
    CHECK_ERROR(fileset.initialize());
    VolatilePagePointer volatile_pointer;
    Page* volatile_root;
    CHECK_ERROR(engine_->get_memory_manager()->load_one_volatile_page(
      &fileset,
      meta.root_snapshot_page_id_,
      &volatile_pointer,
      &volatile_root));
    control_block_->root_page_pointer_.volatile_pointer_ = volatile_pointer;
    CHECK_ERROR(fileset.uninitialize());
  } else {
    LOG(INFO) << "Loading an empty array-storage-" << get_meta();
    CHECK_ERROR(load_empty());
  }

  control_block_->status_ = kExists;
  LOG(INFO) << "Loaded an array-storage-" << get_meta();
  return kRetOk;
}


ErrorCode ArrayStoragePimpl::locate_record_for_read(
  thread::Thread* context,
  ArrayOffset offset,
  Record** out,
  bool* snapshot_record) {
  ASSERT_ND(exists());
  ASSERT_ND(offset < get_array_size());
  uint16_t index = 0;
  ArrayPage* page = nullptr;
  CHECK_ERROR_CODE(lookup_for_read(context, offset, &page, &index, snapshot_record));
  ASSERT_ND(page);
  ASSERT_ND(page->is_leaf());
  ASSERT_ND(page->get_array_range().contains(offset));
  *out = page->get_leaf_record(index, get_payload_size());
  return kErrorCodeOk;
}

ErrorCode ArrayStoragePimpl::locate_record_for_write(
  thread::Thread* context,
  ArrayOffset offset,
  Record** out) {
  ASSERT_ND(exists());
  ASSERT_ND(offset < get_array_size());
  uint16_t index = 0;
  ArrayPage* page = nullptr;
  CHECK_ERROR_CODE(lookup_for_write(context, offset, &page, &index));
  ASSERT_ND(page);
  ASSERT_ND(page->is_leaf());
  ASSERT_ND(page->get_array_range().contains(offset));
  *out = page->get_leaf_record(index, get_payload_size());
  return kErrorCodeOk;
}

ErrorCode ArrayStoragePimpl::get_record(
  thread::Thread* context,
  ArrayOffset offset,
  void* payload,
  uint16_t payload_offset,
  uint16_t payload_count) {
  ASSERT_ND(payload_offset + payload_count <= get_payload_size());
  Record *record = nullptr;
  bool snapshot_record;
  CHECK_ERROR_CODE(locate_record_for_read(context, offset, &record, &snapshot_record));
  CHECK_ERROR_CODE(context->get_current_xct().on_record_read(false, &record->owner_id_));
  std::memcpy(payload, record->payload_ + payload_offset, payload_count);
  return kErrorCodeOk;
}
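
// Client-side sketch of the read path above (hypothetical storage/variable names):
//   uint64_t balance;
//   CHECK_ERROR_CODE(accounts.get_record_primitive<uint64_t>(
//     context, account_id, &balance, 0));
// on_record_read() records the observed TID in the transaction's read set, which the
// optimistic commit protocol re-validates at precommit.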

template <typename T>
ErrorCode ArrayStoragePimpl::get_record_primitive(
  thread::Thread* context,
  ArrayOffset offset,
  T *payload,
  uint16_t payload_offset) {
  ASSERT_ND(payload_offset + sizeof(T) <= get_payload_size());
  Record *record = nullptr;
  bool snapshot_record;
  CHECK_ERROR_CODE(locate_record_for_read(context, offset, &record, &snapshot_record));
  CHECK_ERROR_CODE(context->get_current_xct().on_record_read(false, &record->owner_id_));
  char* ptr = record->payload_ + payload_offset;
  *payload = *reinterpret_cast<const T*>(ptr);
  return kErrorCodeOk;
}

ErrorCode ArrayStoragePimpl::get_record_payload(
  thread::Thread* context,
  ArrayOffset offset,
  const void** payload) {
  Record *record = nullptr;
  bool snapshot_record;
  CHECK_ERROR_CODE(locate_record_for_read(context, offset, &record, &snapshot_record));
  xct::Xct& current_xct = context->get_current_xct();
  if (!snapshot_record &&
    current_xct.get_isolation_level() != xct::kDirtyRead) {
    CHECK_ERROR_CODE(current_xct.on_record_read(false, &record->owner_id_));
  }
  *payload = record->payload_;
  return kErrorCodeOk;
}

ErrorCode ArrayStoragePimpl::get_record_for_write(
  thread::Thread* context,
  ArrayOffset offset,
  Record** record) {
  CHECK_ERROR_CODE(locate_record_for_write(context, offset, record));
  xct::Xct& current_xct = context->get_current_xct();
  if (current_xct.get_isolation_level() != xct::kDirtyRead) {
    CHECK_ERROR_CODE(current_xct.on_record_read(true, &(*record)->owner_id_));
  }
  return kErrorCodeOk;
}


ErrorCode ArrayStoragePimpl::overwrite_record(thread::Thread* context, ArrayOffset offset,
  const void *payload, uint16_t payload_offset, uint16_t payload_count) {
  ASSERT_ND(payload_offset + payload_count <= get_payload_size());
  Record *record = nullptr;
  CHECK_ERROR_CODE(locate_record_for_write(context, offset, &record));
  return overwrite_record(context, offset, record, payload, payload_offset, payload_count);
}

template <typename T>
ErrorCode ArrayStoragePimpl::overwrite_record_primitive(
  thread::Thread* context, ArrayOffset offset, T payload, uint16_t payload_offset) {
  ASSERT_ND(payload_offset + sizeof(T) <= get_payload_size());
  Record *record = nullptr;
  CHECK_ERROR_CODE(locate_record_for_write(context, offset, &record));
  return overwrite_record_primitive<T>(context, offset, record, payload, payload_offset);
}

ErrorCode ArrayStoragePimpl::overwrite_record(
  thread::Thread* context,
  ArrayOffset offset,
  Record* record,
  const void *payload,
  uint16_t payload_offset,
  uint16_t payload_count) {
  uint16_t log_length = ArrayOverwriteLogType::calculate_log_length(payload_count);
  ArrayOverwriteLogType* log_entry = reinterpret_cast<ArrayOverwriteLogType*>(
    context->get_thread_log_buffer().reserve_new_log(log_length));
  log_entry->populate(get_id(), offset, payload, payload_offset, payload_count);
  return context->get_current_xct().add_to_write_set(
    get_id(),
    &record->owner_id_,
    record->payload_,
    log_entry);
}

template <typename T>
ErrorCode ArrayStoragePimpl::overwrite_record_primitive(
  thread::Thread* context,
  ArrayOffset offset,
  Record* record,
  T payload,
  uint16_t payload_offset) {
  uint16_t log_length = ArrayOverwriteLogType::calculate_log_length(sizeof(T));
  ArrayOverwriteLogType* log_entry = reinterpret_cast<ArrayOverwriteLogType*>(
    context->get_thread_log_buffer().reserve_new_log(log_length));
  log_entry->populate_primitive<T>(get_id(), offset, payload, payload_offset);
  return context->get_current_xct().add_to_write_set(
    get_id(),
    &record->owner_id_,
    record->payload_,
    log_entry);
}

template <typename T>
ErrorCode ArrayStoragePimpl::increment_record(
  thread::Thread* context, ArrayOffset offset, T* value, uint16_t payload_offset) {
  ASSERT_ND(payload_offset + sizeof(T) <= get_payload_size());
  Record *record = nullptr;
  CHECK_ERROR_CODE(locate_record_for_write(context, offset, &record));

  // this is get_record + overwrite_record
  // NOTE This version is like other storages' increment implementations:
  // take a read set (and potentially locks), read the value, then remember the overwrite log.
  // The increment_record_oneshot() below is pretty different.
  CHECK_ERROR_CODE(context->get_current_xct().on_record_read(true, &record->owner_id_));
  char* ptr = record->payload_ + payload_offset;
  T tmp = *reinterpret_cast<const T*>(ptr);
  *value += tmp;
  uint16_t log_length = ArrayOverwriteLogType::calculate_log_length(sizeof(T));
  ArrayOverwriteLogType* log_entry = reinterpret_cast<ArrayOverwriteLogType*>(
    context->get_thread_log_buffer().reserve_new_log(log_length));
  log_entry->populate_primitive<T>(get_id(), offset, *value, payload_offset);
  return context->get_current_xct().add_to_write_set(
    get_id(),
    &record->owner_id_,
    record->payload_,
    log_entry);
}

template <typename T>
ErrorCode ArrayStoragePimpl::increment_record_oneshot(
  thread::Thread* context,
  ArrayOffset offset,
  T value,
  uint16_t payload_offset) {
  ASSERT_ND(payload_offset + sizeof(T) <= get_payload_size());
  Record *record = nullptr;
  CHECK_ERROR_CODE(locate_record_for_write(context, offset, &record));
  // Only the array storage's increment can be the rare "write-set only" log:
  // other storages' increments have to check the deletion bit at least.
  // To exploit this, the array increment log takes the primitive type as a parameter.
  ValueType type = to_value_type<T>();
  uint16_t log_length = ArrayIncrementLogType::calculate_log_length(type);
  ArrayIncrementLogType* log_entry = reinterpret_cast<ArrayIncrementLogType*>(
    context->get_thread_log_buffer().reserve_new_log(log_length));
  log_entry->populate<T>(get_id(), offset, value, payload_offset);
  return context->get_current_xct().add_to_write_set(
    get_id(),
    &record->owner_id_,
    record->payload_,
    log_entry);
}
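
// Usage note: increment_record_oneshot() never reads the current value back, so it is
// suited to blind counter updates where the caller does not need the result,
// e.g. (hypothetical storage name):
//   CHECK_ERROR_CODE(counters.increment_record_oneshot<uint64_t>(context, offset, 1ULL, 0));
// increment_record(), in contrast, returns the post-increment value but must take a
// read set entry on the record, as implemented above.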

ErrorCode ArrayStoragePimpl::lookup_for_read(
  thread::Thread* context,
  ArrayOffset offset,
  ArrayPage** out,
  uint16_t* index,
  bool* snapshot_page) {
  ASSERT_ND(exists());
  ASSERT_ND(offset < get_array_size());
  ASSERT_ND(out);
  ASSERT_ND(index);
  ArrayPage* current_page;
  CHECK_ERROR_CODE(get_root_page(context, false, &current_page));
  uint16_t levels = get_levels();
  ASSERT_ND(current_page->get_array_range().contains(offset));
  LookupRoute route = control_block_->route_finder_.find_route(offset);
  bool in_snapshot = current_page->header().snapshot_;
  for (uint8_t level = levels - 1; level > 0; --level) {
    ASSERT_ND(current_page->get_array_range().contains(offset));
    DualPagePointer& pointer = current_page->get_interior_record(route.route[level]);
    CHECK_ERROR_CODE(follow_pointer(
      context,
      in_snapshot,
      false,
      &pointer,
      &current_page,
      current_page,
      route.route[level]));
    in_snapshot = current_page->header().snapshot_;
  }
  ASSERT_ND(current_page->is_leaf());
  ASSERT_ND(current_page->get_array_range().contains(offset));
  ASSERT_ND(current_page->get_array_range().begin_ + route.route[0] == offset);
  *out = current_page;
  *index = route.route[0];
  *snapshot_page = (*out)->header().snapshot_;
  return kErrorCodeOk;
}
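
// LookupRoute compactly encodes the descent: route[0] is the record ordinal in the
// leaf, route[1] the slot in its parent, route[2] the slot one level further up, etc.
// The loop above therefore just indexes route[level] at each interior page rather
// than re-dividing the offset at every step.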

ErrorCode ArrayStoragePimpl::lookup_for_write(
  thread::Thread* context,
  ArrayOffset offset,
  ArrayPage** out,
  uint16_t* index) {
  ASSERT_ND(exists());
  ASSERT_ND(offset < get_array_size());
  ASSERT_ND(out);
  ASSERT_ND(index);
  ArrayPage* current_page;
  CHECK_ERROR_CODE(get_root_page(context, true, &current_page));
  ASSERT_ND(!current_page->header().snapshot_);
  uint16_t levels = get_levels();
  ASSERT_ND(current_page->get_array_range().contains(offset));
  LookupRoute route = control_block_->route_finder_.find_route(offset);
  for (uint8_t level = levels - 1; level > 0; --level) {
    ASSERT_ND(current_page->get_array_range().contains(offset));
    CHECK_ERROR_CODE(follow_pointer(
      context,
      false,
      true,
      &current_page->get_interior_record(route.route[level]),
      &current_page,
      current_page,
      route.route[level]));
  }
  ASSERT_ND(current_page->is_leaf());
  ASSERT_ND(current_page->get_array_range().contains(offset));
  ASSERT_ND(current_page->get_array_range().begin_ + route.route[0] == offset);
  *out = current_page;
  *index = route.route[0];
  return kErrorCodeOk;
}

// so far experimental...
template <typename T>
ErrorCode ArrayStorage::get_record_primitive_batch(
  thread::Thread* context,
  uint16_t payload_offset,
  uint16_t batch_size,
  const ArrayOffset* offset_batch,
  T* payload_batch) {
  ArrayStoragePimpl pimpl(this);
  for (uint16_t cur = 0; cur < batch_size;) {
    uint16_t chunk = batch_size - cur;
    if (chunk > ArrayStoragePimpl::kBatchMax) {
      chunk = ArrayStoragePimpl::kBatchMax;
    }
    CHECK_ERROR_CODE(pimpl.get_record_primitive_batch(
      context,
      payload_offset,
      chunk,
      &offset_batch[cur],
      &payload_batch[cur]));
    cur += chunk;
  }
  return kErrorCodeOk;
}

ErrorCode ArrayStorage::get_record_payload_batch(
  thread::Thread* context,
  uint16_t batch_size,
  const ArrayOffset* offset_batch,
  const void** payload_batch) {
  ArrayStoragePimpl pimpl(this);
  for (uint16_t cur = 0; cur < batch_size;) {
    uint16_t chunk = batch_size - cur;
    if (chunk > ArrayStoragePimpl::kBatchMax) {
      chunk = ArrayStoragePimpl::kBatchMax;
    }
    CHECK_ERROR_CODE(pimpl.get_record_payload_batch(
      context,
      chunk,
      &offset_batch[cur],
      &payload_batch[cur]));
    cur += chunk;
  }
  return kErrorCodeOk;
}

ErrorCode ArrayStorage::get_record_for_write_batch(
  thread::Thread* context,
  uint16_t batch_size,
  const ArrayOffset* offset_batch,
  Record** record_batch) {
  ArrayStoragePimpl pimpl(this);
  for (uint16_t cur = 0; cur < batch_size;) {
    uint16_t chunk = batch_size - cur;
    if (chunk > ArrayStoragePimpl::kBatchMax) {
      chunk = ArrayStoragePimpl::kBatchMax;
    }
    CHECK_ERROR_CODE(pimpl.get_record_for_write_batch(
      context,
      chunk,
      &offset_batch[cur],
      &record_batch[cur]));
    cur += chunk;
  }
  return kErrorCodeOk;
}
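
// These wrappers accept an arbitrary batch_size and carve it into chunks of
// ArrayStoragePimpl::kBatchMax because the implementations below keep the per-batch
// state in fixed-size on-stack arrays (Record* record_batch[kBatchMax], etc.).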


template <typename T>
ErrorCode ArrayStoragePimpl::get_record_primitive_batch(
  thread::Thread* context,
  uint16_t payload_offset,
  uint16_t batch_size,
  const ArrayOffset* offset_batch,
  T* payload_batch) {
  ASSERT_ND(batch_size <= kBatchMax);
  Record* record_batch[kBatchMax];
  bool snapshot_record_batch[kBatchMax];
  CHECK_ERROR_CODE(locate_record_for_read_batch(
    context,
    batch_size,
    offset_batch,
    record_batch,
    snapshot_record_batch));
  xct::Xct& current_xct = context->get_current_xct();
  if (current_xct.get_isolation_level() != xct::kDirtyRead) {
    for (uint8_t i = 0; i < batch_size; ++i) {
      if (!snapshot_record_batch[i]) {
        CHECK_ERROR_CODE(current_xct.on_record_read(false, &record_batch[i]->owner_id_));
      }
    }
    assorted::memory_fence_consume();
  }
  // we anyway prefetched the first 64 bytes. if the column is not within there,
  // let's do parallel prefetch
  if (payload_offset + sizeof(T) + kRecordOverhead > 64) {
    for (uint8_t i = 0; i < batch_size; ++i) {
      assorted::prefetch_cacheline(record_batch[i]->payload_ + payload_offset);
    }
  }
  for (uint8_t i = 0; i < batch_size; ++i) {
    char* ptr = record_batch[i]->payload_ + payload_offset;
    payload_batch[i] = *reinterpret_cast<const T*>(ptr);
  }
  return kErrorCodeOk;
}


ErrorCode ArrayStoragePimpl::get_record_payload_batch(
  thread::Thread* context,
  uint16_t batch_size,
  const ArrayOffset* offset_batch,
  const void** payload_batch) {
  ASSERT_ND(batch_size <= kBatchMax);
  Record* record_batch[kBatchMax];
  bool snapshot_record_batch[kBatchMax];
  CHECK_ERROR_CODE(locate_record_for_read_batch(
    context,
    batch_size,
    offset_batch,
    record_batch,
    snapshot_record_batch));
  xct::Xct& current_xct = context->get_current_xct();
  if (current_xct.get_isolation_level() != xct::kDirtyRead) {
    for (uint8_t i = 0; i < batch_size; ++i) {
      if (!snapshot_record_batch[i]) {
        CHECK_ERROR_CODE(current_xct.on_record_read(false, &record_batch[i]->owner_id_));
      }
    }
    assorted::memory_fence_consume();
  }
  for (uint8_t i = 0; i < batch_size; ++i) {
    payload_batch[i] = record_batch[i]->payload_;
  }
  return kErrorCodeOk;
}

ErrorCode ArrayStoragePimpl::get_record_for_write_batch(
  thread::Thread* context,
  uint16_t batch_size,
  const ArrayOffset* offset_batch,
  Record** record_batch) {
  ASSERT_ND(batch_size <= kBatchMax);
  CHECK_ERROR_CODE(lookup_for_write_batch(
    context,
    batch_size,
    offset_batch,
    record_batch));
  xct::Xct& current_xct = context->get_current_xct();
  if (current_xct.get_isolation_level() != xct::kDirtyRead) {
    for (uint8_t i = 0; i < batch_size; ++i) {
      CHECK_ERROR_CODE(current_xct.on_record_read(true, &record_batch[i]->owner_id_));
    }
    assorted::memory_fence_consume();
  }
  return kErrorCodeOk;
}

ErrorCode ArrayStoragePimpl::locate_record_for_read_batch(
  thread::Thread* context,
  uint16_t batch_size,
  const ArrayOffset* offset_batch,
  Record** out_batch,
  bool* snapshot_page_batch) {
  ASSERT_ND(batch_size <= kBatchMax);
  ArrayPage* page_batch[kBatchMax];
  uint16_t index_batch[kBatchMax];
  CHECK_ERROR_CODE(lookup_for_read_batch(
    context,
    batch_size,
    offset_batch,
    page_batch,
    index_batch,
    snapshot_page_batch));
  const uint16_t payload_size = get_payload_size();
  for (uint8_t i = 0; i < batch_size; ++i) {
    ASSERT_ND(page_batch[i]);
    ASSERT_ND(page_batch[i]->is_leaf());
    ASSERT_ND(page_batch[i]->get_array_range().contains(offset_batch[i]));
    out_batch[i] = page_batch[i]->get_leaf_record(index_batch[i], payload_size);
    ASSERT_ND(out_batch[i]->owner_id_.xct_id_.is_valid());
  }
  return kErrorCodeOk;
}

ErrorCode ArrayStoragePimpl::lookup_for_read_batch(
  thread::Thread* context,
  uint16_t batch_size,
  const ArrayOffset* offset_batch,
  ArrayPage** out_batch,
  uint16_t* index_batch,
  bool* snapshot_page_batch) {
  ASSERT_ND(batch_size <= kBatchMax);
  LookupRoute routes[kBatchMax];
  ArrayPage* root_page;
  CHECK_ERROR_CODE(get_root_page(context, false, &root_page));
  uint16_t levels = get_levels();
  bool root_snapshot = root_page->header().snapshot_;
  const uint16_t payload_size = get_payload_size();
  for (uint8_t i = 0; i < batch_size; ++i) {
    ASSERT_ND(offset_batch[i] < get_array_size());
    routes[i] = control_block_->route_finder_.find_route(offset_batch[i]);
    if (levels <= 1U) {
      assorted::prefetch_cacheline(root_page->get_leaf_record(
        routes[i].route[0],
        payload_size));
    } else {
      assorted::prefetch_cacheline(&root_page->get_interior_record(routes[i].route[levels - 1]));
    }
    out_batch[i] = root_page;
    snapshot_page_batch[i] = root_snapshot;
    ASSERT_ND(out_batch[i]->get_array_range().contains(offset_batch[i]));
    index_batch[i] = routes[i].route[levels - 1];
  }

  for (uint8_t level = levels - 1; level > 0; --level) {
    // note that we use out_batch as both input (parents) and output (the pages) here.
    // the method works in that case too.
    CHECK_ERROR_CODE(follow_pointers_for_read_batch(
      context,
      batch_size,
      snapshot_page_batch,
      out_batch,
      index_batch,
      out_batch));

    for (uint8_t i = 0; i < batch_size; ++i) {
      ASSERT_ND(snapshot_page_batch[i] == out_batch[i]->header().snapshot_);
      ASSERT_ND(out_batch[i]->get_storage_id() == get_id());
      ASSERT_ND(snapshot_page_batch[i]
        || !construct_volatile_page_pointer(out_batch[i]->header().page_id_).is_null());
      ASSERT_ND(!snapshot_page_batch[i]
        || extract_local_page_id_from_snapshot_pointer(out_batch[i]->header().page_id_));
      if (level == 1U) {
        assorted::prefetch_cacheline(out_batch[i]->get_leaf_record(
          routes[i].route[0],
          payload_size));
      } else {
        // prefetch the interior slot we descend into next
        assorted::prefetch_cacheline(
          &out_batch[i]->get_interior_record(routes[i].route[level - 1]));
      }
      index_batch[i] = routes[i].route[level - 1];
    }
  }
  for (uint8_t i = 0; i < batch_size; ++i) {
    ASSERT_ND(out_batch[i]->is_leaf());
    ASSERT_ND(out_batch[i]->get_array_range().contains(offset_batch[i]));
    ASSERT_ND(out_batch[i]->get_array_range().begin_ + routes[i].route[0] == offset_batch[i]);
    ASSERT_ND(index_batch[i] == routes[i].route[0]);
    ASSERT_ND(
      out_batch[i]->get_leaf_record(index_batch[i], payload_size)->owner_id_.xct_id_.is_valid());
  }
  return kErrorCodeOk;
}

ErrorCode ArrayStoragePimpl::lookup_for_write_batch(
  thread::Thread* context,
  uint16_t batch_size,
  const ArrayOffset* offset_batch,
  Record** record_batch) {
  ASSERT_ND(batch_size <= kBatchMax);
  ArrayPage* pages[kBatchMax];
  LookupRoute routes[kBatchMax];
  uint16_t index_batch[kBatchMax];
  ArrayPage* root_page;
  CHECK_ERROR_CODE(get_root_page(context, true, &root_page));
  ASSERT_ND(!root_page->header().snapshot_);
  uint16_t levels = get_levels();
  const uint16_t payload_size = get_payload_size();

  for (uint8_t i = 0; i < batch_size; ++i) {
    ASSERT_ND(offset_batch[i] < get_array_size());
    routes[i] = control_block_->route_finder_.find_route(offset_batch[i]);
    if (levels <= 1U) {
      assorted::prefetch_cacheline(root_page->get_leaf_record(
        routes[i].route[0],
        payload_size));
    } else {
      assorted::prefetch_cacheline(&root_page->get_interior_record(routes[i].route[levels - 1]));
    }
    pages[i] = root_page;
    ASSERT_ND(pages[i]->get_array_range().contains(offset_batch[i]));
    index_batch[i] = routes[i].route[levels - 1];
  }

  for (uint8_t level = levels - 1; level > 0; --level) {
    // as noted above, in==out case.
    CHECK_ERROR_CODE(follow_pointers_for_write_batch(
      context,
      batch_size,
      pages,
      index_batch,
      pages));

    for (uint8_t i = 0; i < batch_size; ++i) {
      ASSERT_ND(!pages[i]->header().snapshot_);
      ASSERT_ND(pages[i]->get_storage_id() == get_id());
      ASSERT_ND(!construct_volatile_page_pointer(pages[i]->header().page_id_).is_null());
      if (level == 1U) {
        assorted::prefetch_cacheline(pages[i]->get_leaf_record(
          routes[i].route[0],
          payload_size));
      } else {
        // prefetch the interior slot we descend into next
        assorted::prefetch_cacheline(
          &pages[i]->get_interior_record(routes[i].route[level - 1]));
      }
      index_batch[i] = routes[i].route[level - 1];
    }
  }

  for (uint8_t i = 0; i < batch_size; ++i) {
    ASSERT_ND(pages[i]->is_leaf());
    ASSERT_ND(pages[i]->get_array_range().contains(offset_batch[i]));
    ASSERT_ND(pages[i]->get_array_range().begin_ + routes[i].route[0] == offset_batch[i]);
    ASSERT_ND(index_batch[i] == routes[i].route[0]);
    record_batch[i] = pages[i]->get_leaf_record(routes[i].route[0], payload_size);
  }
  return kErrorCodeOk;
}
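
// Unlike the read-path lookup above, the write path never returns snapshot pages:
// get_root_page(context, true, ...) and follow_pointers_for_write_batch() always land
// on (and if needed install) volatile pages, so there is no snapshot_page_batch here.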

#define CHECK_AND_ASSERT(x) do { ASSERT_ND(x); if (!(x)) \
  return ERROR_STACK(kErrorCodeStrArrayFailedVerification); } while (0)
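
// CHECK_AND_ASSERT fires the assertion in debug builds; in release builds it instead
// returns kErrorCodeStrArrayFailedVerification so that verification reports corruption
// rather than crashing.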

ErrorStack ArrayStoragePimpl::verify_single_thread(thread::Thread* context) {
  ArrayPage* root;
  WRAP_ERROR_CODE(get_root_page(context, false, &root));
  return verify_single_thread(context, root);
}

ErrorStack ArrayStoragePimpl::verify_single_thread(thread::Thread* context, ArrayPage* page) {
  const memory::GlobalVolatilePageResolver& resolver
    = context->get_global_volatile_page_resolver();
  if (page->is_leaf()) {
    for (uint16_t i = 0; i < page->get_leaf_record_count(); ++i) {
      Record* record = page->get_leaf_record(i, get_payload_size());
      // (some of the TID sanity checks below are reconstructed from the accessors this
      // file references and may differ slightly from the original set)
      CHECK_AND_ASSERT(!record->owner_id_.is_keylocked());
      CHECK_AND_ASSERT(!record->owner_id_.is_being_written());
      CHECK_AND_ASSERT(record->owner_id_.lock_.get_tail_waiter() == 0);
      CHECK_AND_ASSERT(record->owner_id_.lock_.get_tail_waiter_block() == 0);
      CHECK_AND_ASSERT(!record->owner_id_.is_moved());
      CHECK_AND_ASSERT(!record->owner_id_.is_deleted());
      CHECK_AND_ASSERT(record->owner_id_.xct_id_.is_valid());
    }
  } else {
    for (uint16_t i = 0; i < kInteriorFanout; ++i) {
      DualPagePointer &child_pointer = page->get_interior_record(i);
      VolatilePagePointer page_id = child_pointer.volatile_pointer_;
      if (!page_id.is_null()) {
        // then recurse
        ArrayPage* child_page = reinterpret_cast<ArrayPage*>(resolver.resolve_offset(page_id));
        CHECK_ERROR(verify_single_thread(context, child_page));
      }
    }
  }
  return kRetOk;
}


ErrorStack ArrayStorage::hcc_reset_all_temperature_stat() {
  ArrayStoragePimpl pimpl(this);
  return pimpl.hcc_reset_all_temperature_stat();
}

ErrorStack ArrayStoragePimpl::hcc_reset_all_temperature_stat() {
  LOG(INFO) << "**"
    << std::endl << "***************************************************************"
    << std::endl << "*** Resetting " << ArrayStorage(engine_, control_block_) << "'s "
    << std::endl << "*** temperature stat for HCC"
    << std::endl << "***************************************************************";

  DualPagePointer& pointer = control_block_->root_page_pointer_;
  if (pointer.volatile_pointer_.is_null()) {
    LOG(INFO) << "No volatile pages.";
  } else {
    CHECK_ERROR(hcc_reset_all_temperature_stat_intermediate(pointer.volatile_pointer_));
  }

  LOG(INFO) << "Done resetting";
  return kRetOk;
}

ErrorStack ArrayStoragePimpl::hcc_reset_all_temperature_stat_intermediate(
  VolatilePagePointer intermediate_page_id) {
  const auto& resolver = engine_->get_memory_manager()->get_global_volatile_page_resolver();
  ArrayPage* page = reinterpret_cast<ArrayPage*>(resolver.resolve_offset(intermediate_page_id));
  ASSERT_ND(page);
  ASSERT_ND(!page->is_leaf());
  const bool bottom = page->get_level() == 1U;
  for (uint8_t i = 0; i < kInteriorFanout; ++i) {
    VolatilePagePointer page_id = page->get_interior_record(i).volatile_pointer_;
    if (page_id.is_null()) {
      continue;
    }
    if (bottom) {
      ArrayPage* leaf = reinterpret_cast<ArrayPage*>(resolver.resolve_offset(page_id));
      leaf->header().hotness_.reset();
    } else {
      CHECK_ERROR(hcc_reset_all_temperature_stat_intermediate(page_id));
    }
  }

  return kRetOk;
}

ErrorCode ArrayStoragePimpl::follow_pointers_for_read_batch(
  thread::Thread* context,
  uint16_t batch_size,
  bool* in_snapshot,
  ArrayPage** parents,
  const uint16_t* index_in_parents,
  ArrayPage** out) {
  DualPagePointer* pointers[kBatchMax];
  for (uint8_t i = 0; i < batch_size; ++i) {
    ASSERT_ND(!parents[i]->is_leaf());
    ASSERT_ND(in_snapshot[i] == parents[i]->header().snapshot_);
    pointers[i] = &parents[i]->get_interior_record(index_in_parents[i]);
  }

#ifndef NDEBUG
  // there is a case of out==parents. for the assertion below, let's copy
  ArrayPage* parents_copy[kBatchMax];
  for (uint8_t i = 0; i < batch_size; ++i) {
    parents_copy[i] = parents[i];
  }
#endif  // NDEBUG

  CHECK_ERROR_CODE(context->follow_page_pointers_for_read_batch(
    batch_size,
    array_volatile_page_init,
    false,
    true,
    pointers,
    reinterpret_cast<Page**>(parents),
    index_in_parents,
    in_snapshot,
    reinterpret_cast<Page**>(out)));

#ifndef NDEBUG
  for (uint8_t i = 0; i < batch_size; ++i) {
    ArrayPage* page = out[i];
    ASSERT_ND(page != nullptr);
    /*
    if ((uintptr_t)(page) == 0xdadadadadadadadaULL) {
      for (uint8_t j = 0; j < batch_size; ++j) {
        in_snapshot[j] = parents_copy[j]->header().snapshot_;
      }
      CHECK_ERROR_CODE(context->follow_page_pointers_for_read_batch(
        batch_size,
        array_volatile_page_init,
        false,
        true,
        pointers,
        reinterpret_cast<Page**>(parents_copy),
        index_in_parents,
        in_snapshot,
        reinterpret_cast<Page**>(out)));
    }
    */
    ASSERT_ND(in_snapshot[i] == page->header().snapshot_);
    ASSERT_ND(page->get_level() + 1U == parents_copy[i]->get_level());
    if (page->is_leaf()) {
      xct::XctId xct_id = page->get_leaf_record(0, get_payload_size())->owner_id_.xct_id_;
      ASSERT_ND(xct_id.is_valid());
    }
  }
#endif  // NDEBUG
  return kErrorCodeOk;
}

ErrorCode ArrayStoragePimpl::follow_pointers_for_write_batch(
  thread::Thread* context,
  uint16_t batch_size,
  ArrayPage** parents,
  const uint16_t* index_in_parents,
  ArrayPage** out) {
  DualPagePointer* pointers[kBatchMax];
  for (uint8_t i = 0; i < batch_size; ++i) {
    ASSERT_ND(!parents[i]->is_leaf());
    ASSERT_ND(!parents[i]->header().snapshot_);
    pointers[i] = &parents[i]->get_interior_record(index_in_parents[i]);
  }

#ifndef NDEBUG
  // there is a case of out==parents. for the assertion below, let's copy
  ArrayPage* parents_copy[kBatchMax];
  for (uint8_t i = 0; i < batch_size; ++i) {
    parents_copy[i] = parents[i];
  }
#endif  // NDEBUG

  CHECK_ERROR_CODE(context->follow_page_pointers_for_write_batch(
    batch_size,
    array_volatile_page_init,
    pointers,
    reinterpret_cast<Page**>(parents),
    index_in_parents,
    reinterpret_cast<Page**>(out)));

#ifndef NDEBUG
  for (uint8_t i = 0; i < batch_size; ++i) {
    ArrayPage* page = out[i];
    ASSERT_ND(page != nullptr);
    ASSERT_ND(!page->header().snapshot_);
    ASSERT_ND(page->get_level() + 1U == parents_copy[i]->get_level());
    if (page->is_leaf()) {
      xct::XctId xct_id = page->get_leaf_record(0, get_payload_size())->owner_id_.xct_id_;
      ASSERT_ND(xct_id.is_valid());
    }
  }
#endif  // NDEBUG
  return kErrorCodeOk;
}

// Explicit instantiations for each type
// @cond DOXYGEN_IGNORE
#define EXPLICIT_INSTANTIATION_GET(x) template ErrorCode ArrayStorage::get_record_primitive< x > \
  (thread::Thread* context, ArrayOffset offset, x *payload, uint16_t payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EXPLICIT_INSTANTIATION_GET);

#define EX_GET_BATCH(x) template ErrorCode ArrayStorage::get_record_primitive_batch< x > \
  (thread::Thread* context, uint16_t payload_offset, \
  uint16_t batch_size, const ArrayOffset* offset_batch, x *payload)
INSTANTIATE_ALL_NUMERIC_TYPES(EX_GET_BATCH);

#define EX_GET_BATCH_IMPL(x) template ErrorCode \
  ArrayStoragePimpl::get_record_primitive_batch< x > \
  (thread::Thread* context, uint16_t payload_offset, \
  uint16_t batch_size, const ArrayOffset* offset_batch, x *payload)
INSTANTIATE_ALL_NUMERIC_TYPES(EX_GET_BATCH_IMPL);

#define EXPLICIT_INSTANTIATION_OV(x) template ErrorCode\
  ArrayStorage::overwrite_record_primitive< x > \
  (thread::Thread* context, ArrayOffset offset, x payload, uint16_t payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EXPLICIT_INSTANTIATION_OV);

#define EX_OV_REC(x) template ErrorCode\
  ArrayStorage::overwrite_record_primitive< x > \
  (thread::Thread* context, ArrayOffset offset, Record* record, x payload, uint16_t payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EX_OV_REC);

#define EX_OV_REC_IMPL(x) template ErrorCode\
  ArrayStoragePimpl::overwrite_record_primitive< x > \
  (thread::Thread* context, ArrayOffset offset, Record* record, x payload, uint16_t payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EX_OV_REC_IMPL);

#define EXPLICIT_INSTANTIATION_INC(x) template ErrorCode ArrayStorage::increment_record< x > \
  (thread::Thread* context, ArrayOffset offset, x* value, uint16_t payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EXPLICIT_INSTANTIATION_INC);

#define EXPLICIT_INSTANTIATION_GET_IMPL(x) template ErrorCode \
  ArrayStoragePimpl::get_record_primitive< x > \
  (thread::Thread* context, ArrayOffset offset, x *payload, uint16_t payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EXPLICIT_INSTANTIATION_GET_IMPL);

#define EXPLICIT_INSTANTIATION_GET_OV_IMPL(x) template ErrorCode \
  ArrayStoragePimpl::overwrite_record_primitive< x > \
  (thread::Thread* context, ArrayOffset offset, x payload, uint16_t payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EXPLICIT_INSTANTIATION_GET_OV_IMPL);

#define EXPLICIT_INSTANTIATION_GET_INC_IMPL(x) template ErrorCode \
  ArrayStoragePimpl::increment_record< x > \
  (thread::Thread* context, ArrayOffset offset, x* value, uint16_t payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EXPLICIT_INSTANTIATION_GET_INC_IMPL);

#define EX_INC1S(x) template ErrorCode ArrayStorage::increment_record_oneshot< x > \
  (thread::Thread* context, ArrayOffset offset, x value, uint16_t payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EX_INC1S);

#define EX_INC1S_IMPL(x) template ErrorCode ArrayStoragePimpl::increment_record_oneshot< x > \
  (thread::Thread* context, ArrayOffset offset, x value, uint16_t payload_offset)
INSTANTIATE_ALL_NUMERIC_TYPES(EX_INC1S_IMPL);
// @endcond

}  // namespace array
}  // namespace storage
}  // namespace foedus