libfoedus-core
FOEDUS Core Library
hash_composer_impl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <algorithm>
23 #include <cstring>
24 #include <memory>
25 #include <ostream>
26 #include <string>
27 #include <thread>
28 #include <vector>
29 
30 #include "foedus/engine.hpp"
54 
55 namespace foedus {
56 namespace storage {
57 namespace hash {
58 
// NOTE(review): the signature line of this file-local helper is not visible in
// this extracted listing; judging from the body, it resolves a volatile page
// pointer into a HashDataPage*.
  const memory::GlobalVolatilePageResolver& resolver,
  VolatilePagePointer pointer) {
  // A null volatile pointer simply means "no such page".
  if (pointer.is_null()) {
    return nullptr;
  }
  // resolve_offset() translates the pointer into an in-memory address.
  // (sanity-check lines appear to be truncated out of this listing here.)
  HashDataPage* ret = reinterpret_cast<HashDataPage*>(resolver.resolve_offset(pointer));
  return ret;
}
70 
// NOTE(review): the signature line and the declaration of "ret" are not visible
// in this extracted listing; this is the HashIntermediatePage counterpart of
// the resolver helper above.
  const memory::GlobalVolatilePageResolver& resolver,
  VolatilePagePointer pointer) {
  // A null volatile pointer simply means "no such page".
  if (pointer.is_null()) {
    return nullptr;
  }
    = reinterpret_cast<HashIntermediatePage*>(resolver.resolve_offset(pointer));
  return ret;
}
83 
// NOTE(review): the constructor's opening line (HashComposer::HashComposer(...))
// is not visible in this extracted listing; below are its initializer list and body.
  : engine_(parent->get_engine()),
    storage_id_(parent->get_storage_id()),
    storage_(engine_, storage_id_),
    volatile_resolver_(engine_->get_memory_manager()->get_global_volatile_page_resolver()) {
  // A composer must only be constructed for an existing storage.
  ASSERT_ND(storage_.exists());
}
96 
// NOTE(review): the signature line of this method (HashComposer::compose) is
// not visible in this extracted listing; the body below is documented as-is.
  VLOG(0) << to_string() << " composing with " << args.log_streams_count_ << " streams.";
  debugging::StopWatch stop_watch;  // wall-clock timing for the final log message

  // Merge-sort all input log streams of this storage.
  snapshot::MergeSort merge_sort(
    storage_id_,
    kHashStorage,
    args.base_epoch_,
    args.log_streams_,
    args.log_streams_count_,
    // NOTE(review): one constructor-argument line is truncated out of this
    // listing here.
    args.work_memory_);
  CHECK_ERROR(merge_sort.initialize());

  // The context consumes the sorted logs and emits snapshot data pages plus
  // HashComposedBinsPage lists, summarized in args.root_info_page_.
  HashComposeContext context(
    engine_,
    &merge_sort,
    args.snapshot_writer_,
    // NOTE(review): one constructor-argument line (presumably
    // args.previous_snapshot_files_) is truncated out of this listing here.
    args.root_info_page_);
  CHECK_ERROR(context.execute());

  CHECK_ERROR(merge_sort.uninitialize());  // no need for scoped release. its destructor is safe.

  stop_watch.stop();
  LOG(INFO) << to_string() << " done in " << stop_watch.elapsed_ms() << "ms.";
  return kRetOk;
}
125 
// NOTE(review): the signature line of this method (HashComposer::construct_root)
// is not visible in this extracted listing.
  // Unlike array package, this method might do lots of I/O.
  // Each reducer creates only data pages. They don't create intermediate pages
  // and instead they emit linked-lists of HashComposedBinsPage as well as HashRootInfoPage
  // pointing to them. We combine these information here, writing out new intermediate pages.

  // compose() created root_info_pages that contain pointers to fill in the root page,
  // so we just find non-zero entry and copy it to root page.
  const uint8_t levels = storage_.get_levels();

  HashIntermediatePage* root_page = reinterpret_cast<HashIntermediatePage*>(
  // NOTE(review): the cast operand (the buffer the root image is built in) is
  // truncated out of this listing here.
  SnapshotPagePointer old_root_page_id = storage_.get_metadata()->root_snapshot_page_id_;
  if (old_root_page_id != 0) {
    // Start from the previous snapshot's root image; page ID reset below.
    WRAP_ERROR_CODE(args.previous_snapshot_files_->read_page(old_root_page_id, root_page));
    ASSERT_ND(root_page->header().storage_id_ == storage_id_);
    ASSERT_ND(root_page->header().page_id_ == old_root_page_id);
    ASSERT_ND(root_page->get_level() + 1U == storage_.get_levels());
    root_page->header().page_id_ = 0;
  } else {
    // No previous snapshot: start from an empty root page.
    root_page->initialize_snapshot_page(
      storage_id_,
      0,  // new page ID not known at this point
      storage_.get_levels() - 1U,
      0);
  }

  if (levels == 1U) {
    // single-level hash storage's root info page directly points to data pages, so
    // no need for parallelization. we separate the case below.
    CHECK_ERROR(construct_root_single_level(args, root_page));
  } else {
    // otherwise, we parallelize the heavy lifting part, which involves reads/writes of pages.
    LOG(INFO) << to_string() << " construct_root() multi-level parallelized path";
    std::vector< std::thread > threads;

    const uint16_t nodes = engine_->get_soc_count();
    std::unique_ptr< ErrorStack[] > error_stacks(new ErrorStack[nodes]);
    for (uint16_t numa_node = 0; numa_node < nodes; ++numa_node) {
      // One worker thread per NUMA node; each writes out its own sub-trees.
      threads.emplace_back(
        // NOTE(review): the thread-entry argument (the member-function pointer
        // passed to std::thread) is truncated out of this listing here.
        this,
        &args,
        numa_node,
        root_page,
        error_stacks.get() + numa_node);
    }

    LOG(INFO) << to_string() << " construct_root() launched " << nodes << " threads...";
    for (uint16_t numa_node = 0; numa_node < nodes; ++numa_node) {
      threads[numa_node].join();
      if (error_stacks.get()[numa_node].is_error()) {
        LOG(ERROR) << to_string() << " construct_root() observed an error in the launched"
          << " thread-" << numa_node << ". will propagate after joining the worker threads. "
          << "error description: " << error_stacks.get()[numa_node];
      }
      // NOTE(review): this CHECK_ERROR can return while later threads are still
      // unjoined, which contradicts the "propagate after joining" log message
      // above (and destroying a joinable std::thread terminates). Confirm
      // whether this line belongs only in the loop below.
      CHECK_ERROR(error_stacks.get()[numa_node]);
    }
    LOG(INFO) << to_string() << " construct_root() joined";
    // propagate errors.
    for (uint16_t numa_node = 0; numa_node < nodes; ++numa_node) {
      CHECK_ERROR(error_stacks.get()[numa_node]);
    }
  }

  // write out root_page image
  SnapshotPagePointer new_root_page_id = args.snapshot_writer_->get_next_page_id();
  root_page->header().page_id_ = new_root_page_id;
  std::memcpy(args.snapshot_writer_->get_page_base(), root_page, kPageSize);
  // NOTE(review): the line that flushes this single page (presumably a
  // dump_pages(0, 1) call) is truncated out of this listing here.
  ASSERT_ND(args.snapshot_writer_->get_next_page_id() == new_root_page_id + 1ULL);

  *args.new_root_page_pointer_ = new_root_page_id;
  // AFTER writing out the root page, install the pointer to new root page
  storage_.get_control_block()->root_page_pointer_.snapshot_pointer_ = new_root_page_id;
  storage_.get_control_block()->meta_.root_snapshot_page_id_ = new_root_page_id;
  return kRetOk;
}
204 
205 ErrorStack HashComposer::construct_root_single_level(
207  HashIntermediatePage* root_page) {
208  ASSERT_ND(storage_.get_levels() == 1U);
209  LOG(INFO) << to_string() << " construct_root() Single-level path";
210  snapshot::SnapshotId new_snapshot_id = args.snapshot_writer_->get_snapshot_id();
211 
212  // single-level, so all reducuers wrote out at most only one entry per sub-tree.
213  // so, the bins pages must be contiguous. just one read!
214  HashComposedBinsPage* buffer = reinterpret_cast<HashComposedBinsPage*>(
215  args.gleaner_resource_->writer_pool_memory_.get_block()); // whatever memory. just 1 thread.
216  uint32_t buffer_pages = args.gleaner_resource_->writer_pool_memory_.get_size() / kPageSize;
217  uint16_t root_children = storage_.get_root_children();
218  ASSERT_ND(buffer_pages > root_children);
219  for (uint32_t i = 0; i < args.root_info_pages_count_; ++i) {
220  const HashRootInfoPage* casted
221  = reinterpret_cast<const HashRootInfoPage*>(args.root_info_pages_[i]);
222  ASSERT_ND(casted->get_pointer(0).volatile_pointer_.word == 1); // only one page
223  SnapshotPagePointer bins_page_id = casted->get_pointer(0).snapshot_pointer_;
224 
225  // check if it's really contiguous
226  for (uint16_t index = 0; index < root_children; ++index) {
227  DualPagePointer pointer = casted->get_pointer(index);
228  ASSERT_ND(pointer.volatile_pointer_.word == 1U);
229  ASSERT_ND(pointer.snapshot_pointer_ == bins_page_id + index);
230  if (pointer.volatile_pointer_.word != 1U) {
231  return ERROR_STACK_MSG(kErrorCodeInternalError, "wtf 1");
232  }
233  if (pointer.snapshot_pointer_ != bins_page_id + index) {
234  return ERROR_STACK_MSG(kErrorCodeInternalError, "wtf 2");
235  }
236  }
237 
238  // collect pointers to data pages.
239  WRAP_ERROR_CODE(args.previous_snapshot_files_->read_pages(bins_page_id, root_children, buffer));
240  for (uint16_t index = 0; index < root_children; ++index) {
241  HashComposedBinsPage* bins_page = buffer + index;
242  if (bins_page->bin_count_) {
243  ASSERT_ND(bins_page->bin_count_ == 1U);
244  ASSERT_ND(bins_page->bins_[0].bin_ == index);
245  SnapshotPagePointer new_pointer = bins_page->bins_[0].page_id_;
246  SnapshotPagePointer cur_pointer = root_page->get_pointer(index).snapshot_pointer_;
247  ASSERT_ND(new_pointer != 0);
248  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(new_pointer) == new_snapshot_id);
249  ASSERT_ND(cur_pointer == 0
250  || extract_snapshot_id_from_snapshot_pointer(cur_pointer) != new_snapshot_id);
251  root_page->get_pointer(index).snapshot_pointer_ = new_pointer;
252  }
253  }
254  }
255 
256  return kRetOk;
257 }
258 
259 ErrorStack HashComposer::construct_root_multi_level(
260  const Composer::ConstructRootArguments& args,
261  uint16_t numa_node,
262  HashIntermediatePage* root_page) {
263  const uint16_t nodes = engine_->get_soc_count();
264  const uint16_t root_children = storage_.get_root_children();
265  ASSERT_ND(numa_node < nodes);
266  thread::NumaThreadScope numa_scope(numa_node);
267 
268  VLOG(0) << to_string() << " construct_root() child thread numa_node-" << numa_node;
269  const snapshot::SnapshotId new_snapshot_id = args.snapshot_writer_->get_snapshot_id();
270 
271  // This method runs on multi-threads, but in this NUMA node I'm the only one.
272  // We thus can open a snapshot_writer for this thread and write out to there.
273  cache::SnapshotFileSet fileset(engine_);
274  CHECK_ERROR(fileset.initialize());
275  UninitializeGuard fileset_guard(&fileset, UninitializeGuard::kWarnIfUninitializeError);
276 
277  snapshot::LogGleanerResource::PerNodeResource& resource
278  = args.gleaner_resource_->per_node_resources_[numa_node];
279  // Just be careful in the case of node-0: the gleaner itself holds it.
280  // In that case, create another snapshot_writer.
281  std::unique_ptr< snapshot::SnapshotWriter > snapshot_writer_to_delete(nullptr);
282  snapshot::SnapshotWriter* snapshot_writer;
283  if (numa_node == 0) {
284  snapshot_writer = args.snapshot_writer_;
285  } else {
286  snapshot_writer_to_delete.reset(new snapshot::SnapshotWriter(
287  engine_,
288  numa_node,
289  new_snapshot_id,
290  &resource.write_buffer_,
291  &resource.write_intermediate_buffer_,
292  true)); // we append to the node-0 snapshot file.
293  snapshot_writer = snapshot_writer_to_delete.get();
294  CHECK_ERROR(snapshot_writer->open());
295  }
296 
297  const uint32_t inputs = args.root_info_pages_count_;
298  ASSERT_ND(inputs > 0);
299 
300  ComposedBinsMergedStream streams;
301  for (uint16_t index = 0; index < root_children; ++index) {
302  if (index % nodes != numa_node) {
303  // round robin assignment
304  continue;
305  }
306 
307  SnapshotPagePointer* destination = &root_page->get_pointer(index).snapshot_pointer_;
308  ASSERT_ND((*destination) == 0
309  || extract_snapshot_id_from_snapshot_pointer(*destination) != new_snapshot_id);
310 
311  bool had_any_change = false;
312  uint32_t writer_buffer_pos = 0; // how many level-0 pages made in the main buffer
313  uint32_t writer_higher_buffer_pos = 0; // how many higher level pages made in the sub buffer
314 
315  CHECK_ERROR(streams.init(
316  reinterpret_cast<const HashRootInfoPage* const*>(args.root_info_pages_),
317  args.root_info_pages_count_,
318  root_page,
319  index,
320  &resource.read_buffer_,
321  &fileset,
322  snapshot_writer,
323  &writer_buffer_pos,
324  &writer_higher_buffer_pos,
325  &had_any_change));
326 
327  if (!had_any_change) {
328  VLOG(0) << to_string() << " empty sub-tree " << index;
329  } else {
330  VLOG(0) << to_string() << " processing sub-tree " << index << "...";
331  uint32_t installed_count = 0;
332  HashBin next_bin = kInvalidHashBin;
333  while (true) {
334  WRAP_ERROR_CODE(streams.process_a_bin(&installed_count, &next_bin));
335  if (next_bin == kInvalidHashBin) {
336  break;
337  }
338  WRAP_ERROR_CODE(streams.switch_path(
339  next_bin,
340  &fileset,
341  snapshot_writer,
342  &writer_buffer_pos,
343  &writer_higher_buffer_pos));
344  }
345  VLOG(0) << to_string() << " processed sub-tree " << index << "."
346  << " installed_count=" << installed_count
347  << " writer_buffer_pos=" << writer_buffer_pos
348  << " writer_higher_buffer_pos=" << writer_higher_buffer_pos;
349 
350  // We have modified at least one level-0 page because we had at least one bin.
351  // level-0 page knew its page ID back then. just write them out.
352  ASSERT_ND(writer_buffer_pos > 0);
353  WRAP_ERROR_CODE(snapshot_writer->dump_pages(0, writer_buffer_pos));
354 
355  // higher-levels need to set page IDs because we couldn't know their page IDs back then.
356  if (storage_.get_levels() == 2U) {
357  // 2-level means root's child is level-0 page, thus no "higher-level".
358  ASSERT_ND(writer_higher_buffer_pos == 0);
359  } else {
360  ASSERT_ND(writer_higher_buffer_pos > 0);
361  HashIntermediatePage* higher_base
362  = reinterpret_cast<HashIntermediatePage*>(snapshot_writer->get_intermediate_base());
363  // the first page is always the root-child because we open it first
364  HashIntermediatePage* root_child = higher_base + 0;
365  ASSERT_ND(root_child->get_level() == storage_.get_levels() - 2U);
366  ASSERT_ND(root_page->get_pointer(index).snapshot_pointer_ == 0);
367  // and this is the highest level for this sub-tree, so there is no pointer to this page.
368  // hence, page_id==0 means null.
369  SnapshotPagePointer base_id = snapshot_writer->get_next_page_id();
370 
371 #ifndef NDEBUG
372  // each page should be referenced exactly once. let's check it
373  uint8_t* ref_counts = new uint8_t[writer_higher_buffer_pos];
374  std::memset(ref_counts, 0, sizeof(uint8_t) * writer_higher_buffer_pos);
375  uint32_t total_replaced = 0;
376 #endif // NDEBUG
377 
378  for (uint32_t i = 0; i < writer_higher_buffer_pos; ++i) {
379  SnapshotPagePointer page_id = base_id + i;
380  HashIntermediatePage* page = higher_base + i;
381  ASSERT_ND(page->get_level() > 0);
382  page->header().page_id_ = page_id;
383  for (uint16_t j = 0; j < kHashIntermediatePageFanout; ++j) {
384  SnapshotPagePointer id = page->get_pointer(j).snapshot_pointer_;
385  if (id != 0 && extract_snapshot_id_from_snapshot_pointer(id) == 0) {
386  ASSERT_ND(id < writer_higher_buffer_pos);
387  page->get_pointer(j).snapshot_pointer_ = id + base_id;
388 #ifndef NDEBUG
389  ++total_replaced;
390 #endif // NDEBUG
391  }
392  }
393  }
394 
395 #ifndef NDEBUG
396  ASSERT_ND(total_replaced + 1U == writer_higher_buffer_pos); // replaced all pointers
397  ASSERT_ND(ref_counts[0] == 0); // highest level in sub-tree. it should be never referenced.
398  for (uint32_t i = 1; i < writer_higher_buffer_pos; ++i) {
399  ASSERT_ND(ref_counts[i] == 1U);
400  }
401  delete[] ref_counts;
402 #endif // NDEBUG
403 
404  WRAP_ERROR_CODE(snapshot_writer->dump_intermediates(0, writer_higher_buffer_pos));
405  root_page->get_pointer(index).snapshot_pointer_ = base_id;
406  }
407  }
408  }
409 
410 
411  if (numa_node != 0) {
412  snapshot_writer_to_delete.get()->close();
413  }
414 
415  VLOG(0) << to_string() << " construct_root() child thread numa_node-" << numa_node << " done";
416  return kRetOk;
417 }
418 
419 
420 inline HashDataPage* HashComposer::resolve_data(VolatilePagePointer pointer) const {
421  return resolve_data_impl(volatile_resolver_, pointer);
422 }
423 inline HashIntermediatePage* HashComposer::resolve_intermediate(VolatilePagePointer pointer) const {
424  return resolve_intermediate_impl(volatile_resolver_, pointer);
425 }
426 
427 std::string HashComposer::to_string() const {
428  return std::string("HashComposer-") + std::to_string(storage_id_);
429 }
430 
// NOTE(review): the constructor's opening line
// (HashComposeContext::HashComposeContext() is not visible in this extracted
// listing; below are its parameter list, initializer list, and body.
  Engine* engine,
  snapshot::MergeSort* merge_sort,
  snapshot::SnapshotWriter* snapshot_writer,
  cache::SnapshotFileSet* previous_snapshot_files,
  Page* root_info_page)
  : engine_(engine),
    merge_sort_(merge_sort),
    system_initial_epoch_(engine->get_savepoint_manager()->get_initial_durable_epoch()),
    storage_id_(merge_sort_->get_storage_id()),
    snapshot_id_(snapshot_writer->get_snapshot_id()),
    storage_(engine, storage_id_),
    snapshot_writer_(snapshot_writer),
    previous_snapshot_files_(previous_snapshot_files),
    root_info_page_(reinterpret_cast<HashRootInfoPage*>(root_info_page)),
    partitionable_(engine_->get_soc_count() > 1U),
    levels_(storage_.get_levels()),
    bin_bits_(storage_.get_bin_bits()),
    bin_shifts_(storage_.get_bin_shifts()),
    root_children_(storage_.get_root_children()),
    numa_node_(snapshot_writer->get_numa_node()),
    total_bin_count_(storage_.get_bin_count()),
    previous_root_page_pointer_(storage_.get_metadata()->root_snapshot_page_id_),
    volatile_resolver_(engine->get_memory_manager()->get_global_volatile_page_resolver()) {
  // Buffer holding the "current path" of previous-snapshot intermediate pages.
  cur_path_memory_.alloc(
    // NOTE(review): one alloc() argument line is truncated out of this listing here.
    kPageSize,
    // NOTE(review): one alloc() argument line is truncated out of this listing here.
    numa_node_);
  cur_path_ = reinterpret_cast<HashIntermediatePage*>(cur_path_memory_.get_block());
  cur_path_lowest_level_ = levels_;  // levels_ means "no path level loaded yet" (see init_cur_path)
  cur_path_valid_range_ = HashBinRange(0, 0);  // empty range until init_cur_path() runs

  cur_bin_ = kCurBinNotOpened;
  cur_intermediate_tail_ = nullptr;

  // One-page scratch buffer for reading previous-snapshot data pages.
  data_page_io_memory_.alloc(
    kPageSize,
    kPageSize,
    // NOTE(review): one alloc() argument line is truncated out of this listing here.
    numa_node_);

  // Output buffers are borrowed from the snapshot writer.
  allocated_pages_ = 0;
  allocated_intermediates_ = 0;
  page_base_ = reinterpret_cast<HashDataPage*>(snapshot_writer_->get_page_base());
  max_pages_ = snapshot_writer_->get_page_size();
  intermediate_base_
    = reinterpret_cast<HashComposedBinsPage*>(snapshot_writer_->get_intermediate_base());
  max_intermediates_ = snapshot_writer_->get_intermediate_size();
}
486 
488  // Initializations
489  std::memset(root_info_page_, 0, kPageSize);
490  root_info_page_->header().storage_id_ = storage_id_;
491  CHECK_ERROR(init_intermediates());
492  CHECK_ERROR(init_cur_path());
493  WRAP_ERROR_CODE(cur_bin_table_.create_memory(numa_node_)); // TASK(Hideaki) reuse memory
494  cur_bin_table_.clean();
495  VLOG(0) << "HashComposer-" << storage_id_ << " initialization done. processing...";
496 
497  bool processed_any = false;
498  cur_bin_ = kCurBinNotOpened;
499  while (true) {
500  CHECK_ERROR(merge_sort_->next_batch());
501  uint64_t count = merge_sort_->get_current_count();
502  if (count == 0 && merge_sort_->is_ended_all()) {
503  break;
504  }
505  processed_any = true;
506  const snapshot::MergeSort::SortEntry* sort_entries = merge_sort_->get_sort_entries();
507  uint64_t cur = 0;
508  while (cur < count) {
509  HashBin head_bin = sort_entries[cur].get_key();
510  ASSERT_ND(head_bin < storage_.get_bin_count());
511  if (cur_bin_ != head_bin) {
512  // now we have to finalize the previous bin and switch to a new bin!
513  ASSERT_ND(cur_bin_ == kCurBinNotOpened || cur_bin_ < head_bin); // sorted by bins
514  CHECK_ERROR(close_cur_bin());
515  ASSERT_ND(cur_bin_ == kCurBinNotOpened);
516  CHECK_ERROR(open_cur_bin(head_bin));
517  ASSERT_ND(cur_bin_ == head_bin);
518  }
519 
520  // grab a range of logs that are in the same bin.
521  uint64_t next;
522  for (next = cur + 1U; LIKELY(next < count); ++next) {
523  // this check uses sort_entries which are nicely contiguous.
524  HashBin bin = sort_entries[next].get_key();
525  ASSERT_ND(bin >= head_bin);
526  if (UNLIKELY(bin != head_bin)) {
527  break;
528  }
529  }
530 
531  WRAP_ERROR_CODE(apply_batch(cur, next));
532  cur = next;
533  }
534  ASSERT_ND(cur == count);
535  }
536 
537  if (!processed_any) {
538  LOG(ERROR) << "wtf? no logs? storage-" << storage_id_;
539  }
540 
541  CHECK_ERROR(finalize());
542  return kRetOk;
543 }
544 
545 ErrorCode HashComposeContext::apply_batch(uint64_t cur, uint64_t next) {
546  // TASK(Hideaki) prefetching in hash composer.
547  const uint16_t kFetchSize = 8;
548  const log::RecordLogType* logs[kFetchSize];
549  while (cur < next) {
550  uint16_t desired = std::min<uint16_t>(kFetchSize, next - cur);
551  uint16_t fetched = merge_sort_->fetch_logs(cur, desired, logs);
552  for (uint16_t i = 0; i < kFetchSize && LIKELY(i < fetched); ++i) {
553  const HashCommonLogType* log = reinterpret_cast<const HashCommonLogType*>(logs[i]);
554  log->assert_type();
555  HashValue hash = log->hash_;
556  ASSERT_ND(cur_bin_ == (hash >> storage_.get_bin_shifts()));
558  CHECK_ERROR_CODE(cur_bin_table_.overwrite_record(
559  log->header_.xct_id_,
560  log->get_key(),
561  log->key_length_,
562  hash,
563  log->get_payload(),
564  log->payload_offset_,
565  log->payload_count_));
566  } else if (log->header_.get_type() == log::kLogCodeHashInsert) {
567  CHECK_ERROR_CODE(cur_bin_table_.insert_record(
568  log->header_.xct_id_,
569  log->get_key(),
570  log->key_length_,
571  hash,
572  log->get_payload(),
573  log->payload_count_));
574  } else if (log->header_.get_type() == log::kLogCodeHashUpdate) {
575  CHECK_ERROR_CODE(cur_bin_table_.update_record(
576  log->header_.xct_id_,
577  log->get_key(),
578  log->key_length_,
579  hash,
580  log->get_payload(),
581  log->payload_count_));
582  } else {
584  CHECK_ERROR_CODE(cur_bin_table_.delete_record(
585  log->header_.xct_id_,
586  log->get_key(),
587  log->key_length_,
588  hash));
589  }
590  }
591  cur += fetched;
592  ASSERT_ND(cur <= next);
593  }
594  return kErrorCodeOk;
595 }
596 
597 ErrorStack HashComposeContext::finalize() {
598  CHECK_ERROR(close_cur_bin());
599 
600  // flush the main buffer. now we finalized all data pages
601  if (allocated_pages_ > 0) {
602  WRAP_ERROR_CODE(dump_data_pages());
603  ASSERT_ND(allocated_pages_ == 0);
604  }
605 
606  // as soon as we flush out all data pages, we can install snapshot pointers to them.
607  // this is just about data pages (head pages in each bin), not intermediate pages
608  uint64_t installed_count = 0;
609  CHECK_ERROR(install_snapshot_data_pages(&installed_count));
610 
611  // then dump out HashComposedBinsPage.
612  // we stored them in a separate buffer, and now finally we can get their page IDs.
613  // Until now, we used relative indexes in intermediate buffer as page ID, storing them in
614  // page ID header. now let's convert all of them to be final page ID.
615  // base_pointer + offset in intermediate buffer will be the new page ID.
616  const SnapshotPagePointer base_pointer = snapshot_writer_->get_next_page_id();
617  for (uint32_t i = 0; i < root_children_; ++i) {
618  // these are heads of linked-list. We keep pointers to these pages in root-info page
619  root_info_page_->get_pointer(i).snapshot_pointer_ = base_pointer + i;
620  // we use the volatile pointer to represent the number of pages in the sub-tree.
621  ASSERT_ND(root_info_page_->get_pointer(i).volatile_pointer_.word > 0);
622  }
623  for (uint32_t i = 0; i < allocated_intermediates_; ++i) {
624  HashComposedBinsPage* page = intermediate_base_ + i;
625  SnapshotPagePointer new_page_id = base_pointer + i;
626  ASSERT_ND(page->header_.page_id_ == i);
627  page->header_.page_id_ = new_page_id;
628  if (page->next_page_) {
629  // also updates next-pointers.
630  ASSERT_ND(page->next_page_ < allocated_intermediates_);
631  ASSERT_ND(page->bin_count_ == kHashComposedBinsPageMaxBins);
632  page->next_page_ = base_pointer + page->next_page_;
633  }
634  }
635  snapshot_writer_->dump_intermediates(0, allocated_intermediates_);
636 
637  return kRetOk;
638 }
639 
640 ErrorCode HashComposeContext::dump_data_pages() {
641  CHECK_ERROR_CODE(snapshot_writer_->dump_pages(0, allocated_pages_));
642  ASSERT_ND(snapshot_writer_->get_next_page_id()
643  == page_base_[0].header().page_id_ + allocated_pages_);
644  ASSERT_ND(snapshot_writer_->get_next_page_id()
645  == page_base_[allocated_pages_ - 1].header().page_id_ + 1ULL);
646  allocated_pages_ = 0;
647  return kErrorCodeOk;
648 }
649 
650 
651 inline HashDataPage* HashComposeContext::resolve_data(VolatilePagePointer pointer) const {
652  return resolve_data_impl(volatile_resolver_, pointer);
653 }
654 inline HashIntermediatePage* HashComposeContext::resolve_intermediate(
655  VolatilePagePointer pointer) const {
656  return resolve_intermediate_impl(volatile_resolver_, pointer);
657 }
658 
/// Debug-only sanity check on a newly-allocated snapshot pointer: it must
/// carry this composer's snapshot ID, and in a child SOC also this worker's
/// NUMA node. Always returns true so it can be wrapped in ASSERT_ND().
bool HashComposeContext::verify_new_pointer(SnapshotPagePointer pointer) const {
  // NOTE(review): one line is truncated out of this extracted listing here.
  if (!engine_->is_master()) {
    ASSERT_ND(extract_numa_node_from_snapshot_pointer(pointer) == numa_node_);
  }
  ASSERT_ND(extract_snapshot_id_from_snapshot_pointer(pointer) == snapshot_id_);
  return true;
}
667 bool HashComposeContext::verify_old_pointer(SnapshotPagePointer pointer) const {
668  return pointer == 0 || extract_snapshot_id_from_snapshot_pointer(pointer) != snapshot_id_;
669 }
670 
676 SnapshotPagePointer HashComposeContext::get_cur_path_bin_head(HashBin bin) const {
677  ASSERT_ND(cur_path_valid_range_.contains(bin));
678  if (cur_path_lowest_level_ > 0) {
679  return 0;
680  }
681  ASSERT_ND(cur_path_[0].get_bin_range().contains(bin));
682  uint16_t index = bin - cur_path_[0].get_bin_range().end_;
683  return cur_path_[0].get_pointer(index).snapshot_pointer_;
684 }
685 
686 ErrorStack HashComposeContext::init_cur_path() {
687  if (previous_root_page_pointer_ == 0) {
688  ASSERT_ND(is_initial_snapshot());
689  std::memset(cur_path_, 0, kPageSize * levels_);
690  cur_path_lowest_level_ = levels_;
691  cur_path_valid_range_ = HashBinRange(0, kHashMaxBins[levels_]);
692  } else {
693  ASSERT_ND(!is_initial_snapshot());
694  HashIntermediatePage* root = get_cur_path_page(levels_ - 1U);
695  WRAP_ERROR_CODE(previous_snapshot_files_->read_page(previous_root_page_pointer_, root));
696  ASSERT_ND(root->header().storage_id_ == storage_id_);
697  ASSERT_ND(root->header().page_id_ == previous_root_page_pointer_);
698  ASSERT_ND(root->get_level() + 1U == levels_);
699  ASSERT_ND(root->get_bin_range() == HashBinRange(0ULL, kHashMaxBins[levels_]));
700  cur_path_lowest_level_ = root->get_level();
701  cur_path_valid_range_ = root->get_bin_range();
702 
703  HashIntermediatePage* parent = root;
704  while (parent->get_level() > 0) {
705  HashIntermediatePage* child = get_cur_path_page(parent->get_level() - 1U);
706  SnapshotPagePointer pointer = parent->get_pointer(0).snapshot_pointer_;
707  if (pointer == 0) {
708  std::memset(child, 0, kPageSize);
709  break;
710  } else {
711  WRAP_ERROR_CODE(previous_snapshot_files_->read_page(pointer, child));
712  ASSERT_ND(child->header().storage_id_ == storage_id_);
713  ASSERT_ND(child->header().page_id_ == pointer);
714  ASSERT_ND(child->get_level() + 1U == parent->get_level());
715  ASSERT_ND(child->get_bin_range() == HashBinRange(0ULL, parent->get_level()));
716  cur_path_lowest_level_ = child->get_level();
717  cur_path_valid_range_ = child->get_bin_range();
718  parent = child;
719  }
720  }
721  }
722  return kRetOk;
723 }
724 
725 inline ErrorCode HashComposeContext::update_cur_path_if_needed(HashBin bin) {
726  ASSERT_ND(verify_cur_path());
727 
728  // Even when LIKELY mis-predicts, the penalty is amortized by the page-read cost.
729  if (LIKELY(is_initial_snapshot()
730  || levels_ == 1U
731  || (cur_path_valid_range_.contains(bin) && cur_path_lowest_level_ == 0))) {
732  return kErrorCodeOk;
733  }
734 
735  return update_cur_path(bin);
736 }
737 
738 ErrorCode HashComposeContext::update_cur_path(HashBin bin) {
739  ASSERT_ND(!is_initial_snapshot());
740  ASSERT_ND(!cur_path_valid_range_.contains(bin));
741  ASSERT_ND(levels_ > 1U); // otherwise no page switch should happen
742  ASSERT_ND(verify_cur_path());
743 
744  // goes up until cur_path_valid_range_.contains(bin)
745  while (!cur_path_valid_range_.contains(bin)) {
746  ASSERT_ND(cur_path_lowest_level_ + 1U < levels_); // otherwise even root doesn't contain it
747  ++cur_path_lowest_level_;
748  cur_path_valid_range_ = get_cur_path_lowest()->get_bin_range();
749  ASSERT_ND(get_cur_path_lowest()->get_bin_range() == cur_path_valid_range_);
750  }
751 
752  // then goes down as much as possible
753  IntermediateRoute route = IntermediateRoute::construct(bin);
754 
755 #ifndef NDEBUG
756  // route[level+1] is the ordinal in intermediate page of the level+1, pointing to the child.
757  // thus cur_path[level] should have that pointer as its page ID.
758  for (uint8_t level = cur_path_lowest_level_; level + 1U < levels_; ++level) {
759  SnapshotPagePointer child_id = get_cur_path_page(level)->header().page_id_;
760  HashIntermediatePage* parent = get_cur_path_page(level + 1U);
761  ASSERT_ND(parent->get_pointer(route.route[level + 1U]).snapshot_pointer_ == child_id);
762  }
763 #endif // NDEBUG
764 
765  while (cur_path_lowest_level_ > 0) {
766  uint8_t index = route.route[cur_path_lowest_level_];
767  HashIntermediatePage* page = get_cur_path_page(cur_path_lowest_level_);
768  SnapshotPagePointer pointer = page->get_pointer(index).snapshot_pointer_;
769  if (pointer == 0) {
770  // the page doesn't exist in previous snapshot. that's fine.
771  break;
772  } else {
773  HashIntermediatePage* child = get_cur_path_page(cur_path_lowest_level_ + 1U);
774  CHECK_ERROR_CODE(previous_snapshot_files_->read_page(pointer, child));
775  ASSERT_ND(child->header().storage_id_ == storage_id_);
776  ASSERT_ND(child->header().page_id_ == pointer);
777  ASSERT_ND(child->get_level() + 1U == cur_path_lowest_level_);
778  ASSERT_ND(child->get_bin_range().contains(bin));
779  cur_path_lowest_level_ = child->get_level();
780  cur_path_valid_range_ = child->get_bin_range();
781  }
782  }
783 
784  ASSERT_ND(cur_path_valid_range_.contains(bin));
785  ASSERT_ND(get_cur_path_lowest()->get_bin_range() == cur_path_valid_range_);
786  ASSERT_ND(verify_cur_path());
787  return kErrorCodeOk;
788 }
789 
790 bool HashComposeContext::verify_cur_path() const {
791  if (is_initial_snapshot()) {
792  ASSERT_ND(cur_path_lowest_level_ == levels_);
793  } else {
794  ASSERT_ND(cur_path_lowest_level_ < levels_);
795  }
796  for (uint8_t level = cur_path_lowest_level_; level < kHashMaxLevels; ++level) {
797  if (level >= levels_) {
798  ASSERT_ND(cur_path_[level].header().page_id_ == 0);
799  continue;
800  }
801  ASSERT_ND(cur_path_[level].header().page_id_ != 0);
802  ASSERT_ND(cur_path_[level].get_level() == level);
803  ASSERT_ND(cur_path_[level].header().storage_id_ == storage_id_);
804  if (level > cur_path_lowest_level_) {
805  HashBinRange range = cur_path_[level].get_bin_range();
806  HashBinRange child_range = cur_path_[level - 1U].get_bin_range();
807  ASSERT_ND(range.contains(child_range));
808  }
809  }
810 
811  return true;
812 }
813 
819 ErrorStack HashComposeContext::close_cur_bin() {
820  if (cur_bin_ == kCurBinNotOpened) {
821  // already closed
822  return kRetOk;
823  }
824 
825  // construct new data pages from cur_bin_table_.
826  // to make sure we have enough pages in the main buffer,
827  // we flush pages so far via a very conservative estimate.
828  // assuming each bin receives a small number of records, this doesn't harm anything.
829  uint32_t physical_records = cur_bin_table_.get_physical_record_count();
830  if (UNLIKELY(physical_records > 1000U)) {
831  LOG(WARNING) << "A hash bin has more than 1000 records?? That's an unexpected usage."
832  << " There is either a skew or mis-sizing.";
833  }
834  uint64_t remaining_buffer = allocated_pages_ - max_pages_;
835  if (UNLIKELY(remaining_buffer < physical_records)) { // super-conservative. one-record per page.
836  WRAP_ERROR_CODE(dump_data_pages());
837  }
838 
839  const SnapshotPagePointer base_pointer = snapshot_writer_->get_next_page_id();
840  HashDataPage* head_page = page_base_ + allocated_pages_;
841  SnapshotPagePointer head_page_id = base_pointer + allocated_pages_;
842  const uint8_t bin_bits = storage_.get_bin_bits();
843  const uint8_t bin_shifts = storage_.get_bin_shifts();
844  head_page->initialize_snapshot_page(storage_id_, head_page_id, cur_bin_, bin_bits, bin_shifts);
845  ++allocated_pages_;
846  ASSERT_ND(allocated_pages_ <= max_pages_);
847 
848  HashDataPage* cur_page = head_page;
849  const uint32_t begin = cur_bin_table_.get_first_record();
850  const uint32_t end = cur_bin_table_.get_records_consumed();
851  for (uint32_t i = begin; i < end; ++i) {
852  HashTmpBin::Record* record = cur_bin_table_.get_record(i);
853  ASSERT_ND(cur_bin_ == (record->hash_ >> storage_.get_bin_shifts()));
854  if (record->xct_id_.is_deleted()) {
855  continue;
856  }
857  uint16_t available = cur_page->available_space();
858  uint16_t required = cur_page->required_space(record->key_length_, record->payload_length_);
859  if (available < required) {
860  // move on to next page
861  SnapshotPagePointer page_id = base_pointer + allocated_pages_;
862  HashDataPage* next_page = page_base_ + allocated_pages_;
863  next_page->initialize_snapshot_page(storage_id_, page_id, cur_bin_, bin_bits, bin_shifts);
864  cur_page->next_page_address()->snapshot_pointer_ = page_id;
865  cur_page = next_page;
866 
867  ++allocated_pages_;
868  ASSERT_ND(allocated_pages_ <= max_pages_);
869  }
870 
871  BloomFilterFingerprint fingerprint = DataPageBloomFilter::extract_fingerprint(record->hash_);
872  cur_page->create_record_in_snapshot(
873  record->xct_id_,
874  record->hash_,
875  fingerprint,
876  record->get_key(),
877  record->key_length_,
878  record->get_payload(),
879  record->payload_length_);
880  }
881 
882  // finally, register the head page in intermediate page. the bin is now closed.
883  WRAP_ERROR_CODE(append_to_intermediate(head_page_id, cur_bin_));
884  cur_bin_ = kCurBinNotOpened;
885 
886  return kRetOk;
887 }
888 
889 ErrorStack HashComposeContext::open_cur_bin(HashBin bin) {
890  ASSERT_ND(cur_bin_ == kCurBinNotOpened);
891  // switch to an intermediate page containing this bin
892  WRAP_ERROR_CODE(update_cur_path_if_needed(bin));
893 
894  cur_bin_table_.clean_quick();
895 
896  // Load-up the cur_bin_table_ with existing records in previous snapshot
897  SnapshotPagePointer page_id = get_cur_path_bin_head(bin);
898  while (page_id) {
899  HashDataPage* page = reinterpret_cast<HashDataPage*>(data_page_io_memory_.get_block());
900  // hopefully, most of this read will be sequential. so underlying HW will pre-fetch and cache
901  WRAP_ERROR_CODE(previous_snapshot_files_->read_page(page_id, page));
902  ASSERT_ND(page->header().storage_id_ == storage_id_);
903  ASSERT_ND(page->header().page_id_ == page_id);
904  ASSERT_ND(page->get_bin() == bin);
905  ASSERT_ND(page->next_page().volatile_pointer_.is_null());
906  uint16_t records = page->get_record_count();
907  for (uint16_t i = 0; i < records; ++i) {
908  const HashDataPage::Slot& slot = page->get_slot(i);
909  ASSERT_ND(!slot.tid_.xct_id_.is_deleted());
910  ASSERT_ND(!slot.tid_.xct_id_.is_moved());
911  ASSERT_ND(!slot.tid_.xct_id_.is_being_written());
912  const char* data = page->record_from_offset(slot.offset_);
913  WRAP_ERROR_CODE(cur_bin_table_.insert_record(
914  slot.tid_.xct_id_,
915  data,
916  slot.key_length_,
917  slot.hash_,
918  data + slot.get_aligned_key_length(),
919  slot.payload_length_));
920  }
921  page_id = page->next_page().snapshot_pointer_;
922  }
923 
924  cur_bin_ = bin;
925  return kRetOk;
926 }
927 
933 ErrorStack HashComposeContext::init_intermediates() {
934  ASSERT_ND(allocated_intermediates_ == 0);
935  ASSERT_ND(intermediate_base_
936  == reinterpret_cast<HashComposedBinsPage*>(snapshot_writer_->get_intermediate_base()));
937  uint16_t count = storage_.get_root_children();
938  if (max_intermediates_ < count) {
939  return ERROR_STACK_MSG(kErrorCodeInternalError, "max_intermediates weirdly too small");
940  }
941 
942  std::memset(intermediate_base_, 0, kPageSize * count);
943  for (uint16_t i = 0; i < count; ++i) {
944  SnapshotPagePointer new_page_id = allocated_intermediates_;
945  ++allocated_intermediates_;
946  intermediate_base_[i].header_.page_id_ = new_page_id;
947  intermediate_base_[i].header_.page_type_ = kHashComposedBinsPageType;
948  uint64_t interval = kHashMaxBins[levels_ - 1U];
949  HashBinRange range(i * interval, (i + 1U) * interval);
950  intermediate_base_[i].bin_range_ = range;
951 
952  // we use the volatile pointer to represent the number of pages in the sub-tree. initialize it.
953  ASSERT_ND(root_info_page_->get_pointer(i).volatile_pointer_.word == 0); // we did mem-zero.
954  root_info_page_->get_pointer(i).volatile_pointer_.word = 1; // just one page initially
955  }
956 
957  return kRetOk;
958 }
959 
960 HashComposedBinsPage* HashComposeContext::get_intermediate_tail(uint8_t root_index) const {
961  HashComposedBinsPage* page = get_intermediate_head(root_index);
962  while (true) {
963  ASSERT_ND(page);
964  ASSERT_ND(intermediate_base_ + page->header_.page_id_ == page);
965  ASSERT_ND(page->header_.get_page_type() == kHashComposedBinsPageType);
966  if (page->next_page_ == 0) {
967  return page;
968  }
969  page = intermediate_base_ + page->next_page_;
970  }
971 }
972 
973 inline void HashComposeContext::update_cur_intermediate_tail_if_needed(HashBin bin) {
974  ASSERT_ND(bin < total_bin_count_);
975  if (LIKELY(cur_intermediate_tail_ && cur_intermediate_tail_->bin_range_.contains(bin))) {
976  return;
977  }
978  update_cur_intermediate_tail(bin);
979 }
980 
981 
982 void HashComposeContext::update_cur_intermediate_tail(HashBin bin) {
983  ASSERT_ND(!cur_intermediate_tail_ || !cur_intermediate_tail_->bin_range_.contains(bin));
984  IntermediateRoute route = IntermediateRoute::construct(bin);
985  uint8_t root_index = route.route[levels_ - 1U];
986  cur_intermediate_tail_ = get_intermediate_tail(root_index);
987  ASSERT_ND(cur_intermediate_tail_->bin_range_.contains(bin));
988 }
989 
/**
 * @brief Registers the head data page of a closed bin in the ComposedBins
 * intermediate pages, growing the page chain when the current tail is full.
 * @param[in] page_id snapshot page-id of the bin's head data page
 * @param[in] bin the bin being closed; bins must arrive in ascending order
 */
