libfoedus-core
FOEDUS Core Library
array_composer_impl.cpp
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 #include "foedus/storage/array/array_composer_impl.hpp"
20 #include <glog/logging.h>
21 
22 #include <algorithm>
23 #include <cstring>
24 #include <ostream>
25 #include <string>
26 #include <vector>
27 
28 #include "foedus/engine.hpp"
49 
50 namespace foedus {
51 namespace storage {
52 namespace array {
53 
59 ArrayComposer::ArrayComposer(Composer* parent)
60  : engine_(parent->get_engine()),
61  storage_id_(parent->get_storage_id()),
62  storage_(engine_, storage_id_) {
63  ASSERT_ND(storage_.exists());
64 }
65 
66 ErrorStack ArrayComposer::compose(const Composer::ComposeArguments& args) {
67  VLOG(0) << to_string() << " composing with " << args.log_streams_count_ << " streams.";
68  debugging::StopWatch stop_watch;
69 
70  snapshot::MergeSort merge_sort(
71  storage_id_,
72  kArrayStorage,
73  args.base_epoch_,
74  args.log_streams_,
75  args.log_streams_count_,
76  kMaxLevels,
77  args.work_memory_);
78  CHECK_ERROR(merge_sort.initialize());
79 
80  ArrayComposeContext context(
81  engine_,
82  &merge_sort,
83  args.snapshot_writer_,
84  args.previous_snapshot_files_,
85  args.root_info_page_);
86  CHECK_ERROR(context.execute());
87 
88  CHECK_ERROR(merge_sort.uninitialize()); // no need for scoped release. its destructor is safe.
89 
90  stop_watch.stop();
91  LOG(INFO) << to_string() << " done in " << stop_watch.elapsed_ms() << "ms.";
92  return kRetOk;
93 }
94 
95 ErrorStack ArrayComposer::construct_root(const Composer::ConstructRootArguments& args) {
96  // compose() created root_info_pages that contain pointers to fill in the root page,
97  // so we just find the non-zero entries and copy them to the root page.
98  uint8_t levels = storage_.get_levels();
99  uint16_t payload_size = storage_.get_payload_size();
100  snapshot::SnapshotId new_snapshot_id = args.snapshot_writer_->get_snapshot_id();
101  Epoch system_initial_epoch = engine_->get_savepoint_manager()->get_initial_durable_epoch();
102  if (levels == 1U) {
103  // if it's single-page array, we have already created the root page in compose().
104  ASSERT_ND(args.root_info_pages_count_ == 1U);
105  const ArrayRootInfoPage* casted
106  = reinterpret_cast<const ArrayRootInfoPage*>(args.root_info_pages_[0]);
107  ASSERT_ND(casted->pointers_[0] != 0);
108  *args.new_root_page_pointer_ = casted->pointers_[0];
109 
110  // and we have already installed it, right?
111  ASSERT_ND(storage_.get_control_block()->meta_.root_snapshot_page_id_
112  == casted->pointers_[0]);
113  ASSERT_ND(storage_.get_control_block()->root_page_pointer_.snapshot_pointer_
114  == casted->pointers_[0]);
115  } else {
116  ArrayPage* root_page = reinterpret_cast<ArrayPage*>(args.snapshot_writer_->get_page_base());
117  SnapshotPagePointer page_id = storage_.get_metadata()->root_snapshot_page_id_;
118  SnapshotPagePointer new_page_id = args.snapshot_writer_->get_next_page_id();
119  *args.new_root_page_pointer_ = new_page_id;
120 
121  uint64_t root_interval = LookupRouteFinder(levels, payload_size).get_records_in_leaf();
122  for (uint8_t level = 1; level < levels; ++level) {
123  root_interval *= kInteriorFanout;
124  }
125  ArrayRange range(0, root_interval, storage_.get_array_size());
126  if (page_id != 0) {
127  WRAP_ERROR_CODE(args.previous_snapshot_files_->read_page(page_id, root_page));
128  ASSERT_ND(root_page->header().storage_id_ == storage_id_);
129  ASSERT_ND(root_page->header().page_id_ == page_id);
130  ASSERT_ND(root_page->get_array_range() == range);
131  root_page->header().page_id_ = new_page_id;
132  } else {
133  root_page->initialize_snapshot_page(
134  system_initial_epoch,
135  storage_id_,
136  new_page_id,
137  payload_size,
138  levels - 1,
139  range);
140  }
141 
142  uint64_t child_interval = root_interval / kInteriorFanout;
143  uint16_t root_children = assorted::int_div_ceil(storage_.get_array_size(), child_interval);
144 
145  // overwrite pointers with root_info_pages.
146  for (uint32_t i = 0; i < args.root_info_pages_count_; ++i) {
147  const ArrayRootInfoPage* casted
148  = reinterpret_cast<const ArrayRootInfoPage*>(args.root_info_pages_[i]);
149  for (uint16_t j = 0; j < root_children; ++j) {
150  SnapshotPagePointer pointer = casted->pointers_[j];
151  if (pointer != 0) {
152  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(pointer) == new_snapshot_id);
153  DualPagePointer& record = root_page->get_interior_record(j);
154  // partitioning has no overlap, so this must be the only overwriting pointer
155  ASSERT_ND(record.snapshot_pointer_ == 0 ||
156  extract_snapshot_id_from_snapshot_pointer(record.snapshot_pointer_)
157  != new_snapshot_id);
158  record.snapshot_pointer_ = pointer;
159  }
160  }
161  for (uint16_t j = root_children; j < kInteriorFanout; ++j) {
162  ASSERT_ND(casted->pointers_[j] == 0);
163  }
164  }
165 
166  // even in initial snapshot, all pointers must be set because we create empty pages
167  // even if some sub-tree receives no logs.
168  for (uint16_t j = 0; j < root_children; ++j) {
169  ASSERT_ND(root_page->get_interior_record(j).snapshot_pointer_ != 0);
170  }
171 
172  WRAP_ERROR_CODE(args.snapshot_writer_->dump_pages(0, 1));
173  ASSERT_ND(args.snapshot_writer_->get_next_page_id() == new_page_id + 1ULL);
174  // AFTER writing out the root page, install the pointer to new root page
175  storage_.get_control_block()->root_page_pointer_.snapshot_pointer_ = new_page_id;
176  storage_.get_control_block()->meta_.root_snapshot_page_id_ = new_page_id;
177  }
178  return kRetOk;
179 }
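// Illustrative sketch of how construct_root() merges per-node outputs (hypothetical
// pointer values p0..p3): partitioning assigns disjoint subtrees, so at most one
// composer emits a non-zero pointer per child slot. With two nodes and
// root_children = 4,
//   node 0's root_info_page: {p0, 0, p2, 0}
//   node 1's root_info_page: {0, p1, 0, p3}
// and the overwrite loop above overlays them into {p0, p1, p2, p3} in the root page.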
180 
181 
182 std::string ArrayComposer::to_string() const {
183  return std::string("ArrayComposer-") + std::to_string(storage_id_);
184 }
185 
191 ArrayComposeContext::ArrayComposeContext(
192  Engine* engine,
193  snapshot::MergeSort* merge_sort,
194  snapshot::SnapshotWriter* snapshot_writer,
195  cache::SnapshotFileSet* previous_snapshot_files,
196  Page* root_info_page)
197  : engine_(engine),
198  merge_sort_(merge_sort),
199  system_initial_epoch_(engine->get_savepoint_manager()->get_initial_durable_epoch()),
200  storage_id_(merge_sort_->get_storage_id()),
201  snapshot_id_(snapshot_writer->get_snapshot_id()),
202  storage_(engine, storage_id_),
203  snapshot_writer_(snapshot_writer),
204  previous_snapshot_files_(previous_snapshot_files),
205  root_info_page_(reinterpret_cast<ArrayRootInfoPage*>(root_info_page)),
206  payload_size_(storage_.get_payload_size()),
207  levels_(storage_.get_levels()),
208  previous_root_page_pointer_(storage_.get_metadata()->root_snapshot_page_id_) {
209  LookupRouteFinder route_finder(levels_, payload_size_);
210  offset_intervals_[0] = route_finder.get_records_in_leaf();
211  for (uint8_t level = 1; level < levels_; ++level) {
212  offset_intervals_[level] = offset_intervals_[level - 1] * kInteriorFanout;
213  }
214  std::memset(cur_path_, 0, sizeof(cur_path_));
215 
216  allocated_pages_ = 0;
217  allocated_intermediates_ = 0;
218  page_base_ = reinterpret_cast<ArrayPage*>(snapshot_writer_->get_page_base());
219  max_pages_ = snapshot_writer_->get_page_size();
220  intermediate_base_ = reinterpret_cast<ArrayPage*>(snapshot_writer_->get_intermediate_base());
221  max_intermediates_ = snapshot_writer_->get_intermediate_size();
222 
223  PartitionerMetadata* metadata = PartitionerMetadata::get_metadata(engine_, storage_id_);
224  ASSERT_ND(metadata->valid_); // otherwise composer invoked without partitioner. maybe testcase?
225  partitioning_data_ = reinterpret_cast<ArrayPartitionerData*>(metadata->locate_data(engine_));
226  ASSERT_ND(levels_ == partitioning_data_->array_levels_);
227  ASSERT_ND(storage_.get_array_size() == partitioning_data_->array_size_);
228 }
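// Illustrative arithmetic for offset_intervals_ (hypothetical numbers): if a leaf
// holds 100 records and kInteriorFanout were 200, the loop above computes
//   offset_intervals_[0] = 100          // offsets covered by one leaf page
//   offset_intervals_[1] = 100 * 200    // offsets covered by one level-1 page
// so cur_path_[level] always spans offset_intervals_[level] consecutive offsets.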
229 
230 ErrorStack ArrayComposeContext::execute() {
231  std::memset(root_info_page_, 0, kPageSize);
232  root_info_page_->header_.storage_id_ = storage_id_;
233 
234  if (levels_ <= 1U) {
235  // this storage has only one page. This is very special and trivial.
236  // we process this case separately.
237  return execute_single_level_array();
238  }
239 
240  // This implementation is batch-based to make it significantly more efficient.
241  // We receive a fully-sorted/integrated stream of logs from merge_sort_, and merge them with
242  // previous snapshot on page-basis. Most pages have many logs, so this achieves a tight loop
243  // without the expensive cost of switching pages per log.
244  bool processed_any = false;
245  while (true) {
246  CHECK_ERROR(merge_sort_->next_batch());
247  uint64_t count = merge_sort_->get_current_count();
248  if (count == 0 && merge_sort_->is_ended_all()) {
249  break;
250  }
251  const snapshot::MergeSort::SortEntry* sort_entries = merge_sort_->get_sort_entries();
252  if (!processed_any) {
253  // this is the first log. let's initialize cur_path to this log.
254  processed_any = true;
255  ArrayOffset initial_offset = sort_entries[0].get_key();
256 
257  CHECK_ERROR(initialize(initial_offset));
258  }
259 
260  uint64_t cur = 0;
261  while (cur < count) {
262  const ArrayCommonUpdateLogType* head = reinterpret_cast<const ArrayCommonUpdateLogType*>(
263  merge_sort_->resolve_sort_position(cur));
264  ArrayOffset head_offset = head->offset_;
265  ASSERT_ND(head_offset == sort_entries[cur].get_key());
266  // switch to a page containing this offset
267  WRAP_ERROR_CODE(update_cur_path(head_offset));
268  ArrayRange page_range = cur_path_[0]->get_array_range();
269  ASSERT_ND(page_range.contains(head_offset));
270 
271  // grab a range of logs that are in the same page.
272  uint64_t next;
273  for (next = cur + 1U; LIKELY(next < count); ++next) {
274  // this check uses sort_entries which are nicely contiguous.
275  ArrayOffset offset = sort_entries[next].get_key();
276  ASSERT_ND(offset >= page_range.begin_);
277  if (UNLIKELY(offset >= page_range.end_)) {
278  break;
279  }
280  }
281 
282  apply_batch(cur, next);
283  cur = next;
284  }
285  ASSERT_ND(cur == count);
286  }
287 
288  if (processed_any) {
289  CHECK_ERROR(finalize());
290  } else {
291  LOG(ERROR) << "wtf? no logs? storage-" << storage_id_;
292  }
293 
294 #ifndef NDEBUG
295  uint16_t children = get_root_children();
296  PartitionId partition = snapshot_writer_->get_numa_node();
297  ASSERT_ND(partitioning_data_);
298  for (uint16_t i = 0; i < children; ++i) {
299  if (!partitioning_data_->partitionable_ || partitioning_data_->bucket_owners_[i] == partition) {
300  ASSERT_ND(root_info_page_->pointers_[i] != 0);
301  } else {
302  ASSERT_ND((!is_initial_snapshot() && root_info_page_->pointers_[i] != 0)
303  || (is_initial_snapshot() && root_info_page_->pointers_[i] == 0));
304  }
305  }
306  for (uint16_t i = children; i < kInteriorFanout; ++i) {
307  ASSERT_ND(root_info_page_->pointers_[i] == 0);
308  }
309 #endif // NDEBUG
310 
311  return kRetOk;
312 }
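// A minimal trace of the batching in execute() (hypothetical keys): with sorted
// keys {3, 42, 42, 99, 150} and a first leaf covering [0, 100), the inner scan
// stops at the key 150, so apply_batch(0, 4) handles four logs with a single
// update_cur_path() call; the next iteration switches to the leaf covering 150.
// Page switches thus happen once per page, not once per log.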
313 
314 void ArrayComposeContext::apply_batch(uint64_t cur, uint64_t next) {
315  const uint16_t kFetchSize = 8;
316  const log::RecordLogType* logs[kFetchSize];
317  ArrayPage* leaf = cur_path_[0];
318  ArrayRange range = leaf->get_array_range();
319  while (cur < next) {
320  uint16_t desired = std::min<uint16_t>(kFetchSize, next - cur);
321  uint16_t fetched = merge_sort_->fetch_logs(cur, desired, logs);
322  for (uint16_t i = 0; i < kFetchSize && LIKELY(i < fetched); ++i) {
323  const ArrayCommonUpdateLogType* log
324  = reinterpret_cast<const ArrayCommonUpdateLogType*>(logs[i]);
325  ASSERT_ND(range.contains(log->offset_));
326  uint16_t index = log->offset_ - range.begin_;
327  Record* record = leaf->get_leaf_record(index, payload_size_);
328  if (log->header_.get_type() == log::kLogCodeArrayOverwrite) {
329  const ArrayOverwriteLogType* casted
330  = reinterpret_cast<const ArrayOverwriteLogType*>(log);
331  casted->apply_record(nullptr, storage_id_, &record->owner_id_, record->payload_);
332  } else {
333  ASSERT_ND(log->header_.get_type() == log::kLogCodeArrayIncrement);
334  const ArrayIncrementLogType* casted
335  = reinterpret_cast<const ArrayIncrementLogType*>(log);
336  casted->apply_record(nullptr, storage_id_, &record->owner_id_, record->payload_);
337  }
338  }
339  cur += fetched;
340  ASSERT_ND(cur <= next);
341  }
342 }
343 
344 ErrorStack ArrayComposeContext::execute_single_level_array() {
345  // no page-switch in this case
346  ArrayRange range(0, storage_.get_array_size());
347  // single-page array. root is a leaf page.
348  cur_path_[0] = page_base_;
349  SnapshotPagePointer page_id = snapshot_writer_->get_next_page_id();
350  ASSERT_ND(allocated_pages_ == 0);
351  allocated_pages_ = 1;
352  WRAP_ERROR_CODE(read_or_init_page(previous_root_page_pointer_, page_id, 0, range, cur_path_[0]));
353 
354  while (true) {
355  CHECK_ERROR(merge_sort_->next_batch());
356  uint64_t count = merge_sort_->get_current_count();
357  if (count == 0 && merge_sort_->is_ended_all()) {
358  break;
359  }
360 
361  apply_batch(0, count);
362  }
363 
364  ASSERT_ND(allocated_pages_ == 1U);
365  ASSERT_ND(allocated_intermediates_ == 0);
366  ASSERT_ND(cur_path_[0] == page_base_);
367  ASSERT_ND(page_id == page_base_[0].header().page_id_);
368  ASSERT_ND(snapshot_writer_->get_next_page_id() == page_id);
369  WRAP_ERROR_CODE(snapshot_writer_->dump_pages(0, 1));
370  root_info_page_->pointers_[0] = page_id;
371 
372  // further, we install the only snapshot pointer now.
373  storage_.get_control_block()->meta_.root_snapshot_page_id_ = page_id;
374  storage_.get_control_block()->root_page_pointer_.snapshot_pointer_ = page_id;
375  return kRetOk;
376 }
377 
378 uint16_t ArrayComposeContext::get_root_children() const {
379  uint64_t child_interval = offset_intervals_[levels_ - 2U];
380  return assorted::int_div_ceil(storage_.get_array_size(), child_interval);
381 }
382 
383 ErrorStack ArrayComposeContext::finalize() {
384  ASSERT_ND(levels_ > 1U);
385 
386  ArrayRange last_range = cur_path_[0]->get_array_range();
387  if (is_initial_snapshot() && last_range.end_ < storage_.get_array_size()) {
388  VLOG(0) << "Need to fill out empty pages in initial snapshot of array-" << storage_id_
389  << ", from " << last_range.end_ << " to the end of array";
390  WRAP_ERROR_CODE(create_empty_pages(last_range.end_, storage_.get_array_size()));
391  }
392 
393  // flush the main buffer. now we finalized all leaf pages
394  if (allocated_pages_ > 0) {
395  WRAP_ERROR_CODE(dump_leaf_pages());
396  ASSERT_ND(allocated_pages_ == 0);
397  }
398 
399  // intermediate pages are different animals.
400  // we store them in a separate buffer, and only now can we finalize their page IDs.
401  // Until now, we used the relative index in the intermediate buffer as the page ID,
402  // stored in the page-ID header. Now let's convert all of them to final page IDs.
403  ArrayPage* root_page = intermediate_base_;
404  ASSERT_ND(root_page == cur_path_[levels_ - 1]);
405  ASSERT_ND(root_page->get_level() == levels_ - 1);
406  ASSERT_ND(root_page->header().page_id_ == 0); // this is the only page that has page-id 0
407 
408  // base_pointer + offset in intermediate buffer will be the new page ID.
409  const SnapshotPagePointer base_pointer = snapshot_writer_->get_next_page_id();
410  root_page->header().page_id_ = base_pointer;
411  for (uint32_t i = 1; i < allocated_intermediates_; ++i) {
412  SnapshotPagePointer new_page_id = base_pointer + i;
413  ArrayPage* page = intermediate_base_ + i;
414  ASSERT_ND(page->header().page_id_ == i);
415  ASSERT_ND(page->get_level() > 0);
416  ASSERT_ND(page->get_level() < levels_ - 1U);
417  page->header().page_id_ = new_page_id;
418  if (page->get_level() > 1U) {
419  // also updates pointers to new children.
420  // we can tell whether the pointer is created during this snapshot by seeing the page ID.
421  // we used the relative index (1~allocated_intermediates_-1) as pointer, which means
422  // they have 0 (kNullSnapshotId) as snapshot ID. Thus, if there is a non-null pointer whose
423  // snapshot-Id is 0, that's a pointer we have created here.
424  for (uint16_t j = 0; j < kInteriorFanout; ++j) {
425  DualPagePointer& pointer = page->get_interior_record(j);
426  ASSERT_ND(pointer.volatile_pointer_.is_null());
427  SnapshotPagePointer page_id = pointer.snapshot_pointer_;
428  snapshot::SnapshotId snapshot_id = extract_snapshot_id_from_snapshot_pointer(page_id);
429  ASSERT_ND(snapshot_id != snapshot_id_);
430  if (page_id != 0 && snapshot_id == snapshot::kNullSnapshotId) {
432  ASSERT_ND(page_id < allocated_intermediates_);
433  pointer.snapshot_pointer_ = base_pointer + page_id;
434  ASSERT_ND(verify_snapshot_pointer(pointer.snapshot_pointer_));
435  }
436  }
437  }
438  }
439 
440  // we also write out the root page, but we don't use it; we just put equivalent information
441  // into root_info_page. construct_root() will combine all composers' outputs later.
442  snapshot_writer_->dump_intermediates(0, allocated_intermediates_);
443 
444  const uint16_t root_children = get_root_children();
445  const PartitionId partition = snapshot_writer_->get_numa_node();
446  ASSERT_ND(partitioning_data_);
447  for (uint16_t j = 0; j < root_children; ++j) {
448  DualPagePointer& pointer = root_page->get_interior_record(j);
449  ASSERT_ND(pointer.volatile_pointer_.is_null());
450  SnapshotPagePointer page_id = pointer.snapshot_pointer_;
451  snapshot::SnapshotId snapshot_id = extract_snapshot_id_from_snapshot_pointer(page_id);
452 
453  if (!partitioning_data_->partitionable_ || partitioning_data_->bucket_owners_[j] == partition) {
454  ASSERT_ND(page_id != 0);
455  // okay, this is a page this node is responsible for.
456  if (snapshot_id == snapshot_id_) {
457  // we already have snapshot pointers because it points to leaf pages. (2 level array)
458  // the pointer is already valid as a snapshot pointer
459  ASSERT_ND(extract_numa_node_from_snapshot_pointer(page_id)
460  == snapshot_writer_->get_numa_node());
461  ASSERT_ND(root_page->get_level() == 1U);
462  ASSERT_ND(verify_snapshot_pointer(pointer.snapshot_pointer_));
463  } else if (snapshot_id == snapshot::kNullSnapshotId) {
464  // intermediate pages created in this snapshot.
465  // just like other pages adjusted above, it's an offset from intermediate_base_
466  ASSERT_ND(root_page->get_level() > 1U);
468  ASSERT_ND(page_id < allocated_intermediates_);
469  pointer.snapshot_pointer_ = base_pointer + page_id;
470  ASSERT_ND(verify_snapshot_pointer(pointer.snapshot_pointer_));
471  } else {
472  // then, it's a page in previous snapshots we didn't modify
473  ASSERT_ND(!is_initial_snapshot());
474  ASSERT_ND(snapshot_id != snapshot_id_);
475  }
476  root_info_page_->pointers_[j] = pointer.snapshot_pointer_;
477  } else {
478  ASSERT_ND((!is_initial_snapshot() && page_id != 0 && snapshot_id != snapshot_id_)
479  || (is_initial_snapshot() && page_id == 0));
480  }
481  }
482  for (uint16_t j = root_children; j < kInteriorFanout; ++j) {
483  ASSERT_ND(root_page->get_interior_record(j).is_both_null());
484  }
485 
486 
487  // AFTER durably writing out the intermediate pages to the file, we install snapshot pointers.
488  uint64_t installed_count = 0;
489  CHECK_ERROR(install_snapshot_pointers(base_pointer, &installed_count));
490 
491  return kRetOk;
492 }
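// Worked example of the page-ID fix-up above (hypothetical count): with
// allocated_intermediates_ = 3, the pages were built with temporary IDs 0, 1, 2,
// whose snapshot-ID bits are kNullSnapshotId. Once base_pointer is known, page i
// gets the final ID base_pointer + i, and every non-zero interior pointer whose
// snapshot-ID bits are kNullSnapshotId is rewritten the same way.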
493 
494 ErrorStack ArrayComposeContext::initialize(ArrayOffset initial_offset) {
495  ASSERT_ND(allocated_intermediates_ == 0);
496  ASSERT_ND(allocated_pages_ == 0);
497  ASSERT_ND(levels_ > 1U);
498 
499  // First, load or create the root page.
500  CHECK_ERROR(init_root_page());
501 
502  // If an initial snapshot, we have to create empty pages first.
503  if (is_initial_snapshot()) {
504  ArrayRange leaf_range = to_leaf_range(initial_offset);
505  VLOG(0) << "Need to fill out empty pages in initial snapshot of array-" << storage_id_
506  << ", upto " << leaf_range.end_;
507  WRAP_ERROR_CODE(create_empty_pages(0, leaf_range.end_));
508  ASSERT_ND(cur_path_[0]);
509  ASSERT_ND(cur_path_[0]->get_array_range() == leaf_range);
510  }
511  return kRetOk;
512 }
513 
514 ErrorStack ArrayComposeContext::init_root_page() {
515  uint8_t level = levels_ - 1U;
516  ASSERT_ND(level > 0);
517  ArrayPage* page = intermediate_base_;
518  ArrayRange range(0, storage_.get_array_size());
519  ASSERT_ND(allocated_intermediates_ == 0);
520  allocated_intermediates_ = 1;
521 
522  WRAP_ERROR_CODE(read_or_init_page(previous_root_page_pointer_, 0, level, range, page));
523  cur_path_[level] = page;
524  return kRetOk;
525 }
526 
527 ErrorCode ArrayComposeContext::create_empty_pages(ArrayOffset from, ArrayOffset to) {
528  ASSERT_ND(is_initial_snapshot()); // this must be called only at initial snapshot
529  ASSERT_ND(levels_ > 1U); // single-page array is handled separately, and no need for this func.
530  ASSERT_ND(from < to);
531  ASSERT_ND(to <= storage_.get_array_size());
532  ArrayPage* page = cur_path_[levels_ - 1U];
533  ASSERT_ND(page);
534 
535  // This composer only takes care of partition-<node>. We must fill empty pages only in
536  // the assigned subtrees.
537  PartitionId partition = snapshot_writer_->get_numa_node();
538  ASSERT_ND(partitioning_data_);
539  ASSERT_ND(!partitioning_data_->partitionable_
540  || partitioning_data_->bucket_size_ == offset_intervals_[levels_ - 2U]);
541 
542  // basically same flow as create_empty_pages_recurse, but we skip other partitions.
543  // in lower levels, we don't have to worry about partitioning. The subtrees are solely ours.
544  const uint8_t child_level = levels_ - 2U;
545  const uint64_t interval = offset_intervals_[child_level];
546  const uint16_t first_child = from / interval; // floor
547 
548  // the following is a ceil because the right-most child might be partial.
549  uint16_t children = assorted::int_div_ceil(storage_.get_array_size(), interval);
550  if (interval * children > to) {
551  children = assorted::int_div_ceil(to, interval);
552  }
553  ASSERT_ND(children <= kInteriorFanout);
554 
555  for (uint16_t i = first_child; i < children; ++i) {
556  if (partitioning_data_->partitionable_ && partitioning_data_->bucket_owners_[i] != partition) {
557  continue;
558  }
559 
560  // are we filling out empty pages in order?
561  ASSERT_ND(i == first_child || page->get_interior_record(i).snapshot_pointer_ == 0);
562 
563  ArrayRange child_range(i * interval, (i + 1U) * interval, storage_.get_array_size());
564  if (page->get_interior_record(i).snapshot_pointer_ == 0) {
565  if (child_level > 0) {
566  CHECK_ERROR_CODE(create_empty_intermediate_page(page, i, child_range));
567  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(
568  cur_path_[child_level]->header().page_id_) == snapshot::kNullSnapshotId);
569  } else {
570  CHECK_ERROR_CODE(create_empty_leaf_page(page, i, child_range));
571  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(
572  cur_path_[child_level]->header().page_id_) == snapshot_id_);
573  }
574  }
575 
576  ASSERT_ND(cur_path_[child_level]);
577  page->get_interior_record(i).snapshot_pointer_ = cur_path_[child_level]->header().page_id_;
578  if (child_level > 0) {
579  CHECK_ERROR_CODE(create_empty_pages_recurse(from, to, cur_path_[child_level]));
580  }
581  }
582  return kErrorCodeOk;
583 }
584 
585 ErrorCode ArrayComposeContext::create_empty_pages_recurse(
586  ArrayOffset from,
587  ArrayOffset to,
588  ArrayPage* page) {
589  const uint8_t cur_level = page->get_level();
590  ASSERT_ND(cur_level > 0);
591  ArrayRange page_range = page->get_array_range();
592  ASSERT_ND(page_range.begin_ < to);
593  ASSERT_ND(from < page_range.end_);
594 
595  const uint8_t child_level = cur_level - 1U;
596  const uint64_t interval = offset_intervals_[child_level];
597  ASSERT_ND(page_range.end_ == page_range.begin_ + interval * kInteriorFanout
598  || (page_range.begin_ + interval * kInteriorFanout > storage_.get_array_size()
599  && page_range.end_ == storage_.get_array_size()));
600 
601  uint16_t first_child = 0;
602  if (from > page_range.begin_) {
603  ASSERT_ND(from < page_range.begin_ + interval * kInteriorFanout);
604  first_child = (from - page_range.begin_) / interval; // floor. left-most might be partial.
605  }
606 
607  // the following is a ceil because the right-most child might be partial.
608  uint16_t children = assorted::int_div_ceil(page_range.end_ - page_range.begin_, interval);
609  if (page_range.begin_ + interval * children > to) {
610  children = assorted::int_div_ceil(to - page_range.begin_, interval);
611  }
612  ASSERT_ND(children <= kInteriorFanout);
613 
614  // we assume this method is called in order, thus a null pointer means a page we should fill out.
615  for (uint16_t i = first_child; i < children; ++i) {
616  ASSERT_ND(i == first_child || page->get_interior_record(i).snapshot_pointer_ == 0);
617  ArrayRange child_range(
618  page_range.begin_ + i * interval,
619  page_range.begin_ + (i + 1U) * interval,
620  page_range.end_);
621 
622  if (page->get_interior_record(i).snapshot_pointer_ == 0) {
623  if (child_level > 0) {
624  CHECK_ERROR_CODE(create_empty_intermediate_page(page, i, child_range));
625  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(
626  cur_path_[child_level]->header().page_id_) == snapshot::kNullSnapshotId);
627  } else {
628  CHECK_ERROR_CODE(create_empty_leaf_page(page, i, child_range));
629  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(
630  cur_path_[child_level]->header().page_id_) == snapshot_id_);
631  }
632  }
633 
634  ASSERT_ND(cur_path_[child_level]);
635  page->get_interior_record(i).snapshot_pointer_ = cur_path_[child_level]->header().page_id_;
636  if (child_level > 0) {
637  CHECK_ERROR_CODE(create_empty_pages_recurse(from, to, cur_path_[child_level]));
638  }
639  }
640 
641  return kErrorCodeOk;
642 }
643 
644 ErrorCode ArrayComposeContext::dump_leaf_pages() {
645  CHECK_ERROR_CODE(snapshot_writer_->dump_pages(0, allocated_pages_));
646  ASSERT_ND(snapshot_writer_->get_next_page_id()
647  == page_base_[0].header().page_id_ + allocated_pages_);
648  ASSERT_ND(snapshot_writer_->get_next_page_id()
649  == page_base_[allocated_pages_ - 1].header().page_id_ + 1ULL);
650  allocated_pages_ = 0;
651  return kErrorCodeOk;
652 }
653 
654 ErrorCode ArrayComposeContext::create_empty_intermediate_page(
655  ArrayPage* parent,
656  uint16_t index,
657  ArrayRange range) {
658  ASSERT_ND(parent->get_level() > 1U);
659  DualPagePointer& pointer = parent->get_interior_record(index);
660  ASSERT_ND(pointer.is_both_null());
661  uint8_t level = parent->get_level() - 1U;
662  CHECK_ERROR_CODE(expand_intermediate_pool_if_needed());
663  ArrayPage* page = intermediate_base_ + allocated_intermediates_;
664  SnapshotPagePointer new_page_id = allocated_intermediates_;
665  ++allocated_intermediates_;
666  CHECK_ERROR_CODE(read_or_init_page(0, new_page_id, level, range, page));
667 
668  cur_path_[level] = page;
669  return kErrorCodeOk;
670 }
671 
672 ErrorCode ArrayComposeContext::create_empty_leaf_page(
673  ArrayPage* parent,
674  uint16_t index,
675  ArrayRange range) {
676  ASSERT_ND(parent->get_level() == 1U);
677  DualPagePointer& pointer = parent->get_interior_record(index);
678  ASSERT_ND(pointer.is_both_null());
679  if (allocated_pages_ >= max_pages_) {
680  CHECK_ERROR_CODE(dump_leaf_pages());
681  ASSERT_ND(allocated_pages_ == 0);
682  }
683 
684  // remember, we can finalize the page ID of leaf pages at this point
685  ArrayPage* page = page_base_ + allocated_pages_;
686  SnapshotPagePointer new_page_id = snapshot_writer_->get_next_page_id() + allocated_pages_;
687  ASSERT_ND(verify_snapshot_pointer(new_page_id));
688  ++allocated_pages_;
689  CHECK_ERROR_CODE(read_or_init_page(0, new_page_id, 0, range, page));
690  pointer.snapshot_pointer_ = new_page_id;
691 
692  cur_path_[0] = page;
693  return kErrorCodeOk;
694 }
695 
696 inline ErrorCode ArrayComposeContext::expand_intermediate_pool_if_needed() {
697  ASSERT_ND(allocated_intermediates_ <= max_intermediates_);
698  if (UNLIKELY(allocated_intermediates_ == max_intermediates_)) {
699  LOG(INFO) << "Automatically expanding intermediate_pool. This should be a rare event";
700  uint32_t required = allocated_intermediates_ + 1U;
701  CHECK_ERROR_CODE(snapshot_writer_->expand_intermediate_memory(required, true));
702  intermediate_base_ = reinterpret_cast<ArrayPage*>(snapshot_writer_->get_intermediate_base());
703  max_intermediates_ = snapshot_writer_->get_intermediate_size();
704  }
705  return kErrorCodeOk;
706 }
707 
708 ErrorCode ArrayComposeContext::update_cur_path(ArrayOffset next_offset) {
709  ASSERT_ND(levels_ > 1U);
710  ASSERT_ND(cur_path_[0] == nullptr || next_offset >= cur_path_[0]->get_array_range().begin_);
711  if (cur_path_[0] != nullptr && next_offset < cur_path_[0]->get_array_range().end_) {
712  // already in the page. this usually doesn't happen as we batch-apply as many as possible,
713  // but might happen when logs for the same page are on a boundary of windows.
714  return kErrorCodeOk;
715  }
716 
717  ArrayRange next_range = to_leaf_range(next_offset);
718  ArrayOffset jump_from = cur_path_[0] == nullptr ? 0 : cur_path_[0]->get_array_range().end_;
719  ArrayOffset jump_to = next_range.begin_;
720  if (jump_to > jump_from && is_initial_snapshot()) {
721  VLOG(0) << "Need to fill out empty pages in initial snapshot of array-" << storage_id_
722  << ", from " << jump_from << " to " << jump_to;
723  CHECK_ERROR_CODE(create_empty_pages(jump_from, jump_to));
724  }
725 
726  // then switch pages. we might have to switch parent pages, too.
727  ASSERT_ND(cur_path_[levels_ - 1U]->get_array_range().contains(next_offset));
728  for (uint8_t level = levels_ - 2U; level < kMaxLevels; --level) { // note, unsigned.
729  // we don't care changes in route[0] (record ordinals in page)
730  if (cur_path_[level] != nullptr && cur_path_[level]->get_array_range().contains(next_offset)) {
731  // skip non-changed path. most likely only the leaf has changed.
732  continue;
733  }
734 
735  // page switched! we have to allocate a new page and point to it.
736  ArrayPage* parent = cur_path_[level + 1U];
737  ArrayRange parent_range = parent->get_array_range();
738  ASSERT_ND(parent_range.contains(next_offset));
739  uint64_t interval = offset_intervals_[level];
740  uint16_t i = (next_offset - parent_range.begin_) / interval;
741  ASSERT_ND(i < kInteriorFanout);
742 
743  ArrayRange child_range(
744  parent_range.begin_ + i * interval,
745  parent_range.begin_ + (i + 1U) * interval,
746  parent_range.end_);
747 
748  DualPagePointer& pointer = parent->get_interior_record(i);
749  ASSERT_ND(pointer.volatile_pointer_.is_null());
750  SnapshotPagePointer old_page_id = pointer.snapshot_pointer_;
751  ASSERT_ND((!is_initial_snapshot() && old_page_id != 0)
752  || (is_initial_snapshot() && old_page_id == 0));
753 
754  ArrayPage* page;
755  SnapshotPagePointer new_page_id;
756  if (level > 0U) {
757  // we switched an intermediate page
758  CHECK_ERROR_CODE(expand_intermediate_pool_if_needed());
759  page = intermediate_base_ + allocated_intermediates_;
760  new_page_id = allocated_intermediates_;
761  ++allocated_intermediates_;
762  CHECK_ERROR_CODE(read_or_init_page(old_page_id, new_page_id, level, child_range, page));
763  } else {
764  // we switched a leaf page. in this case, we might have to flush the buffer
765  if (allocated_pages_ >= max_pages_) {
766  CHECK_ERROR_CODE(dump_leaf_pages());
767  ASSERT_ND(allocated_pages_ == 0);
768  }
769 
770  // remember, we can finalize the page ID of leaf pages at this point
771  page = page_base_ + allocated_pages_;
772  new_page_id = snapshot_writer_->get_next_page_id() + allocated_pages_;
773  ASSERT_ND(verify_snapshot_pointer(new_page_id));
774  ++allocated_pages_;
775  CHECK_ERROR_CODE(read_or_init_page(old_page_id, new_page_id, level, child_range, page));
776  }
777  ASSERT_ND(page->header().page_id_ == new_page_id);
778  pointer.snapshot_pointer_ = new_page_id;
779  cur_path_[level] = page;
780  }
781  ASSERT_ND(verify_cur_path());
782  return kErrorCodeOk;
783 }
784 
785 inline ErrorCode ArrayComposeContext::read_or_init_page(
786  SnapshotPagePointer old_page_id,
787  SnapshotPagePointer new_page_id,
788  uint8_t level,
789  ArrayRange range,
790  ArrayPage* page) {
791  ASSERT_ND(new_page_id != 0 || level == levels_ - 1U);
792  if (old_page_id != 0) {
793  ASSERT_ND(!is_initial_snapshot());
794  CHECK_ERROR_CODE(previous_snapshot_files_->read_page(old_page_id, page));
795  ASSERT_ND(page->header().storage_id_ == storage_id_);
796  ASSERT_ND(page->header().page_id_ == old_page_id);
797  ASSERT_ND(page->get_level() == level);
798  ASSERT_ND(page->get_array_range() == range);
799  page->header().page_id_ = new_page_id;
800  } else {
801  ASSERT_ND(is_initial_snapshot());
802  page->initialize_snapshot_page(
803  system_initial_epoch_,
804  storage_id_,
805  new_page_id,
806  payload_size_,
807  level,
808  range);
809  }
810  return kErrorCodeOk;
811 }
812 
813 bool ArrayComposeContext::verify_cur_path() const {
814  for (uint8_t level = 0; level < kMaxLevels; ++level) {
815  if (level >= levels_) {
816  ASSERT_ND(cur_path_[level] == nullptr);
817  continue;
818  }
819  ASSERT_ND(cur_path_[level]);
820  ASSERT_ND(cur_path_[level]->get_level() == level);
821  ASSERT_ND(cur_path_[level]->get_storage_id() == storage_id_);
822  }
823  return true;
824 }
825 
826 bool ArrayComposeContext::verify_snapshot_pointer(SnapshotPagePointer pointer) {
827  ASSERT_ND(extract_local_page_id_from_snapshot_pointer(pointer) > 0);
828  if (!engine_->is_master()) {
829  ASSERT_ND(extract_numa_node_from_snapshot_pointer(pointer)
830  == snapshot_writer_->get_numa_node());
831  }
832  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(pointer)
833  == snapshot_writer_->get_snapshot_id());
834  return true;
835 }
836 
842 ErrorStack ArrayComposeContext::install_snapshot_pointers(
843  SnapshotPagePointer snapshot_base,
844  uint64_t* installed_count) const {
845  ASSERT_ND(levels_ > 1U); // no need to call this method in one-page array
846  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(snapshot_base) == snapshot_id_);
847 
848  *installed_count = 0;
849  VolatilePagePointer pointer = storage_.get_control_block()->root_page_pointer_.volatile_pointer_;
850  if (pointer.is_null()) {
851  VLOG(0) << "No volatile pages.. maybe while restart?";
852  return kRetOk;
853  }
854 
855  const memory::GlobalVolatilePageResolver& resolver
856  = engine_->get_memory_manager()->get_global_volatile_page_resolver();
857  ArrayPage* volatile_root = reinterpret_cast<ArrayPage*>(resolver.resolve_offset(pointer));
858 
859  // compared to masstree, array is much easier to install snapshot pointers because the
860  // shape of the tree is exactly same between volatile and snapshot.
861  // we just recurse with the corresponding snapshot and volatile pages.
862  debugging::StopWatch watch;
863  const ArrayPage* snapshot_root = intermediate_base_;
864  WRAP_ERROR_CODE(install_snapshot_pointers_recurse(
865  snapshot_base,
866  resolver,
867  snapshot_root,
868  volatile_root,
869  installed_count));
870  watch.stop();
871  VLOG(0) << "ArrayStorage-" << storage_id_ << " installed " << *installed_count << " pointers"
872  << " in " << watch.elapsed_ms() << "ms";
873  return kRetOk;
874 }
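// The correspondence exploited above: the volatile and snapshot trees have the
// same shape, so a snapshot intermediate page whose final ID is p still resides
// in memory at intermediate_base_ + (p - snapshot_base). That is how
// install_snapshot_pointers_recurse() below locates snapshot_next.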
875 
876 ErrorCode ArrayComposeContext::install_snapshot_pointers_recurse(
877  SnapshotPagePointer snapshot_base,
878  const memory::GlobalVolatilePageResolver& resolver,
879  const ArrayPage* snapshot_page,
880  ArrayPage* volatile_page,
881  uint64_t* installed_count) const {
882  ASSERT_ND(snapshot_page->get_array_range() == volatile_page->get_array_range());
883  ASSERT_ND(!snapshot_page->is_leaf());
884  ASSERT_ND(!volatile_page->is_leaf());
885  const bool needs_recursion = snapshot_page->get_level() > 1U;
886  for (uint16_t i = 0; i < kInteriorFanout; ++i) {
887  SnapshotPagePointer pointer = snapshot_page->get_interior_record(i).snapshot_pointer_;
888  if (pointer == 0) {
889  continue; // either this is right-most page or the range is not in this partition
890  }
891  snapshot::SnapshotId snapshot_id = extract_snapshot_id_from_snapshot_pointer(pointer);
892  ASSERT_ND(snapshot_id != snapshot::kNullSnapshotId);
893  if (snapshot_id != snapshot_id_) {
894  continue;
895  }
896  ASSERT_ND(extract_numa_node_from_snapshot_pointer(pointer)
897  == snapshot_writer_->get_numa_node());
898  DualPagePointer& target = volatile_page->get_interior_record(i);
899  target.snapshot_pointer_ = pointer;
900  ++(*installed_count);
901 
902  if (needs_recursion) {
903  ASSERT_ND(pointer > snapshot_base);
904  // if it has a volatile page, further recurse.
905  VolatilePagePointer volatile_pointer = target.volatile_pointer_;
906  if (!volatile_pointer.is_null()) {
907  ArrayPage* volatile_next
908  = reinterpret_cast<ArrayPage*>(resolver.resolve_offset(volatile_pointer));
909  uint64_t offset = pointer - snapshot_base;
910  const ArrayPage* snapshot_next = intermediate_base_ + offset;
911  CHECK_ERROR_CODE(install_snapshot_pointers_recurse(
912  snapshot_base,
913  resolver,
914  snapshot_next,
915  volatile_next,
916  installed_count));
917  }
918  }
919  }
920  return kErrorCodeOk;
921 }
922 
923 
929 Composer::DropResult ArrayComposer::drop_volatiles(const Composer::DropVolatilesArguments& args) {
930  Composer::DropResult result(args);
931  if (storage_.get_array_metadata()->keeps_all_volatile_pages()) {
932  LOG(INFO) << "Keep-all-volatile: Storage-" << storage_.get_name()
933  << " is configured to keep all volatile pages.";
934  result.dropped_all_ = false;
935  return result;
936  }
937 
938  DualPagePointer* root_pointer = &storage_.get_control_block()->root_page_pointer_;
939  ArrayPage* volatile_page = resolve_volatile(root_pointer->volatile_pointer_);
940  if (volatile_page == nullptr) {
941  LOG(INFO) << "No volatile root page. Probably while restart";
942  return result;
943  }
944 
945  // single-page array has only the root page. nothing to do here.
946  // we might drop the root page later, just like non-single-page cases.
947  if (volatile_page->is_leaf()) {
948  LOG(INFO) << "Single-page array skipped by " << to_string() << ".";
949  return result;
950  }
951 
952  // We iterate through all existing volatile pages to drop volatile pages of
953  // level-3 or deeper (if the storage has only 2 levels, we keep all of them).
954  // This "level-3 or deeper" is a configuration per storage.
955  // Even if a volatile page is deeper than that, we keep it if it or any of its
956  // descendants contains a newer modification (so we will probably keep the higher levels anyway).
957  for (uint16_t i = 0; i < kInteriorFanout; ++i) {
958  DualPagePointer& child_pointer = volatile_page->get_interior_record(i);
959  if (!child_pointer.volatile_pointer_.is_null()) {
960  ASSERT_ND(child_pointer.snapshot_pointer_ != 0);
961  uint16_t partition = extract_numa_node_from_snapshot_pointer(child_pointer.snapshot_pointer_);
962  if (!args.partitioned_drop_ || partition == args.my_partition_) {
963  result.combine(drop_volatiles_recurse(args, &child_pointer));
964  }
965  }
966  }
967  // root page is kept at this point in this case. we need to check with other threads
968  return result;
969 }
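// Illustrative scenario for the partitioned drop above (hypothetical setup): with
// args.partitioned_drop_ = true and my_partition_ = 1, this thread recurses only
// into children whose snapshot pointer lives on NUMA node 1; the other nodes'
// threads handle their own subtrees, so no volatile page is dropped twice.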
970 
971 void ArrayComposer::drop_root_volatile(const Composer::DropVolatilesArguments& args) {
972  if (storage_.get_array_metadata()->keeps_all_volatile_pages()) {
973  LOG(INFO) << "Oh, but keep-all-volatile is on. Storage-" << storage_.get_name()
974  << " is configured to keep all volatile pages.";
975  return;
976  }
977  if (is_to_keep_volatile(storage_.get_levels() - 1U)) {
978  LOG(INFO) << "Oh, but Storage-" << storage_.get_name() << " is configured to keep"
979  << " the root page.";
980  return;
981  }
982  DualPagePointer* root_pointer = &storage_.get_control_block()->root_page_pointer_;
983  ArrayPage* volatile_page = resolve_volatile(root_pointer->volatile_pointer_);
984  if (volatile_page == nullptr) {
985  LOG(INFO) << "Oh, but root volatile page already null";
986  return;
987  }
988 
989  if (volatile_page->is_leaf()) {
990  // this is a single-level array, so we now have to check epochs of records in the root page.
991  uint16_t records = storage_.get_array_size();
992  for (uint16_t i = 0; i < records; ++i) {
993  Record* record = volatile_page->get_leaf_record(i, storage_.get_payload_size());
994  Epoch epoch = record->owner_id_.xct_id_.get_epoch();
995  ASSERT_ND(epoch.is_valid());
996  if (epoch > args.snapshot_.valid_until_epoch_) {
997  LOG(INFO) << "Oh, but the root volatile page in single-level array contains a new rec";
998  return;
999  }
1000  }
1001  } else {
1002  // otherwise, all verifications already done. go drop everything!
1003  }
1004  LOG(INFO) << "Okay, drop em all!!";
1005  drop_all_recurse(args, root_pointer);
1006 }
1007 
1008 void ArrayComposer::drop_all_recurse(
1009  const Composer::DropVolatilesArguments& args,
1010  DualPagePointer* pointer) {
1011  if (pointer->volatile_pointer_.is_null()) {
1012  return;
1013  }
1014  ArrayPage* page = resolve_volatile(pointer->volatile_pointer_);
1015  if (!page->is_leaf()) {
1016  for (uint16_t i = 0; i < kInteriorFanout; ++i) {
1017  DualPagePointer& child_pointer = page->get_interior_record(i);
1018  drop_all_recurse(args, &child_pointer);
1019  }
1020  }
1021  args.drop(engine_, pointer->volatile_pointer_);
1022  pointer->volatile_pointer_.clear();
1023 }
1024 
1025 inline ArrayPage* ArrayComposer::resolve_volatile(VolatilePagePointer pointer) {
1026  if (pointer.is_null()) {
1027  return nullptr;
1028  }
1029  const memory::GlobalVolatilePageResolver& page_resolver
1030  = engine_->get_memory_manager()->get_global_volatile_page_resolver();
1031  return reinterpret_cast<ArrayPage*>(page_resolver.resolve_offset(pointer));
1032 }
1033 
1034 inline Composer::DropResult ArrayComposer::drop_volatiles_recurse(
1035  const Composer::DropVolatilesArguments& args,
1036  DualPagePointer* pointer) {
1037  if (pointer->volatile_pointer_.is_null()) {
1038  return Composer::DropResult(args);
1039  }
1040  ASSERT_ND(pointer->snapshot_pointer_ == 0
1041  || extract_snapshot_id_from_snapshot_pointer(pointer->snapshot_pointer_)
1042  == args.snapshot_.id_);
1043  // The snapshot pointer CAN be null.
1044  // It means that this subtree has not constructed a new snapshot page in this snapshot.
1045  ArrayPage* child_page = resolve_volatile(pointer->volatile_pointer_);
1046  if (child_page->is_leaf()) {
1047  return drop_volatiles_leaf(args, pointer, child_page);
1048  } else {
1049  return drop_volatiles_intermediate(args, pointer, child_page);
1050  }
1051 }
1052 
1053 Composer::DropResult ArrayComposer::drop_volatiles_intermediate(
1054  const Composer::DropVolatilesArguments& args,
1055  DualPagePointer* pointer,
1056  ArrayPage* volatile_page) {
1057  ASSERT_ND(!volatile_page->header().snapshot_);
1058  ASSERT_ND(!volatile_page->is_leaf());
1059  Composer::DropResult result(args);
1060 
1061  // Explore/replace children first because we need to know if there is new modification.
1062  // In that case, we must keep this volatile page, too.
1063  // Intermediate volatile page is kept iff there are no child volatile pages.
1064  for (uint16_t i = 0; i < kInteriorFanout; ++i) {
1065  DualPagePointer& child_pointer = volatile_page->get_interior_record(i);
1066  result.combine(drop_volatiles_recurse(args, &child_pointer));
1067  }
1068 
1069  if (result.dropped_all_) {
1070  if (is_to_keep_volatile(volatile_page->get_level())) {
1071  DVLOG(2) << "Exempted";
1072  result.dropped_all_ = false;
1073  } else {
1074  args.drop(engine_, pointer->volatile_pointer_);
1075  pointer->volatile_pointer_.clear();
1076  }
1077  } else {
1078  DVLOG(1) << "Couldn't drop an intermediate page that has a recent modification in child";
1079  }
1080  ASSERT_ND(!result.dropped_all_ || pointer->volatile_pointer_.is_null());
1081  return result;
1082 }
1083 
1084 inline Composer::DropResult ArrayComposer::drop_volatiles_leaf(
1085  const Composer::DropVolatilesArguments& args,
1086  DualPagePointer* pointer,
1087  ArrayPage* volatile_page) {
1088  ASSERT_ND(!volatile_page->header().snapshot_);
1089  ASSERT_ND(volatile_page->is_leaf());
1090  Composer::DropResult result(args);
1091  if (is_to_keep_volatile(volatile_page->get_level())) {
1092  DVLOG(2) << "Exempted";
1093  result.dropped_all_ = false;
1094  return result;
1095  }
1096 
1097  const uint16_t payload_size = storage_.get_payload_size();
1098  const ArrayRange& range = volatile_page->get_array_range();
1099  ASSERT_ND(range.end_ <= range.begin_ + volatile_page->get_leaf_record_count());
1100  ASSERT_ND(range.end_ == range.begin_ + volatile_page->get_leaf_record_count()
1101  || range.end_ == storage_.get_array_size());
1102  uint16_t records = range.end_ - range.begin_;
1103  for (uint16_t i = 0; i < records; ++i) {
1104  Record* record = volatile_page->get_leaf_record(i, payload_size);
1105  Epoch epoch = record->owner_id_.xct_id_.get_epoch();
1106  ASSERT_ND(epoch.is_valid());
1107  result.on_rec_observed(epoch);
1108  }
1109  if (result.dropped_all_) {
1110  args.drop(engine_, pointer->volatile_pointer_);
1111  pointer->volatile_pointer_.clear();
1112  }
1113  return result;
1114 }
1115 inline bool ArrayComposer::is_to_keep_volatile(uint16_t level) {
1116  uint16_t threshold = storage_.get_array_metadata()->snapshot_drop_volatile_pages_threshold_;
1117  uint16_t array_levels = storage_.get_levels();
1118  ASSERT_ND(level < array_levels);
1119  // examples:
1120  // when threshold=0, all levels (0~array_levels-1) should return false.
1121  // when threshold=1, only the root level (array_levels-1) should return true.
1122  // when threshold=2, levels down to array_levels-2 do, and so on.
1123  return threshold + level >= array_levels;
1124 }
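// Concrete check of the formula above with array_levels = 3:
//   threshold=0: 0 + level < 3 for levels 0..2, so nothing is kept;
//   threshold=1: only level 2 (the root) satisfies 1 + 2 >= 3;
//   threshold=2: levels 1 and 2 are kept.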
1125 
1126 
1127 } // namespace array
1128 } // namespace storage
1129 } // namespace foedus