20 #include <glog/logging.h>
90 : engine_(parent->get_engine()),
91 storage_id_(parent->get_storage_id()),
92 storage_(engine_, storage_id_),
93 volatile_resolver_(engine_->get_memory_manager()->get_global_volatile_page_resolver()) {
139 if (old_root_page_id != 0) {
156 CHECK_ERROR(construct_root_single_level(args, root_page));
159 LOG(INFO) <<
to_string() <<
" construct_root() multi-level parallelized path";
160 std::vector< std::thread > threads;
163 std::unique_ptr< ErrorStack[] > error_stacks(
new ErrorStack[nodes]);
164 for (uint16_t numa_node = 0; numa_node < nodes; ++numa_node) {
165 threads.emplace_back(
171 error_stacks.get() + numa_node);
174 LOG(INFO) <<
to_string() <<
" construct_root() launched " << nodes <<
" threads...";
175 for (uint16_t numa_node = 0; numa_node < nodes; ++numa_node) {
176 threads[numa_node].join();
177 if (error_stacks.get()[numa_node].is_error()) {
178 LOG(ERROR) <<
to_string() <<
" construct_root() observed an error in the launched"
179 <<
" thread-" << numa_node <<
". will propagate after joining the worker threads. "
180 <<
"error description: " << error_stacks.get()[numa_node];
184 LOG(INFO) <<
to_string() <<
" construct_root() joined";
186 for (uint16_t numa_node = 0; numa_node < nodes; ++numa_node) {
200 storage_.
get_control_block()->root_page_pointer_.snapshot_pointer_ = new_root_page_id;
205 ErrorStack HashComposer::construct_root_single_level(
209 LOG(INFO) <<
to_string() <<
" construct_root() Single-level path";
226 for (uint16_t index = 0; index < root_children; ++index) {
240 for (uint16_t index = 0; index < root_children; ++index) {
241 HashComposedBinsPage* bins_page = buffer + index;
242 if (bins_page->bin_count_) {
244 ASSERT_ND(bins_page->bins_[0].bin_ == index);
259 ErrorStack HashComposer::construct_root_multi_level(
260 const Composer::ConstructRootArguments& args,
262 HashIntermediatePage* root_page) {
266 thread::NumaThreadScope numa_scope(numa_node);
268 VLOG(0) <<
to_string() <<
" construct_root() child thread numa_node-" << numa_node;
273 cache::SnapshotFileSet fileset(engine_);
277 snapshot::LogGleanerResource::PerNodeResource& resource
278 = args.gleaner_resource_->per_node_resources_[numa_node];
281 std::unique_ptr< snapshot::SnapshotWriter > snapshot_writer_to_delete(
nullptr);
282 snapshot::SnapshotWriter* snapshot_writer;
283 if (numa_node == 0) {
284 snapshot_writer = args.snapshot_writer_;
286 snapshot_writer_to_delete.reset(
new snapshot::SnapshotWriter(
290 &resource.write_buffer_,
291 &resource.write_intermediate_buffer_,
293 snapshot_writer = snapshot_writer_to_delete.get();
297 const uint32_t inputs = args.root_info_pages_count_;
300 ComposedBinsMergedStream streams;
301 for (uint16_t index = 0; index < root_children; ++index) {
302 if (index % nodes != numa_node) {
311 bool had_any_change =
false;
312 uint32_t writer_buffer_pos = 0;
313 uint32_t writer_higher_buffer_pos = 0;
316 reinterpret_cast<const HashRootInfoPage* const*>(args.root_info_pages_),
317 args.root_info_pages_count_,
320 &resource.read_buffer_,
324 &writer_higher_buffer_pos,
327 if (!had_any_change) {
328 VLOG(0) <<
to_string() <<
" empty sub-tree " << index;
330 VLOG(0) <<
to_string() <<
" processing sub-tree " << index <<
"...";
331 uint32_t installed_count = 0;
343 &writer_higher_buffer_pos));
345 VLOG(0) <<
to_string() <<
" processed sub-tree " << index <<
"."
346 <<
" installed_count=" << installed_count
347 <<
" writer_buffer_pos=" << writer_buffer_pos
348 <<
" writer_higher_buffer_pos=" << writer_higher_buffer_pos;
358 ASSERT_ND(writer_higher_buffer_pos == 0);
361 HashIntermediatePage* higher_base
362 =
reinterpret_cast<HashIntermediatePage*
>(snapshot_writer->get_intermediate_base());
364 HashIntermediatePage* root_child = higher_base + 0;
366 ASSERT_ND(root_page->get_pointer(index).snapshot_pointer_ == 0);
373 uint8_t* ref_counts =
new uint8_t[writer_higher_buffer_pos];
374 std::memset(ref_counts, 0,
sizeof(uint8_t) * writer_higher_buffer_pos);
375 uint32_t total_replaced = 0;
378 for (uint32_t i = 0; i < writer_higher_buffer_pos; ++i) {
380 HashIntermediatePage* page = higher_base + i;
382 page->header().page_id_ = page_id;
386 ASSERT_ND(
id < writer_higher_buffer_pos);
387 page->get_pointer(j).snapshot_pointer_ =
id + base_id;
396 ASSERT_ND(total_replaced + 1U == writer_higher_buffer_pos);
398 for (uint32_t i = 1; i < writer_higher_buffer_pos; ++i) {
404 WRAP_ERROR_CODE(snapshot_writer->dump_intermediates(0, writer_higher_buffer_pos));
405 root_page->get_pointer(index).snapshot_pointer_ = base_id;
411 if (numa_node != 0) {
412 snapshot_writer_to_delete.get()->close();
415 VLOG(0) <<
to_string() <<
" construct_root() child thread numa_node-" << numa_node <<
" done";
420 inline HashDataPage* HashComposer::resolve_data(VolatilePagePointer pointer)
const {
423 inline HashIntermediatePage* HashComposer::resolve_intermediate(VolatilePagePointer pointer)
const {
428 return std::string(
"HashComposer-") + std::to_string(storage_id_);
441 Page* root_info_page)
443 merge_sort_(merge_sort),
444 system_initial_epoch_(engine->get_savepoint_manager()->get_initial_durable_epoch()),
445 storage_id_(merge_sort_->get_storage_id()),
446 snapshot_id_(snapshot_writer->get_snapshot_id()),
447 storage_(engine, storage_id_),
448 snapshot_writer_(snapshot_writer),
449 previous_snapshot_files_(previous_snapshot_files),
451 partitionable_(engine_->get_soc_count() > 1U),
452 levels_(storage_.get_levels()),
453 bin_bits_(storage_.get_bin_bits()),
454 bin_shifts_(storage_.get_bin_shifts()),
455 root_children_(storage_.get_root_children()),
456 numa_node_(snapshot_writer->get_numa_node()),
457 total_bin_count_(storage_.get_bin_count()),
458 previous_root_page_pointer_(storage_.get_metadata()->root_snapshot_page_id_),
459 volatile_resolver_(engine->get_memory_manager()->get_global_volatile_page_resolver()) {
460 cur_path_memory_.
alloc(
466 cur_path_lowest_level_ = levels_;
469 cur_bin_ = kCurBinNotOpened;
470 cur_intermediate_tail_ =
nullptr;
472 data_page_io_memory_.
alloc(
478 allocated_pages_ = 0;
479 allocated_intermediates_ = 0;
489 std::memset(root_info_page_, 0,
kPageSize);
494 cur_bin_table_.
clean();
495 VLOG(0) <<
"HashComposer-" << storage_id_ <<
" initialization done. processing...";
497 bool processed_any =
false;
498 cur_bin_ = kCurBinNotOpened;
505 processed_any =
true;
508 while (cur < count) {
511 if (cur_bin_ != head_bin) {
513 ASSERT_ND(cur_bin_ == kCurBinNotOpened || cur_bin_ < head_bin);
522 for (next = cur + 1U;
LIKELY(next < count); ++next) {
537 if (!processed_any) {
538 LOG(ERROR) <<
"wtf? no logs? storage-" << storage_id_;
545 ErrorCode HashComposeContext::apply_batch(uint64_t cur, uint64_t next) {
547 const uint16_t kFetchSize = 8;
550 uint16_t desired = std::min<uint16_t>(kFetchSize, next - cur);
551 uint16_t fetched = merge_sort_->
fetch_logs(cur, desired, logs);
552 for (uint16_t i = 0; i < kFetchSize &&
LIKELY(i < fetched); ++i) {
597 ErrorStack HashComposeContext::finalize() {
601 if (allocated_pages_ > 0) {
608 uint64_t installed_count = 0;
609 CHECK_ERROR(install_snapshot_data_pages(&installed_count));
617 for (uint32_t i = 0; i < root_children_; ++i) {
623 for (uint32_t i = 0; i < allocated_intermediates_; ++i) {
624 HashComposedBinsPage* page = intermediate_base_ + i;
627 page->header_.page_id_ = new_page_id;
628 if (page->next_page_) {
630 ASSERT_ND(page->next_page_ < allocated_intermediates_);
632 page->next_page_ = base_pointer + page->next_page_;
640 ErrorCode HashComposeContext::dump_data_pages() {
646 allocated_pages_ = 0;
651 inline HashDataPage* HashComposeContext::resolve_data(VolatilePagePointer pointer)
const {
654 inline HashIntermediatePage* HashComposeContext::resolve_intermediate(
655 VolatilePagePointer pointer)
const {
678 if (cur_path_lowest_level_ > 0) {
681 ASSERT_ND(cur_path_[0].get_bin_range().contains(bin));
686 ErrorStack HashComposeContext::init_cur_path() {
687 if (previous_root_page_pointer_ == 0) {
689 std::memset(cur_path_, 0,
kPageSize * levels_);
690 cur_path_lowest_level_ = levels_;
691 cur_path_valid_range_ = HashBinRange(0,
kHashMaxBins[levels_]);
694 HashIntermediatePage* root = get_cur_path_page(levels_ - 1U);
696 ASSERT_ND(root->header().storage_id_ == storage_id_);
697 ASSERT_ND(root->header().page_id_ == previous_root_page_pointer_);
698 ASSERT_ND(root->get_level() + 1U == levels_);
700 cur_path_lowest_level_ = root->get_level();
701 cur_path_valid_range_ = root->get_bin_range();
703 HashIntermediatePage* parent = root;
704 while (parent->get_level() > 0) {
705 HashIntermediatePage* child = get_cur_path_page(parent->get_level() - 1U);
712 ASSERT_ND(child->header().storage_id_ == storage_id_);
713 ASSERT_ND(child->header().page_id_ == pointer);
714 ASSERT_ND(child->get_level() + 1U == parent->get_level());
715 ASSERT_ND(child->get_bin_range() == HashBinRange(0ULL, parent->get_level()));
716 cur_path_lowest_level_ = child->get_level();
717 cur_path_valid_range_ = child->get_bin_range();
725 inline ErrorCode HashComposeContext::update_cur_path_if_needed(
HashBin bin) {
729 if (
LIKELY(is_initial_snapshot()
731 || (cur_path_valid_range_.
contains(bin) && cur_path_lowest_level_ == 0))) {
735 return update_cur_path(bin);
745 while (!cur_path_valid_range_.
contains(bin)) {
746 ASSERT_ND(cur_path_lowest_level_ + 1U < levels_);
747 ++cur_path_lowest_level_;
748 cur_path_valid_range_ = get_cur_path_lowest()->
get_bin_range();
749 ASSERT_ND(get_cur_path_lowest()->get_bin_range() == cur_path_valid_range_);
758 for (uint8_t level = cur_path_lowest_level_; level + 1U < levels_; ++level) {
760 HashIntermediatePage* parent = get_cur_path_page(level + 1U);
761 ASSERT_ND(parent->get_pointer(route.route[level + 1U]).snapshot_pointer_ == child_id);
765 while (cur_path_lowest_level_ > 0) {
766 uint8_t index = route.route[cur_path_lowest_level_];
767 HashIntermediatePage* page = get_cur_path_page(cur_path_lowest_level_);
773 HashIntermediatePage* child = get_cur_path_page(cur_path_lowest_level_ + 1U);
775 ASSERT_ND(child->header().storage_id_ == storage_id_);
776 ASSERT_ND(child->header().page_id_ == pointer);
777 ASSERT_ND(child->get_level() + 1U == cur_path_lowest_level_);
778 ASSERT_ND(child->get_bin_range().contains(bin));
779 cur_path_lowest_level_ = child->get_level();
780 cur_path_valid_range_ = child->get_bin_range();
785 ASSERT_ND(get_cur_path_lowest()->get_bin_range() == cur_path_valid_range_);
790 bool HashComposeContext::verify_cur_path()
const {
791 if (is_initial_snapshot()) {
792 ASSERT_ND(cur_path_lowest_level_ == levels_);
794 ASSERT_ND(cur_path_lowest_level_ < levels_);
796 for (uint8_t level = cur_path_lowest_level_; level <
kHashMaxLevels; ++level) {
797 if (level >= levels_) {
798 ASSERT_ND(cur_path_[level].header().page_id_ == 0);
801 ASSERT_ND(cur_path_[level].header().page_id_ != 0);
802 ASSERT_ND(cur_path_[level].get_level() == level);
803 ASSERT_ND(cur_path_[level].header().storage_id_ == storage_id_);
804 if (level > cur_path_lowest_level_) {
806 HashBinRange child_range = cur_path_[level - 1U].
get_bin_range();
819 ErrorStack HashComposeContext::close_cur_bin() {
820 if (cur_bin_ == kCurBinNotOpened) {
830 if (
UNLIKELY(physical_records > 1000U)) {
831 LOG(WARNING) <<
"A hash bin has more than 1000 records?? That's an unexpected usage."
832 <<
" There is either a skew or mis-sizing.";
834 uint64_t remaining_buffer = allocated_pages_ - max_pages_;
835 if (
UNLIKELY(remaining_buffer < physical_records)) {
840 HashDataPage* head_page = page_base_ + allocated_pages_;
844 head_page->initialize_snapshot_page(storage_id_, head_page_id, cur_bin_, bin_bits, bin_shifts);
846 ASSERT_ND(allocated_pages_ <= max_pages_);
848 HashDataPage* cur_page = head_page;
851 for (uint32_t i = begin; i < end; ++i) {
852 HashTmpBin::Record* record = cur_bin_table_.
get_record(i);
854 if (record->xct_id_.is_deleted()) {
857 uint16_t available = cur_page->available_space();
858 uint16_t required = cur_page->required_space(record->key_length_, record->payload_length_);
859 if (available < required) {
862 HashDataPage* next_page = page_base_ + allocated_pages_;
864 cur_page->next_page_address()->snapshot_pointer_ = page_id;
865 cur_page = next_page;
868 ASSERT_ND(allocated_pages_ <= max_pages_);
872 cur_page->create_record_in_snapshot(
878 record->get_payload(),
879 record->payload_length_);
884 cur_bin_ = kCurBinNotOpened;
889 ErrorStack HashComposeContext::open_cur_bin(
HashBin bin) {
899 HashDataPage* page =
reinterpret_cast<HashDataPage*
>(data_page_io_memory_.
get_block());
902 ASSERT_ND(page->header().storage_id_ == storage_id_);
903 ASSERT_ND(page->header().page_id_ == page_id);
905 ASSERT_ND(page->next_page().volatile_pointer_.is_null());
906 uint16_t records = page->get_record_count();
907 for (uint16_t i = 0; i < records; ++i) {
908 const HashDataPage::Slot& slot = page->get_slot(i);
909 ASSERT_ND(!slot.tid_.xct_id_.is_deleted());
910 ASSERT_ND(!slot.tid_.xct_id_.is_moved());
911 ASSERT_ND(!slot.tid_.xct_id_.is_being_written());
912 const char* data = page->record_from_offset(slot.offset_);
918 data + slot.get_aligned_key_length(),
919 slot.payload_length_));
921 page_id = page->next_page().snapshot_pointer_;
933 ErrorStack HashComposeContext::init_intermediates() {
934 ASSERT_ND(allocated_intermediates_ == 0);
938 if (max_intermediates_ < count) {
942 std::memset(intermediate_base_, 0,
kPageSize * count);
943 for (uint16_t i = 0; i < count; ++i) {
945 ++allocated_intermediates_;
949 HashBinRange range(i * interval, (i + 1U) * interval);
960 HashComposedBinsPage* HashComposeContext::get_intermediate_tail(uint8_t root_index)
const {
961 HashComposedBinsPage* page = get_intermediate_head(root_index);
964 ASSERT_ND(intermediate_base_ + page->header_.page_id_ == page);
966 if (page->next_page_ == 0) {
973 inline void HashComposeContext::update_cur_intermediate_tail_if_needed(
HashBin bin) {
978 update_cur_intermediate_tail(bin);
982 void HashComposeContext::update_cur_intermediate_tail(
HashBin bin) {
985 uint8_t root_index = route.route[levels_ - 1U];
986 cur_intermediate_tail_ = get_intermediate_tail(root_index);
991 update_cur_intermediate_tail_if_needed(bin);
997 DVLOG(1) <<
"Growing intermediate page in hash composer...";
1000 HashComposedBinsPage* next = intermediate_base_ + next_page_id;
1002 next->header_.page_id_ = next_page_id;
1003 next->bin_range_ = cur_intermediate_tail_->
bin_range_;
1004 ++allocated_intermediates_;
1005 cur_intermediate_tail_->
next_page_ = next_page_id;
1006 cur_intermediate_tail_ = next;
1011 uint16_t root_child = bin /
kHashMaxBins[levels_ - 1U];
1020 uint8_t index = cur_intermediate_tail_->
bin_count_;
1023 ComposedBin& entry = cur_intermediate_tail_->
bins_[index];
1030 ErrorCode HashComposeContext::expand_intermediate_pool_if_needed() {
1031 ASSERT_ND(allocated_intermediates_ <= max_intermediates_);
1032 if (
UNLIKELY(allocated_intermediates_ == max_intermediates_)) {
1033 LOG(INFO) <<
"Automatically expanding intermediate_pool. This should be a rare event";
1034 uint32_t required = allocated_intermediates_ + 1U;
1048 ErrorStack HashComposeContext::install_snapshot_data_pages(uint64_t* installed_count)
const {
1049 *installed_count = 0;
1050 VolatilePagePointer pointer = storage_.
get_control_block()->root_page_pointer_.volatile_pointer_;
1051 if (pointer.is_null()) {
1052 VLOG(0) <<
"No volatile pages.. maybe while restart?";
1056 HashIntermediatePage* volatile_root = resolve_intermediate(pointer);
1058 debugging::StopWatch watch;
1059 for (uint8_t root_child = 0; root_child < root_children_; ++root_child) {
1060 VolatilePagePointer child_pointer = volatile_root->get_pointer(root_child).volatile_pointer_;
1061 if (child_pointer.is_null()) {
1062 LOG(WARNING) <<
"Um, the subtree doesn't exist? how come. but fine";
1066 const HashComposedBinsPage* composed = get_intermediate_head(root_child);
1067 if (levels_ == 1U) {
1069 ASSERT_ND(volatile_root->get_level() == 0);
1071 ASSERT_ND(composed->bin_count_ == 0 || composed->bin_count_ == 1U);
1072 if (composed->bin_count_ > 0) {
1073 ASSERT_ND(composed->bins_[0].bin_ == root_child);
1074 ASSERT_ND(verify_new_pointer(composed->bins_[0].page_id_));
1075 ASSERT_ND(verify_old_pointer(volatile_root->get_pointer(root_child).snapshot_pointer_));
1076 volatile_root->get_pointer(root_child).snapshot_pointer_ = composed->bins_[0].page_id_;
1079 ASSERT_ND(volatile_root->get_level() > 0);
1080 HashIntermediatePage* volatile_child = resolve_intermediate(child_pointer);
1082 CHECK_ERROR(install_snapshot_data_pages_root_child(
1089 VLOG(0) <<
"HashStorage-" << storage_id_ <<
" installed " << *installed_count <<
" pointers"
1090 <<
" to data pages in " << watch.elapsed_ms() <<
"ms";
1094 ErrorStack HashComposeContext::install_snapshot_data_pages_root_child(
1095 const HashComposedBinsPage* composed,
1096 HashIntermediatePage* volatile_root_child,
1097 uint64_t* installed_count)
const {
1098 typedef HashIntermediatePage* PagePtr;
1100 std::memset(volatile_path, 0,
sizeof(volatile_path));
1101 volatile_path[volatile_root_child->get_level()] = volatile_root_child;
1102 uint8_t volatile_path_lowest_level = volatile_root_child->get_level();
1104 const HashComposedBinsPage* page = composed;
1105 HashBin previous_bin = kCurBinNotOpened;
1107 for (uint16_t i = 0; i < page->bin_count_; ++i) {
1109 HashBin bin = page->bins_[i].bin_;
1110 ASSERT_ND(previous_bin == kCurBinNotOpened || previous_bin < bin);
1112 ASSERT_ND(volatile_root_child->get_bin_range().contains(bin));
1115 while (
UNLIKELY(!volatile_path[volatile_path_lowest_level]->get_bin_range().contains(bin))) {
1116 ++volatile_path_lowest_level;
1117 ASSERT_ND(volatile_path_lowest_level <= volatile_root_child->get_level());
1121 while (
UNLIKELY(volatile_path_lowest_level > 0)) {
1122 PagePtr cur = volatile_path[volatile_path_lowest_level];
1123 ASSERT_ND(cur->get_level() == volatile_path_lowest_level);
1124 const HashBinRange& range = cur->get_bin_range();
1126 uint16_t index = (bin - range.begin_) /
kHashMaxBins[volatile_path_lowest_level];
1127 ASSERT_ND(verify_old_pointer(cur->get_pointer(index).snapshot_pointer_));
1128 VolatilePagePointer pointer = cur->get_pointer(index).volatile_pointer_;
1131 PagePtr next = resolve_intermediate(pointer);
1132 ASSERT_ND(next->get_bin_range().contains(bin));
1133 ASSERT_ND(next->get_level() + 1U == volatile_path_lowest_level);
1134 --volatile_path_lowest_level;
1135 volatile_path[volatile_path_lowest_level] = next;
1138 PagePtr bottom = volatile_path[0];
1139 ASSERT_ND(volatile_path_lowest_level == 0 && bottom->get_bin_range().contains(bin));
1140 uint16_t index = bin - bottom->get_bin_range().begin_;
1141 ASSERT_ND(!bottom->get_pointer(index).volatile_pointer_.is_null());
1142 ASSERT_ND(verify_old_pointer(bottom->get_pointer(index).snapshot_pointer_));
1143 ASSERT_ND(verify_new_pointer(page->bins_[i].page_id_));
1144 bottom->get_pointer(index).snapshot_pointer_ = page->bins_[i].page_id_;
1145 ++(*installed_count);
1148 ASSERT_ND(page->next_page_ < allocated_intermediates_);
1149 if (page->next_page_ == 0) {
1152 page = intermediate_base_ + page->next_page_;
1168 LOG(INFO) <<
"Keep-all-volatile: Storage-" << storage_.
get_name()
1169 <<
" is configured to keep all volatile pages.";
1176 if (volatile_page ==
nullptr) {
1177 LOG(INFO) <<
"No volatile root page. Probably while restart";
1187 uint8_t root_level = volatile_page->
get_level();
1189 for (uint16_t i = 0; i < count; ++i) {
1194 drop_volatiles_child(args, &child_pointer, root_level, &result);
1202 void HashComposer::drop_volatiles_child(
1205 uint8_t parent_level,
1207 if (parent_level > 0) {
1208 result->
combine(drop_volatiles_recurse(args, child_pointer));
1210 if (can_drop_volatile_bin(
1213 drop_volatile_entire_bin(args, child_pointer);
1225 LOG(INFO) <<
"Oh, but keep-all-volatile is on. Storage-" << storage_.
get_name()
1226 <<
" is configured to keep all volatile pages.";
1229 if (is_to_keep_volatile(storage_.
get_levels() - 1U)) {
1230 LOG(INFO) <<
"Oh, but Storage-" << storage_.
get_name() <<
" is configured to keep"
1231 <<
" the root page.";
1236 if (volatile_page ==
nullptr) {
1237 LOG(INFO) <<
"Oh, but root volatile page already null";
1241 LOG(INFO) <<
"Okay, drop em all!!";
1242 drop_all_recurse(args, root_pointer);
1245 void HashComposer::drop_all_recurse(
1254 if (page->get_level() > 0) {
1255 drop_all_recurse(args, child_pointer);
1257 drop_volatile_entire_bin(args, child_pointer);
1264 void HashComposer::drop_volatile_entire_bin(
1265 const Composer::DropVolatilesArguments& args,
1266 DualPagePointer* pointer_to_head)
const {
1270 VolatilePagePointer cur_pointer = pointer_to_head->volatile_pointer_;
1271 while (!cur_pointer.is_null()) {
1272 HashDataPage* cur = resolve_data(cur_pointer);
1273 VolatilePagePointer next_pointer = cur->next_page().volatile_pointer_;
1274 args.drop(engine_, cur_pointer);
1275 cur_pointer = next_pointer;
1277 pointer_to_head->volatile_pointer_.clear();
1280 inline Composer::DropResult HashComposer::drop_volatiles_recurse(
1281 const Composer::DropVolatilesArguments& args,
1282 DualPagePointer* pointer) {
1283 ASSERT_ND(pointer->snapshot_pointer_ == 0
1289 Composer::DropResult result(args);
1290 HashIntermediatePage* page = resolve_intermediate(pointer->volatile_pointer_);
1295 uint8_t this_level = page->get_level();
1297 DualPagePointer* child_pointer = page->get_pointer_address(i);
1298 if (!child_pointer->volatile_pointer_.is_null()) {
1299 drop_volatiles_child(args, child_pointer, this_level, &result);
1303 if (is_to_keep_volatile(page->get_level())) {
1304 DVLOG(2) <<
"Exempted";
1307 args.drop(engine_, pointer->volatile_pointer_);
1308 pointer->volatile_pointer_.clear();
1311 DVLOG(1) <<
"Couldn't drop an intermediate page that has a recent modification in child";
1317 bool HashComposer::can_drop_volatile_bin(VolatilePagePointer head, Epoch valid_until)
const {
1318 for (HashDataPage* cur = resolve_data(head);
1320 cur = resolve_data(cur->next_page().volatile_pointer_)) {
1321 for (uint16_t i = 0; i < cur->get_record_count(); ++i) {
1322 Epoch epoch = cur->get_slot(i).tid_.xct_id_.get_epoch();
1323 if (epoch > valid_until) {
1333 inline bool HashComposer::is_to_keep_volatile(uint16_t level) {
const Page *const * root_info_pages_
Root info pages output by compose()
void drop_root_volatile(const Composer::DropVolatilesArguments &args)
SnapshotPagePointer next_page_
ErrorStack construct_root(const Composer::ConstructRootArguments &args)
bool is_ended_all() const
Epoch max_observed_
the largest Epoch it observed recursively.
snapshot::LogGleanerResource * gleaner_resource_
All pre-allocated resources to help run construct_root(), such as memory buffers.
numa_alloc_onnode() and numa_free().
Represents a pointer to another page (usually a child page).
const SortEntry * get_sort_entries() const __attribute__((always_inline))
ErrorCode create_memory(uint16_t numa_node, uint64_t initial_size=kDefaultInitialSize)
Allocates the memory to use by this object.
Epoch base_epoch_
All log entries in these inputs are assured to be after this epoch.
SnapshotLocalPageId extract_local_page_id_from_snapshot_pointer(SnapshotPagePointer pointer)
ErrorCode read_page(storage::SnapshotPagePointer page_id, void *out)
ErrorCode expand_intermediate_memory(uint32_t required_pages, bool retain_content)
Expands intermediate_memory_ in case it is too small.
ComposedBin bins_[kHashComposedBinsPageMaxBins]
void assert_type() const __attribute__((always_inline))
Automatically calls if uninitialize() wasn't called when it gets out of scope, and just complains whe...
uint32_t fetch_logs(uint32_t sort_pos, uint32_t count, log::RecordLogType const **out) const
To reduce L1 cache miss stall, we prefetch some number of position entries and the pointed log entrie...
Represents a logic to compose a new version of data pages for one storage.
const HashBinRange & get_bin_range() const
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
void clean()
Removes all tuple data for the current bin.
HashIntermediatePage * resolve_intermediate_impl(const memory::GlobalVolatilePageResolver &resolver, VolatilePagePointer pointer)
ErrorCode update_record(xct::XctId xct_id, const void *key, uint16_t key_length, HashValue hash, const void *payload, uint16_t payload_length)
Updates a record of the given key with the given payload, which might change length.
const uint16_t kHashComposedBinsPageMaxBins
uint8_t get_levels() const
double elapsed_ms() const
Epoch valid_until_epoch_
This snapshot contains all the logs until this epoch.
Represents a pointer to a volatile page with modification count for preventing ABA.
memory::PagePoolOffset get_page_size() const __attribute__((always_inline))
Composer::DropResult drop_volatiles(const Composer::DropVolatilesArguments &args)
drop_volatiles and related methods
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
Brings error stacktrace information as return value of functions.
ErrorCode dump_intermediates(memory::PagePoolOffset from_page, uint32_t count)
Write out pages that are contiguous in the sub intermediate page pool.
void clean_quick()
This version selectively clears buckets_ by seeing individual records.
void alloc(uint64_t size, uint64_t alignment, AllocType alloc_type, int numa_node) noexcept
Allocate a memory, releasing the current memory if exists.
0x002A : foedus::storage::hash::HashDeleteLogType .
cache::SnapshotFileSet * previous_snapshot_files_
To read existing snapshots.
Holds a set of read-only file objects for snapshot files.
bool is_equivalent(const VolatilePagePointer &other) const
const Metadata * get_metadata() const
Returns the metadata of this storage.
Declares common log types used in all packages.
ErrorStack next_batch()
Executes merge-sort on several thousands of logs and provides the result as a batch.
static void launch_construct_root_multi_level(HashComposer *pointer, const Composer::ConstructRootArguments *args, uint16_t numa_node, HashIntermediatePage *root_page, ErrorStack *out_error)
launched on its own thread.
void initialize_snapshot_page(StorageId storage_id, SnapshotPagePointer page_id, HashBin bin, uint8_t bin_bits, uint8_t bin_shifts)
#define LIKELY(x)
Hints that x is highly likely true.
uint32_t log_streams_count_
Number of sorted runs.
std::string to_string() const
bool is_master() const
Returns if this engine object is a master instance.
SnapshotId get_snapshot_id() const
bool contains(HashBin hash) const
soc::SocId get_soc_count() const
Shorthand for get_options().thread_.group_count_.
HashComposer(Composer *parent)
HashComposer methods.
const uint8_t kHashMaxLevels
Max level of intermediate pages.
uint8_t get_bin_bits() const
VolatilePagePointer volatile_pointer_
RecordIndex get_records_consumed() const
const StorageName & get_name() const
Returns the unique name of this storage.
Declares all log types used in this storage type.
Receives an arbitrary number of sorted buffers and emits one fully sorted stream of logs...
bool exists() const
Returns whether this storage is already created.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
HashComposer's compose() implementation separated from the class itself.
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
Page * root_info_page_
[OUT] Returns pointers and related information that is required to construct the root page...
MergedPosition get_current_count() const __attribute__((always_inline))
Database engine object that holds all resources and provides APIs.
A base class for HashInsertLogType/HashDeleteLogType/HashOverwriteLogType.
uint64_t stop()
Take another current time tick.
SnapshotPagePointer snapshot_pointer_
memory::PagePoolOffset get_intermediate_size() const __attribute__((always_inline))
Just a marker to denote that the memory region represents a data page.
Represents a range of hash bins in a hash storage, such as what an intermediate page is responsible f...
uint16_t extract_snapshot_id_from_snapshot_pointer(SnapshotPagePointer pointer)
0x0029 : foedus::storage::hash::HashInsertLogType .
storage::Page * get_page_base() __attribute__((always_inline))
uint32_t get_physical_record_count() const
ErrorCode delete_record(xct::XctId xct_id, const void *key, uint16_t key_length, HashValue hash)
Logically deletes a record of the given key.
Return value of drop_volatiles()
memory::AlignedMemory writer_pool_memory_
SnapshotPagePointer page_id_
SnapshotPagePointer * new_root_page_pointer_
[OUT] Returns pointer to new root snapshot page.
HashBin end_
Exclusive end of the range.
ErrorCode read_pages(storage::SnapshotPagePointer page_id_begin, uint32_t page_count, void *out)
Read contiguous pages in one shot.
bool dropped_all_
Whether all volatile pages under the page was dropped.
void * get_block() const
Returns the memory block.
uint8_t extract_numa_node_from_snapshot_pointer(SnapshotPagePointer pointer)
uint16_t SnapshotId
Unique ID of Snapshot.
uint16_t my_partition_
if partitioned_drop_ is true, the partition this thread should drop volatile pages from ...
ErrorStack compose(const Composer::ComposeArguments &args)
Record * get_record(RecordIndex index) const
cache::SnapshotFileSet * previous_snapshot_files_
To read existing snapshots.
HashBin get_bin_count() const
const SnapshotId kNullSnapshotId
uint64_t get_size() const
Returns the byte size of the memory block.
Represents an intermediate page in Hashtable Storage.
uint8_t get_bin_shifts() const
void initialize_snapshot_page(StorageId storage_id, SnapshotPagePointer page_id, uint8_t level, HashBin start_bin)
storage::Page * get_intermediate_base() __attribute__((always_inline))
static BloomFilterFingerprint extract_fingerprint(HashValue fullhash)
ErrorCode insert_record(xct::XctId xct_id, const void *key, uint16_t key_length, HashValue hash, const void *payload, uint16_t payload_length)
Inserts a new record of the given key and payload.
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
uint64_t HashBin
Represents a bin of a hash value.
Represents an individual data page in Hashtable Storage.
ErrorCode overwrite_record(xct::XctId xct_id, const void *key, uint16_t key_length, HashValue hash, const void *payload, uint16_t payload_offset, uint16_t payload_count)
Overwrites a part of the record of the given key.
snapshot::SnapshotWriter * snapshot_writer_
Writes out composed pages.
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
0x000C : "GENERAL: Other uncategorized errors." .
snapshot::SnapshotWriter * snapshot_writer_
Writes out composed pages.
0x0028 : foedus::storage::hash::HashOverwriteLogType .
VolatilePagePointer construct_volatile_page_pointer(uint64_t word)
const ErrorStack kRetOk
Normal return value for no-error case.
bool partitioned_drop_
if true, one thread for each partition will invoke drop_volatiles()
uint8_t get_level() const
const HashMetadata * get_hash_metadata() const
HashValue hash_
Hash value of the key.
const HashBin kInvalidHashBin
This value or larger never appears as a valid HashBin.
RecordIndex get_first_record() const
#define ERROR_STACK_MSG(e, m)
Overload of ERROR_STACK(e) to receive a custom error message.
const uint8_t kHashIntermediatePageFanout
Number of pointers in an intermediate page of hash storage.
HashComposeContext(Engine *engine, snapshot::MergeSort *merge_sort, snapshot::SnapshotWriter *snapshot_writer, cache::SnapshotFileSet *previous_snapshot_files, Page *root_info_page)
HashComposeContext methods.
memory::AlignedMemory tmp_root_page_memory_
uint64_t get_key() const __attribute__((always_inline))
const uint64_t kHashMaxBins[]
kHashMaxBins[n] gives the maximum number of hash bins an n-level hash can hold.
uint32_t root_info_pages_count_
Number of root info pages.
Base class for log type of record-wise operation.
void combine(const DropResult &other)
#define UNLIKELY(x)
Hints that x is highly likely false.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
A high-resolution stop watch.
HashDataPage * resolve_data_impl(const memory::GlobalVolatilePageResolver &resolver, VolatilePagePointer pointer)
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
ErrorCode dump_pages(memory::PagePoolOffset from_page, uint32_t count)
Write out pages that are contiguous in the main page pool.
DualPagePointer & get_pointer(uint16_t index)
snapshot::SortedBuffer *const * log_streams_
Sorted runs.
CONTROL_BLOCK * get_control_block() const
snapshot::Snapshot snapshot_
The new snapshot.
void drop(Engine *engine, VolatilePagePointer pointer) const
Returns (might cache) the given pointer to volatile pool.
memory::AlignedMemory * work_memory_
Working memory to be used in this method.
0x002B : foedus::storage::hash::HashUpdateLogType .
storage::SnapshotPagePointer get_next_page_id() const
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
Entries we actually sort.
ErrorCode
Enum of error codes defined in error_code.xmacro.
uint16_t get_root_children() const
Arguments for drop_volatiles()
uint64_t HashValue
Represents a full 64-bit hash value calculated from a key.
A page to pack many ComposedBin as an output of composer.
Writes out one snapshot file for all data pages in one reducer.
Arguments for construct_root()