ErrorCode HashComposeContext::append_to_intermediate(SnapshotPagePointer page_id, HashBin bin) {
  // Switch cur_intermediate_tail_ to the subtree that covers this bin, if needed.
  update_cur_intermediate_tail_if_needed(bin);
  ASSERT_ND(cur_intermediate_tail_->bin_range_.contains(bin));
  ASSERT_ND(cur_intermediate_tail_->bin_count_ <= kHashComposedBinsPageMaxBins);
  ASSERT_ND(cur_intermediate_tail_->next_page_ == 0);
  if (cur_intermediate_tail_->bin_count_ == kHashComposedBinsPageMaxBins) {
    // Now we need to append a new intermediate page.
    DVLOG(1) << "Growing intermediate page in hash composer...";
    // NOTE(review): expand_intermediate_pool_if_needed() may re-fetch intermediate_base_
    // after expanding the writer's intermediate memory; cur_intermediate_tail_ could then
    // still point into the pre-expansion buffer when written below. Looks safe only if
    // expansion preserves the base address or content in place -- confirm against
    // snapshot_writer_->expand_intermediate_memory().
    CHECK_ERROR_CODE(expand_intermediate_pool_if_needed());
    // page-id is an offset within our intermediate pool, handed out sequentially
    SnapshotPagePointer next_page_id = allocated_intermediates_;
    HashComposedBinsPage* next = intermediate_base_ + next_page_id;
    std::memset(next, 0, kPageSize);
    next->header_.page_id_ = next_page_id;
    next->bin_range_ = cur_intermediate_tail_->bin_range_;  // same subtree, same range
    ++allocated_intermediates_;
    // Link the new page at the end of the chain and make it the current tail.
    cur_intermediate_tail_->next_page_ = next_page_id;
    cur_intermediate_tail_ = next;
    ASSERT_ND(cur_intermediate_tail_->next_page_ == 0);
    ASSERT_ND(cur_intermediate_tail_->bin_range_.contains(bin));

    // also maintain the count of pages in root-child pointer
    uint16_t root_child = bin / kHashMaxBins[levels_ - 1U];
    ASSERT_ND(root_child < root_children_);
    ASSERT_ND((
      intermediate_base_ + root_info_page_->get_pointer(root_child).snapshot_pointer_)->bin_range_
      == cur_intermediate_tail_->bin_range_);
    ++root_info_page_->get_pointer(root_child).volatile_pointer_.word;
  }

  // Append the (bin, head-page-id) entry to the tail page.
  ASSERT_ND(cur_intermediate_tail_->bin_count_ < kHashComposedBinsPageMaxBins);
  uint8_t index = cur_intermediate_tail_->bin_count_;
  // hash-bin should be fully sorted
  ASSERT_ND(index == 0 || cur_intermediate_tail_->bins_[index - 1U].bin_ < bin);
  ComposedBin& entry = cur_intermediate_tail_->bins_[index];
  entry.page_id_ = page_id;
  entry.bin_ = bin;
  ++cur_intermediate_tail_->bin_count_;
  return kErrorCodeOk;
}
1029 
1030 ErrorCode HashComposeContext::expand_intermediate_pool_if_needed() {
1031  ASSERT_ND(allocated_intermediates_ <= max_intermediates_);
1032  if (UNLIKELY(allocated_intermediates_ == max_intermediates_)) {
1033  LOG(INFO) << "Automatically expanding intermediate_pool. This should be a rare event";
1034  uint32_t required = allocated_intermediates_ + 1U;
1035  CHECK_ERROR_CODE(snapshot_writer_->expand_intermediate_memory(required, true));
1036  intermediate_base_
1037  = reinterpret_cast<HashComposedBinsPage*>(snapshot_writer_->get_intermediate_base());
1038  max_intermediates_ = snapshot_writer_->get_intermediate_size();
1039  }
1040  return kErrorCodeOk;
1041 }
1042 
/**
 * @brief Installs the newly-composed snapshot data-page pointers into the volatile
 * intermediate pages, one root child at a time.
 * @param[out] installed_count number of snapshot pointers installed
 */
ErrorStack HashComposeContext::install_snapshot_data_pages(uint64_t* installed_count) const {
  *installed_count = 0;
  VolatilePagePointer pointer = storage_.get_control_block()->root_page_pointer_.volatile_pointer_;
  if (pointer.is_null()) {
    VLOG(0) << "No volatile pages.. maybe while restart?";
    return kRetOk;
  }

  HashIntermediatePage* volatile_root = resolve_intermediate(pointer);

  debugging::StopWatch watch;
  for (uint8_t root_child = 0; root_child < root_children_; ++root_child) {
    VolatilePagePointer child_pointer = volatile_root->get_pointer(root_child).volatile_pointer_;
    if (child_pointer.is_null()) {
      LOG(WARNING) << "Um, the subtree doesn't exist? how come. but fine";
      continue;
    }

    const HashComposedBinsPage* composed = get_intermediate_head(root_child);
    if (levels_ == 1U) {
      // the root child is already a data page. We should have at most one pointer to install!
      ASSERT_ND(volatile_root->get_level() == 0);
      ASSERT_ND(composed->next_page_ == 0);
      ASSERT_ND(composed->bin_count_ == 0 || composed->bin_count_ == 1U);
      if (composed->bin_count_ > 0) {
        // Install directly into the root's pointer slot.
        ASSERT_ND(composed->bins_[0].bin_ == root_child);
        ASSERT_ND(verify_new_pointer(composed->bins_[0].page_id_));
        ASSERT_ND(verify_old_pointer(volatile_root->get_pointer(root_child).snapshot_pointer_));
        volatile_root->get_pointer(root_child).snapshot_pointer_ = composed->bins_[0].page_id_;
      }
    } else {
      // Multi-level storage: descend into the subtree and install there.
      ASSERT_ND(volatile_root->get_level() > 0);
      HashIntermediatePage* volatile_child = resolve_intermediate(child_pointer);
      ASSERT_ND(volatile_child->header().get_page_type() == kHashIntermediatePageType);
      CHECK_ERROR(install_snapshot_data_pages_root_child(
        composed,
        volatile_child,
        installed_count));
    }
  }
  watch.stop();
  VLOG(0) << "HashStorage-" << storage_id_ << " installed " << *installed_count << " pointers"
    << " to data pages in " << watch.elapsed_ms() << "ms";
  return kRetOk;
}
1093 
/**
 * @brief Installs snapshot data-page pointers for one root child's subtree.
 *
 * Walks the (sorted) ComposedBins chain while simultaneously keeping a traversal
 * path in the volatile intermediate pages; for each composed bin, the level-0
 * volatile page is located and its snapshot pointer replaced.
 * @param[in] composed head of the ComposedBins chain for this root child
 * @param[in] volatile_root_child volatile intermediate page of the subtree
 * @param[in,out] installed_count incremented per installed pointer
 */
ErrorStack HashComposeContext::install_snapshot_data_pages_root_child(
  const HashComposedBinsPage* composed,
  HashIntermediatePage* volatile_root_child,
  uint64_t* installed_count) const {
  typedef HashIntermediatePage* PagePtr;
  PagePtr volatile_path[kHashMaxLevels];  // the traversal path in volatile world
  std::memset(volatile_path, 0, sizeof(volatile_path));
  volatile_path[volatile_root_child->get_level()] = volatile_root_child;
  uint8_t volatile_path_lowest_level = volatile_root_child->get_level();

  const HashComposedBinsPage* page = composed;
  HashBin previous_bin = kCurBinNotOpened;
  while (true) {
    for (uint16_t i = 0; i < page->bin_count_; ++i) {
      // entries should be sorted by bins
      HashBin bin = page->bins_[i].bin_;
      ASSERT_ND(previous_bin == kCurBinNotOpened || previous_bin < bin);
      previous_bin = bin;
      ASSERT_ND(volatile_root_child->get_bin_range().contains(bin));

      // go back to upper levels first,
      while (UNLIKELY(!volatile_path[volatile_path_lowest_level]->get_bin_range().contains(bin))) {
        ++volatile_path_lowest_level;
        ASSERT_ND(volatile_path_lowest_level <= volatile_root_child->get_level());
      }

      // then go down to hit the exact level-0 volatile page
      while (UNLIKELY(volatile_path_lowest_level > 0)) {
        PagePtr cur = volatile_path[volatile_path_lowest_level];
        ASSERT_ND(cur->get_level() == volatile_path_lowest_level);
        const HashBinRange& range = cur->get_bin_range();
        ASSERT_ND(range.contains(bin));
        // index of the child covering bin within cur's pointer array
        uint16_t index = (bin - range.begin_) / kHashMaxBins[volatile_path_lowest_level];
        ASSERT_ND(verify_old_pointer(cur->get_pointer(index).snapshot_pointer_));
        VolatilePagePointer pointer = cur->get_pointer(index).volatile_pointer_;
        // something was changed in this subtree, we sure have a volatile page here.
        ASSERT_ND(!pointer.is_null());
        PagePtr next = resolve_intermediate(pointer);
        ASSERT_ND(next->get_bin_range().contains(bin));
        ASSERT_ND(next->get_level() + 1U == volatile_path_lowest_level);
        --volatile_path_lowest_level;
        volatile_path[volatile_path_lowest_level] = next;
      }

      // At level 0: install the composed page-id into the bin's pointer slot.
      PagePtr bottom = volatile_path[0];
      ASSERT_ND(volatile_path_lowest_level == 0 && bottom->get_bin_range().contains(bin));
      uint16_t index = bin - bottom->get_bin_range().begin_;
      ASSERT_ND(!bottom->get_pointer(index).volatile_pointer_.is_null());
      ASSERT_ND(verify_old_pointer(bottom->get_pointer(index).snapshot_pointer_));
      ASSERT_ND(verify_new_pointer(page->bins_[i].page_id_));
      bottom->get_pointer(index).snapshot_pointer_ = page->bins_[i].page_id_;
      ++(*installed_count);
    }

    // Follow the ComposedBins chain; next_page_ == 0 terminates it.
    ASSERT_ND(page->next_page_ < allocated_intermediates_);
    if (page->next_page_ == 0) {
      break;
    } else {
      page = intermediate_base_ + page->next_page_;
    }
  }

  return kRetOk;
}
1158 
1159 
1166  Composer::DropResult result(args);
1167  if (storage_.get_hash_metadata()->keeps_all_volatile_pages()) {
1168  LOG(INFO) << "Keep-all-volatile: Storage-" << storage_.get_name()
1169  << " is configured to keep all volatile pages.";
1170  result.dropped_all_ = false;
1171  return result;
1172  }
1173 
1174  DualPagePointer* root_pointer = &storage_.get_control_block()->root_page_pointer_;
1175  HashIntermediatePage* volatile_page = resolve_intermediate(root_pointer->volatile_pointer_);
1176  if (volatile_page == nullptr) {
1177  LOG(INFO) << "No volatile root page. Probably while restart";
1178  return result;
1179  }
1180 
1181  // We iterate through all existing volatile pages to drop volatile pages of
1182  // level-3 or deeper (if the storage has only 2 levels, keeps all).
1183  // this "level-3 or deeper" is a configuration per storage.
1184  // Even if the volatile page is deeper than that, we keep them if it contains newer modification,
1185  // including descendants (so, probably we will keep higher levels anyways).
1186  uint16_t count = storage_.get_root_children();
1187  uint8_t root_level = volatile_page->get_level();
1188  ASSERT_ND(root_level + 1U == storage_.get_levels());
1189  for (uint16_t i = 0; i < count; ++i) {
1190  DualPagePointer& child_pointer = volatile_page->get_pointer(i);
1191  if (!child_pointer.volatile_pointer_.is_null() && child_pointer.snapshot_pointer_ != 0) {
1192  uint16_t partition = extract_numa_node_from_snapshot_pointer(child_pointer.snapshot_pointer_);
1193  if (!args.partitioned_drop_ || partition == args.my_partition_) {
1194  drop_volatiles_child(args, &child_pointer, root_level, &result);
1195  }
1196  }
1197  }
1198  // root page is kept at this point in this case. we need to check with other threads
1199  return result;
1200 }
1201 
1202 void HashComposer::drop_volatiles_child(
1204  DualPagePointer* child_pointer,
1205  uint8_t parent_level,
1206  Composer::DropResult *result) {
1207  if (parent_level > 0) {
1208  result->combine(drop_volatiles_recurse(args, child_pointer));
1209  } else {
1210  if (can_drop_volatile_bin(
1211  child_pointer->volatile_pointer_,
1212  args.snapshot_.valid_until_epoch_)) {
1213  drop_volatile_entire_bin(args, child_pointer);
1214  } else {
1215  result->dropped_all_ = false;
1216  // in hash composer, we currently do not emit accurate information on this.
1217  // we are not using this information anyway.
1219  }
1220  }
1221 }
1222 
1224  if (storage_.get_hash_metadata()->keeps_all_volatile_pages()) {
1225  LOG(INFO) << "Oh, but keep-all-volatile is on. Storage-" << storage_.get_name()
1226  << " is configured to keep all volatile pages.";
1227  return;
1228  }
1229  if (is_to_keep_volatile(storage_.get_levels() - 1U)) {
1230  LOG(INFO) << "Oh, but Storage-" << storage_.get_name() << " is configured to keep"
1231  << " the root page.";
1232  return;
1233  }
1234  DualPagePointer* root_pointer = &storage_.get_control_block()->root_page_pointer_;
1235  HashIntermediatePage* volatile_page = resolve_intermediate(root_pointer->volatile_pointer_);
1236  if (volatile_page == nullptr) {
1237  LOG(INFO) << "Oh, but root volatile page already null";
1238  return;
1239  }
1240 
1241  LOG(INFO) << "Okay, drop em all!!";
1242  drop_all_recurse(args, root_pointer);
1243 }
1244 
1245 void HashComposer::drop_all_recurse(
1247  DualPagePointer* pointer) {
1248  if (pointer->volatile_pointer_.is_null()) {
1249  return;
1250  }
1251  HashIntermediatePage* page = resolve_intermediate(pointer->volatile_pointer_);
1252  for (uint16_t i = 0; i < kHashIntermediatePageFanout; ++i) {
1253  DualPagePointer* child_pointer = page->get_pointer_address(i);
1254  if (page->get_level() > 0) {
1255  drop_all_recurse(args, child_pointer);
1256  } else {
1257  drop_volatile_entire_bin(args, child_pointer);
1258  }
1259  }
1260  args.drop(engine_, pointer->volatile_pointer_);
1261  pointer->volatile_pointer_.clear();
1262 }
1263 
1264 void HashComposer::drop_volatile_entire_bin(
1265  const Composer::DropVolatilesArguments& args,
1266  DualPagePointer* pointer_to_head) const {
1267  // drop data pages in a bin. this must be a while loop, not recursion.
1268  // otherwise, when there is a rare case of long linked list, outofstack!
1269  // be careful to not drop the current page before getting required information
1270  VolatilePagePointer cur_pointer = pointer_to_head->volatile_pointer_;
1271  while (!cur_pointer.is_null()) {
1272  HashDataPage* cur = resolve_data(cur_pointer);
1273  VolatilePagePointer next_pointer = cur->next_page().volatile_pointer_;
1274  args.drop(engine_, cur_pointer); // notice, we drop AFTER retrieving next_pointer
1275  cur_pointer = next_pointer;
1276  }
1277  pointer_to_head->volatile_pointer_.clear();
1278 }
1279 
1280 inline Composer::DropResult HashComposer::drop_volatiles_recurse(
1281  const Composer::DropVolatilesArguments& args,
1282  DualPagePointer* pointer) {
1283  ASSERT_ND(pointer->snapshot_pointer_ == 0
1284  || extract_snapshot_id_from_snapshot_pointer(pointer->snapshot_pointer_)
1286  // The snapshot pointer CAN be null.
1287  // It means that this subtree has not constructed a new snapshot page in this snapshot.
1288 
1289  Composer::DropResult result(args);
1290  HashIntermediatePage* page = resolve_intermediate(pointer->volatile_pointer_);
1291 
1292  // Explore/replace children first because we need to know if there is new modification.
1293  // In that case, we must keep this volatile page, too.
1294  // Intermediate volatile page is kept iff there are no child volatile pages.
1295  uint8_t this_level = page->get_level();
1296  for (uint16_t i = 0; i < kHashIntermediatePageFanout; ++i) {
1297  DualPagePointer* child_pointer = page->get_pointer_address(i);
1298  if (!child_pointer->volatile_pointer_.is_null()) {
1299  drop_volatiles_child(args, child_pointer, this_level, &result);
1300  }
1301  }
1302  if (result.dropped_all_) {
1303  if (is_to_keep_volatile(page->get_level())) {
1304  DVLOG(2) << "Exempted";
1305  result.dropped_all_ = false;
1306  } else {
1307  args.drop(engine_, pointer->volatile_pointer_);
1308  pointer->volatile_pointer_.clear();
1309  }
1310  } else {
1311  DVLOG(1) << "Couldn't drop an intermediate page that has a recent modification in child";
1312  }
1313  ASSERT_ND(!result.dropped_all_ || pointer->volatile_pointer_.is_null());
1314  return result;
1315 }
1316 
1317 bool HashComposer::can_drop_volatile_bin(VolatilePagePointer head, Epoch valid_until) const {
1318  for (HashDataPage* cur = resolve_data(head);
1319  cur;
1320  cur = resolve_data(cur->next_page().volatile_pointer_)) {
1321  for (uint16_t i = 0; i < cur->get_record_count(); ++i) {
1322  Epoch epoch = cur->get_slot(i).tid_.xct_id_.get_epoch();
1323  if (epoch > valid_until) {
1324  return false;
1325  }
1326  }
1327  }
1328 
1329  return true;
1330 }
1331 
1332 
1333 inline bool HashComposer::is_to_keep_volatile(uint16_t level) {
1334  /*
1335  uint16_t threshold = storage_.get_hash_metadata()->snapshot_drop_volatile_pages_threshold_;
1336  uint16_t hash_levels = storage_.get_levels();
1337  ASSERT_ND(level < hash_levels);
1338  // examples:
1339  // when threshold=0, all levels (0~hash_levels-1) should return false.
1340  // when threshold=1, only root level (hash_levels-1) should return true
1341  // when threshold=2, upto hash_levels-2..
1342  return threshold + level >= hash_levels;
1343  */
1344  return level + 2U > storage_.get_levels(); // TASK(Hideaki) should be a config
1345 }
1346 
1347 } // namespace hash
1348 } // namespace storage
1349 } // namespace foedus
const Page *const * root_info_pages_
Root info pages output by compose()
Definition: composer.hpp:130
void drop_root_volatile(const Composer::DropVolatilesArguments &args)
ErrorStack construct_root(const Composer::ConstructRootArguments &args)
storage::Page * resolve_offset(uint8_t numa_node, PagePoolOffset offset) const __attribute__((always_inline))
Resolves offset plus NUMA node ID to storage::Page*.
Epoch max_observed_
the largest Epoch it observed recursively.
Definition: composer.hpp:200
snapshot::LogGleanerResource * gleaner_resource_
All pre-allocated resouces to help run construct_root(), such as memory buffers.
Definition: composer.hpp:134
numa_alloc_onnode() and numa_free().
Represents a pointer to another page (usually a child page).
Definition: storage_id.hpp:271
const SortEntry * get_sort_entries() const __attribute__((always_inline))
Definition: merge_sort.hpp:378
ErrorCode create_memory(uint16_t numa_node, uint64_t initial_size=kDefaultInitialSize)
Allocates the memory to use by this object.
Definition: hash_tmpbin.cpp:36
Epoch base_epoch_
All log entries in this inputs are assured to be after this epoch.
Definition: composer.hpp:110
SnapshotLocalPageId extract_local_page_id_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:91
ErrorCode read_page(storage::SnapshotPagePointer page_id, void *out)
ErrorCode expand_intermediate_memory(uint32_t required_pages, bool retain_content)
Expands intermediate_memory_ in case it is too small.
ComposedBin bins_[kHashComposedBinsPageMaxBins]
void assert_type() const __attribute__((always_inline))
Automatically calls if uninitialize() wasn't called when it gets out of scope, and just complains whe...
uint8_t page_type_
Actually of PageType.
Definition: page.hpp:205
uint32_t fetch_logs(uint32_t sort_pos, uint32_t count, log::RecordLogType const **out) const
To reduce L1 cache miss stall, we prefetch some number of position entries and the pointed log entrie...
Definition: merge_sort.cpp:273
Represents a logic to compose a new version of data pages for one storage.
Definition: composer.hpp:86
const HashBinRange & get_bin_range() const
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
void clean()
Removes all tuple data for the current bin.
Definition: hash_tmpbin.cpp:74
HashIntermediatePage * resolve_intermediate_impl(const memory::GlobalVolatilePageResolver &resolver, VolatilePagePointer pointer)
ErrorCode update_record(xct::XctId xct_id, const void *key, uint16_t key_length, HashValue hash, const void *payload, uint16_t payload_length)
Updates a record of the given key with the given payload, which might change length.
const uint16_t kHashComposedBinsPageMaxBins
StorageId storage_id_
ID of the storage this page belongs to.
Definition: page.hpp:196
double elapsed_ms() const
Definition: stop_watch.hpp:48
Epoch valid_until_epoch_
This snapshot contains all the logs until this epoch.
Definition: snapshot.hpp:55
bool keeps_all_volatile_pages() const
Definition: metadata.hpp:98
Represents a pointer to a volatile page with modification count for preventing ABA.
Definition: storage_id.hpp:194
memory::PagePoolOffset get_page_size() const __attribute__((always_inline))
Composer::DropResult drop_volatiles(const Composer::DropVolatilesArguments &args)
drop_volatiles and related methods
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
ErrorCode dump_intermediates(memory::PagePoolOffset from_page, uint32_t count)
Write out pages that are contiguous in the sub intermediate page pool.
void clean_quick()
This version selectively clears buckets_ by seeing individual records.
Definition: hash_tmpbin.cpp:81
void alloc(uint64_t size, uint64_t alignment, AllocType alloc_type, int numa_node) noexcept
Allocate a memory, releasing the current memory if exists.
0x002A : foedus::storage::hash::HashDeleteLogType .
Definition: log_type.hpp:123
cache::SnapshotFileSet * previous_snapshot_files_
To read existing snapshots.
Definition: composer.hpp:99
Holds a set of read-only file objects for snapshot files.
bool is_equivalent(const VolatilePagePointer &other) const
Definition: storage_id.hpp:215
const Metadata * get_metadata() const
Returns the metadata of this storage.
Definition: storage.hpp:162
Declares common log types used in all packages.
ErrorStack next_batch()
Executes merge-sort on several thousands of logs and provides the result as a batch.
Definition: merge_sort.cpp:143
static void launch_construct_root_multi_level(HashComposer *pointer, const Composer::ConstructRootArguments *args, uint16_t numa_node, HashIntermediatePage *root_page, ErrorStack *out_error)
launched on its own thread.
void initialize_snapshot_page(StorageId storage_id, SnapshotPagePointer page_id, HashBin bin, uint8_t bin_bits, uint8_t bin_shifts)
#define LIKELY(x)
Hints that x is highly likely true.
Definition: compiler.hpp:103
uint32_t log_streams_count_
Number of sorted runs.
Definition: composer.hpp:103
bool is_master() const
Returns if this engine object is a master instance.
Definition: engine.cpp:68
bool contains(HashBin hash) const
Definition: hash_id.hpp:176
soc::SocId get_soc_count() const
Shorthand for get_options().thread_.group_count_.
Definition: engine.cpp:74
HashComposer(Composer *parent)
HashComposer methods.
const uint8_t kHashMaxLevels
Max level of intermediate pages.
Definition: hash_id.hpp:104
VolatilePagePointer volatile_pointer_
Definition: storage_id.hpp:308
0 means no-error.
Definition: error_code.hpp:87
RecordIndex get_records_consumed() const
const StorageName & get_name() const
Returns the unique name of this storage.
Definition: storage.hpp:155
Declares all log types used in this storage type.
Receives an arbitrary number of sorted buffers and emits one fully sorted stream of logs...
Definition: merge_sort.hpp:77
bool exists() const
Returns whether this storage is already created.
Definition: storage.hpp:169
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Definition: storage_id.hpp:79
HashComposer's compose() implementation separated from the class itself.
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
Page * root_info_page_
[OUT] Returns pointers and related information that is required to construct the root page...
Definition: composer.hpp:116
MergedPosition get_current_count() const __attribute__((always_inline))
Definition: merge_sort.hpp:369
PageType get_page_type() const
Definition: page.hpp:280
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
A base class for HashInsertLogType/HashDeleteLogType/HashOverwriteLogType.
uint64_t stop()
Take another current time tick.
Definition: stop_watch.cpp:35
SnapshotPagePointer snapshot_pointer_
Definition: storage_id.hpp:307
xct::XctId xct_id_
Epoch and in-epoch ordinal of this log.
memory::PagePoolOffset get_intermediate_size() const __attribute__((always_inline))
Just a marker to denote that the memory region represents a data page.
Definition: page.hpp:334
Represents a range of hash bins in a hash storage, such as what an intermediate page is responsible f...
Definition: hash_id.hpp:172
uint16_t extract_snapshot_id_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:98
Epoch one_more() const
Definition: epoch.hpp:127
0x0029 : foedus::storage::hash::HashInsertLogType .
Definition: log_type.hpp:122
storage::Page * get_page_base() __attribute__((always_inline))
uint32_t get_physical_record_count() const
ErrorCode delete_record(xct::XctId xct_id, const void *key, uint16_t key_length, HashValue hash)
Logically deletes a record of the given key.
Retrun value of drop_volatiles()
Definition: composer.hpp:171
SnapshotPagePointer * new_root_page_pointer_
[OUT] Returns pointer to new root snapshot page/
Definition: composer.hpp:136
HashBin end_
Exclusive end of the range.
Definition: hash_id.hpp:193
ErrorCode read_pages(storage::SnapshotPagePointer page_id_begin, uint32_t page_count, void *out)
Read contiguous pages in one shot.
bool dropped_all_
Whether all volatile pages under the page was dropped.
Definition: composer.hpp:202
void * get_block() const
Returns the memory block.
uint8_t extract_numa_node_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:95
uint16_t SnapshotId
Unique ID of Snapshot.
Definition: snapshot_id.hpp:43
uint16_t my_partition_
if partitioned_drop_ is true, the partition this thread should drop volatile pages from ...
Definition: composer.hpp:152
ErrorStack compose(const Composer::ComposeArguments &args)
Record * get_record(RecordIndex index) const
cache::SnapshotFileSet * previous_snapshot_files_
To read existing snapshots.
Definition: composer.hpp:128
const SnapshotId kNullSnapshotId
Definition: snapshot_id.hpp:45
uint64_t get_size() const
Returns the byte size of the memory block.
Represents an intermediate page in Hashtable Storage.
void initialize_snapshot_page(StorageId storage_id, SnapshotPagePointer page_id, uint8_t level, HashBin start_bin)
storage::Page * get_intermediate_base() __attribute__((always_inline))
static BloomFilterFingerprint extract_fingerprint(HashValue fullhash)
ErrorCode insert_record(xct::XctId xct_id, const void *key, uint16_t key_length, HashValue hash, const void *payload, uint16_t payload_length)
Inserts a new record of the given key and payload.
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
Definition: error_code.hpp:155
uint64_t HashBin
Represents a bin of a hash value.
Definition: hash_id.hpp:142
Represents an individual data page in Hashtable Storage.
ErrorCode overwrite_record(xct::XctId xct_id, const void *key, uint16_t key_length, HashValue hash, const void *payload, uint16_t payload_offset, uint16_t payload_count)
Overwrites a part of the record of the given key.
snapshot::SnapshotWriter * snapshot_writer_
Writes out composed pages.
Definition: composer.hpp:126
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
0x000C : "GENERAL: Other uncategorized errors." .
Definition: error_code.hpp:116
snapshot::SnapshotWriter * snapshot_writer_
Writes out composed pages.
Definition: composer.hpp:97
0x0028 : foedus::storage::hash::HashOverwriteLogType .
Definition: log_type.hpp:121
VolatilePagePointer construct_volatile_page_pointer(uint64_t word)
Definition: storage_id.hpp:230
const ErrorStack kRetOk
Normal return value for no-error case.
bool partitioned_drop_
if true, one thread for each partition will invoke drop_volatiles()
Definition: composer.hpp:154
LogCode get_type() const __attribute__((always_inline))
Convenience method to cast into LogCode.
const HashMetadata * get_hash_metadata() const
HashValue hash_
Hash value of the key.
const HashBin kInvalidHashBin
This value or larger never appears as a valid HashBin.
Definition: hash_id.hpp:162
RecordIndex get_first_record() const
#define ERROR_STACK_MSG(e, m)
Overload of ERROR_STACK(e) to receive a custom error message.
Resolves an offset in a volatile page pool to an actual pointer and vice versa.
const uint8_t kHashIntermediatePageFanout
Number of pointers in an intermediate page of hash storage.
Definition: hash_id.hpp:49
HashComposeContext(Engine *engine, snapshot::MergeSort *merge_sort, snapshot::SnapshotWriter *snapshot_writer, cache::SnapshotFileSet *previous_snapshot_files, Page *root_info_page)
HashComposeContext methods.
uint64_t get_key() const __attribute__((always_inline))
Definition: merge_sort.hpp:132
const uint64_t kHashMaxBins[]
kHashTotalBins[n] gives the maximum number of hash bins n-level hash can hold.
Definition: hash_id.hpp:74
uint32_t root_info_pages_count_
Number of root info pages.
Definition: composer.hpp:132
Base class for log type of record-wise operation.
static IntermediateRoute construct(HashBin bin)
Calculates the rout for the given hash bin.
void combine(const DropResult &other)
Definition: composer.hpp:176
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
A high-resolution stop watch.
Definition: stop_watch.hpp:30
HashDataPage * resolve_data_impl(const memory::GlobalVolatilePageResolver &resolver, VolatilePagePointer pointer)
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
ErrorCode dump_pages(memory::PagePoolOffset from_page, uint32_t count)
Write out pages that are contiguous in the main page pool.
DualPagePointer & get_pointer(uint16_t index)
snapshot::SortedBuffer *const * log_streams_
Sorted runs.
Definition: composer.hpp:101
CONTROL_BLOCK * get_control_block() const
Definition: attachable.hpp:97
snapshot::Snapshot snapshot_
The new snapshot.
Definition: composer.hpp:150
void drop(Engine *engine, VolatilePagePointer pointer) const
Returns (might cache) the given pointer to volatile pool.
Definition: composer.cpp:114
memory::AlignedMemory * work_memory_
Working memory to be used in this method.
Definition: composer.hpp:105
0x002B : foedus::storage::hash::HashUpdateLogType .
Definition: log_type.hpp:124
storage::SnapshotPagePointer get_next_page_id() const
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
Definition: storage_id.hpp:45
Entries we actually sort.
Definition: merge_sort.hpp:116
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
uint64_t HashValue
Represents a full 64-bit hash value calculated from a key.
Definition: hash_id.hpp:129
A page to pack many ComposedBin as an output of composer.
uint64_t page_id_
Page ID of this page.
Definition: page.hpp:191
Writes out one snapshot file for all data pages in one reducer.
SnapshotPagePointer root_snapshot_page_id_
Pointer to a snapshotted page this storage is rooted at.
Definition: metadata.hpp:112