20 #include <glog/logging.h>
60 : engine_(parent->get_engine()),
61 storage_id_(parent->get_storage_id()),
62 storage_(engine_, storage_id_) {
122 for (uint8_t level = 1; level < levels; ++level) {
134 system_initial_epoch,
149 for (uint16_t j = 0; j < root_children; ++j) {
168 for (uint16_t j = 0; j < root_children; ++j) {
175 storage_.
get_control_block()->root_page_pointer_.snapshot_pointer_ = new_page_id;
183 return std::string(
"ArrayComposer-") + std::to_string(storage_id_);
196 Page* root_info_page)
198 merge_sort_(merge_sort),
199 system_initial_epoch_(engine->get_savepoint_manager()->get_initial_durable_epoch()),
200 storage_id_(merge_sort_->get_storage_id()),
201 snapshot_id_(snapshot_writer->get_snapshot_id()),
202 storage_(engine, storage_id_),
203 snapshot_writer_(snapshot_writer),
204 previous_snapshot_files_(previous_snapshot_files),
206 payload_size_(storage_.get_payload_size()),
207 levels_(storage_.get_levels()),
208 previous_root_page_pointer_(storage_.get_metadata()->root_snapshot_page_id_) {
211 for (uint8_t level = 1; level < levels_; ++level) {
212 offset_intervals_[level] = offset_intervals_[level - 1] *
kInteriorFanout;
214 std::memset(cur_path_, 0,
sizeof(cur_path_));
216 allocated_pages_ = 0;
217 allocated_intermediates_ = 0;
231 std::memset(root_info_page_, 0,
kPageSize);
237 return execute_single_level_array();
244 bool processed_any =
false;
252 if (!processed_any) {
254 processed_any =
true;
261 while (cur < count) {
265 ASSERT_ND(head_offset == sort_entries[cur].get_key());
273 for (next = cur + 1U;
LIKELY(next < count); ++next) {
282 apply_batch(cur, next);
291 LOG(ERROR) <<
"wtf? no logs? storage-" << storage_id_;
295 uint16_t children = get_root_children();
298 for (uint16_t i = 0; i < children; ++i) {
303 || (is_initial_snapshot() && root_info_page_->
pointers_[i] == 0));
314 void ArrayComposeContext::apply_batch(uint64_t cur, uint64_t next) {
315 const uint16_t kFetchSize = 8;
320 uint16_t desired = std::min<uint16_t>(kFetchSize, next - cur);
321 uint16_t fetched = merge_sort_->
fetch_logs(cur, desired, logs);
322 for (uint16_t i = 0; i < kFetchSize &&
LIKELY(i < fetched); ++i) {
334 const ArrayIncrementLogType* casted
335 =
reinterpret_cast<const ArrayIncrementLogType*
>(log);
344 ErrorStack ArrayComposeContext::execute_single_level_array() {
348 cur_path_[0] = page_base_;
351 allocated_pages_ = 1;
352 WRAP_ERROR_CODE(read_or_init_page(previous_root_page_pointer_, page_id, 0, range, cur_path_[0]));
361 apply_batch(0, count);
365 ASSERT_ND(allocated_intermediates_ == 0);
367 ASSERT_ND(page_id == page_base_[0].header().page_id_);
378 uint16_t ArrayComposeContext::get_root_children()
const {
379 uint64_t child_interval = offset_intervals_[levels_ - 2U];
383 ErrorStack ArrayComposeContext::finalize() {
386 ArrayRange last_range = cur_path_[0]->get_array_range();
387 if (is_initial_snapshot() && last_range.end_ < storage_.
get_array_size()) {
388 VLOG(0) <<
"Need to fill out empty pages in initial snapshot of array-" << storage_id_
389 <<
", from " << last_range.end_ <<
" to the end of array";
394 if (allocated_pages_ > 0) {
403 ArrayPage* root_page = intermediate_base_;
404 ASSERT_ND(root_page == cur_path_[levels_ - 1]);
405 ASSERT_ND(root_page->get_level() == levels_ - 1);
406 ASSERT_ND(root_page->header().page_id_ == 0);
410 root_page->header().page_id_ = base_pointer;
411 for (uint32_t i = 1; i < allocated_intermediates_; ++i) {
413 ArrayPage* page = intermediate_base_ + i;
416 ASSERT_ND(page->get_level() < levels_ - 1U);
417 page->header().page_id_ = new_page_id;
418 if (page->get_level() > 1U) {
425 DualPagePointer& pointer = page->get_interior_record(j);
426 ASSERT_ND(pointer.volatile_pointer_.is_null());
432 ASSERT_ND(page_id < allocated_intermediates_);
433 pointer.snapshot_pointer_ = base_pointer + page_id;
434 ASSERT_ND(verify_snapshot_pointer(pointer.snapshot_pointer_));
444 const uint16_t root_children = get_root_children();
447 for (uint16_t j = 0; j < root_children; ++j) {
448 DualPagePointer& pointer = root_page->get_interior_record(j);
449 ASSERT_ND(pointer.volatile_pointer_.is_null());
456 if (snapshot_id == snapshot_id_) {
462 ASSERT_ND(verify_snapshot_pointer(pointer.snapshot_pointer_));
468 ASSERT_ND(page_id < allocated_intermediates_);
469 pointer.snapshot_pointer_ = base_pointer + page_id;
470 ASSERT_ND(verify_snapshot_pointer(pointer.snapshot_pointer_));
476 root_info_page_->
pointers_[j] = pointer.snapshot_pointer_;
478 ASSERT_ND((!is_initial_snapshot() && page_id != 0 && snapshot_id != snapshot_id_)
479 || (is_initial_snapshot() && page_id == 0));
483 ASSERT_ND(root_page->get_interior_record(j).is_both_null());
488 uint64_t installed_count = 0;
489 CHECK_ERROR(install_snapshot_pointers(base_pointer, &installed_count));
494 ErrorStack ArrayComposeContext::initialize(
ArrayOffset initial_offset) {
495 ASSERT_ND(allocated_intermediates_ == 0);
503 if (is_initial_snapshot()) {
504 ArrayRange leaf_range = to_leaf_range(initial_offset);
505 VLOG(0) <<
"Need to fill out empty pages in initial snapshot of array-" << storage_id_
506 <<
", upto " << leaf_range.end_;
509 ASSERT_ND(cur_path_[0]->get_array_range() == leaf_range);
514 ErrorStack ArrayComposeContext::init_root_page() {
515 uint8_t level = levels_ - 1U;
517 ArrayPage* page = intermediate_base_;
519 ASSERT_ND(allocated_intermediates_ == 0);
520 allocated_intermediates_ = 1;
522 WRAP_ERROR_CODE(read_or_init_page(previous_root_page_pointer_, 0, level, range, page));
523 cur_path_[level] = page;
532 ArrayPage* page = cur_path_[levels_ - 1U];
540 || partitioning_data_->
bucket_size_ == offset_intervals_[levels_ - 2U]);
544 const uint8_t child_level = levels_ - 2U;
545 const uint64_t interval = offset_intervals_[child_level];
546 const uint16_t first_child = from / interval;
550 if (interval * children > to) {
555 for (uint16_t i = first_child; i < children; ++i) {
561 ASSERT_ND(i == first_child || page->get_interior_record(i).snapshot_pointer_ == 0);
563 ArrayRange child_range(i * interval, (i + 1U) * interval, storage_.
get_array_size());
564 if (page->get_interior_record(i).snapshot_pointer_ == 0) {
565 if (child_level > 0) {
572 cur_path_[child_level]->header().page_id_) == snapshot_id_);
577 page->get_interior_record(i).snapshot_pointer_ = cur_path_[child_level]->header().page_id_;
578 if (child_level > 0) {
579 CHECK_ERROR_CODE(create_empty_pages_recurse(from, to, cur_path_[child_level]));
585 ErrorCode ArrayComposeContext::create_empty_pages_recurse(
589 const uint8_t cur_level = page->get_level();
591 ArrayRange page_range = page->get_array_range();
595 const uint8_t child_level = cur_level - 1U;
596 const uint64_t interval = offset_intervals_[child_level];
597 ASSERT_ND(page_range.end_ == page_range.begin_ + interval * kInteriorFanout
598 || (page_range.begin_ + interval * kInteriorFanout > storage_.
get_array_size()
601 uint16_t first_child = 0;
602 if (from > page_range.begin_) {
603 ASSERT_ND(from < page_range.begin_ + interval * kInteriorFanout);
604 first_child = (from - page_range.begin_) / interval;
609 if (page_range.begin_ + interval * children > to) {
615 for (uint16_t i = first_child; i < children; ++i) {
616 ASSERT_ND(i == first_child || page->get_interior_record(i).snapshot_pointer_ == 0);
617 ArrayRange child_range(
618 page_range.begin_ + i * interval,
619 page_range.begin_ + (i + 1U) * interval,
622 if (page->get_interior_record(i).snapshot_pointer_ == 0) {
623 if (child_level > 0) {
630 cur_path_[child_level]->header().page_id_) == snapshot_id_);
635 page->get_interior_record(i).snapshot_pointer_ = cur_path_[child_level]->header().page_id_;
636 if (child_level > 0) {
637 CHECK_ERROR_CODE(create_empty_pages_recurse(from, to, cur_path_[child_level]));
644 ErrorCode ArrayComposeContext::dump_leaf_pages() {
650 allocated_pages_ = 0;
654 ErrorCode ArrayComposeContext::create_empty_intermediate_page(
659 DualPagePointer& pointer = parent->get_interior_record(index);
661 uint8_t level = parent->get_level() - 1U;
663 ArrayPage* page = intermediate_base_ + allocated_intermediates_;
665 ++allocated_intermediates_;
668 cur_path_[level] = page;
672 ErrorCode ArrayComposeContext::create_empty_leaf_page(
677 DualPagePointer& pointer = parent->get_interior_record(index);
679 if (allocated_pages_ >= max_pages_) {
685 ArrayPage* page = page_base_ + allocated_pages_;
687 ASSERT_ND(verify_snapshot_pointer(new_page_id));
690 pointer.snapshot_pointer_ = new_page_id;
696 inline ErrorCode ArrayComposeContext::expand_intermediate_pool_if_needed() {
697 ASSERT_ND(allocated_intermediates_ <= max_intermediates_);
698 if (
UNLIKELY(allocated_intermediates_ == max_intermediates_)) {
699 LOG(INFO) <<
"Automatically expanding intermediate_pool. This should be a rare event";
700 uint32_t required = allocated_intermediates_ + 1U;
710 ASSERT_ND(cur_path_[0] ==
nullptr || next_offset >= cur_path_[0]->get_array_range().begin_);
711 if (cur_path_[0] !=
nullptr && next_offset < cur_path_[0]->get_array_range().end_) {
717 ArrayRange next_range = to_leaf_range(next_offset);
718 ArrayOffset jump_from = cur_path_[0] ==
nullptr ? 0 : cur_path_[0]->get_array_range().end_;
720 if (jump_to > jump_from && is_initial_snapshot()) {
721 VLOG(0) <<
"Need to fill out empty pages in initial snapshot of array-" << storage_id_
722 <<
", from " << jump_from <<
" to " << jump_to;
727 ASSERT_ND(cur_path_[levels_ - 1U]->get_array_range().contains(next_offset));
728 for (uint8_t level = levels_ - 2U; level <
kMaxLevels; --level) {
730 if (cur_path_[level] !=
nullptr && cur_path_[level]->get_array_range().contains(next_offset)) {
736 ArrayPage* parent = cur_path_[level + 1U];
737 ArrayRange parent_range = parent->get_array_range();
738 ASSERT_ND(parent_range.contains(next_offset));
739 uint64_t interval = offset_intervals_[level];
740 uint16_t i = (next_offset - parent_range.begin_) / interval;
743 ArrayRange child_range(
744 parent_range.begin_ + i * interval,
745 parent_range.begin_ + (i + 1U) * interval,
748 DualPagePointer& pointer = parent->get_interior_record(i);
749 ASSERT_ND(pointer.volatile_pointer_.is_null());
751 ASSERT_ND((!is_initial_snapshot() && old_page_id != 0)
752 || (is_initial_snapshot() && old_page_id == 0));
759 page = intermediate_base_ + allocated_intermediates_;
760 new_page_id = allocated_intermediates_;
761 ++allocated_intermediates_;
762 CHECK_ERROR_CODE(read_or_init_page(old_page_id, new_page_id, level, child_range, page));
765 if (allocated_pages_ >= max_pages_) {
771 page = page_base_ + allocated_pages_;
773 ASSERT_ND(verify_snapshot_pointer(new_page_id));
775 CHECK_ERROR_CODE(read_or_init_page(old_page_id, new_page_id, level, child_range, page));
777 ASSERT_ND(page->header().page_id_ == new_page_id);
778 pointer.snapshot_pointer_ = new_page_id;
779 cur_path_[level] = page;
785 inline ErrorCode ArrayComposeContext::read_or_init_page(
791 ASSERT_ND(new_page_id != 0 || level == levels_ - 1U);
792 if (old_page_id != 0) {
795 ASSERT_ND(page->header().storage_id_ == storage_id_);
796 ASSERT_ND(page->header().page_id_ == old_page_id);
798 ASSERT_ND(page->get_array_range() == range);
799 page->header().page_id_ = new_page_id;
802 page->initialize_snapshot_page(
803 system_initial_epoch_,
813 bool ArrayComposeContext::verify_cur_path()
const {
814 for (uint8_t level = 0; level <
kMaxLevels; ++level) {
815 if (level >= levels_) {
820 ASSERT_ND(cur_path_[level]->get_level() == level);
821 ASSERT_ND(cur_path_[level]->get_storage_id() == storage_id_);
842 ErrorStack ArrayComposeContext::install_snapshot_pointers(
844 uint64_t* installed_count)
const {
848 *installed_count = 0;
849 VolatilePagePointer pointer = storage_.
get_control_block()->root_page_pointer_.volatile_pointer_;
850 if (pointer.is_null()) {
851 VLOG(0) <<
"No volatile pages.. maybe while restart?";
855 const memory::GlobalVolatilePageResolver& resolver
857 ArrayPage* volatile_root =
reinterpret_cast<ArrayPage*
>(resolver.resolve_offset(pointer));
862 debugging::StopWatch watch;
863 const ArrayPage* snapshot_root = intermediate_base_;
871 VLOG(0) <<
"ArrayStorage-" << storage_id_ <<
" installed " << *installed_count <<
" pointers"
872 <<
" in " << watch.elapsed_ms() <<
"ms";
876 ErrorCode ArrayComposeContext::install_snapshot_pointers_recurse(
878 const memory::GlobalVolatilePageResolver& resolver,
879 const ArrayPage* snapshot_page,
880 ArrayPage* volatile_page,
881 uint64_t* installed_count)
const {
882 ASSERT_ND(snapshot_page->get_array_range() == volatile_page->get_array_range());
885 const bool needs_recursion = snapshot_page->get_level() > 1U;
893 if (snapshot_id != snapshot_id_) {
898 DualPagePointer& target = volatile_page->get_interior_record(i);
899 target.snapshot_pointer_ = pointer;
900 ++(*installed_count);
902 if (needs_recursion) {
905 VolatilePagePointer volatile_pointer = target.volatile_pointer_;
906 if (!volatile_pointer.is_null()) {
907 ArrayPage* volatile_next
908 =
reinterpret_cast<ArrayPage*
>(resolver.resolve_offset(volatile_pointer));
909 uint64_t offset = pointer - snapshot_base;
910 const ArrayPage* snapshot_next = intermediate_base_ + offset;
932 LOG(INFO) <<
"Keep-all-volatile: Storage-" << storage_.
get_name()
933 <<
" is configured to keep all volatile pages.";
940 if (volatile_page ==
nullptr) {
941 LOG(INFO) <<
"No volatile root page. Probably while restart";
947 if (volatile_page->
is_leaf()) {
948 LOG(INFO) <<
"Single-page array skipped by .";
963 result.
combine(drop_volatiles_recurse(args, &child_pointer));
973 LOG(INFO) <<
"Oh, but keep-all-volatile is on. Storage-" << storage_.
get_name()
974 <<
" is configured to keep all volatile pages.";
977 if (is_to_keep_volatile(storage_.
get_levels() - 1U)) {
978 LOG(INFO) <<
"Oh, but Storage-" << storage_.
get_name() <<
" is configured to keep"
979 <<
" the root page.";
984 if (volatile_page ==
nullptr) {
985 LOG(INFO) <<
"Oh, but root volatile page already null";
989 if (volatile_page->
is_leaf()) {
992 for (uint16_t i = 0; i < records; ++i) {
997 LOG(INFO) <<
"Oh, but the root volatile page in single-level array contains a new rec";
1004 LOG(INFO) <<
"Okay, drop em all!!";
1005 drop_all_recurse(args, root_pointer);
1008 void ArrayComposer::drop_all_recurse(
1015 if (!page->is_leaf()) {
1018 drop_all_recurse(args, &child_pointer);
1025 inline ArrayPage* ArrayComposer::resolve_volatile(VolatilePagePointer pointer) {
1026 if (pointer.is_null()) {
1029 const memory::GlobalVolatilePageResolver& page_resolver
1031 return reinterpret_cast<ArrayPage*
>(page_resolver.resolve_offset(pointer));
1034 inline Composer::DropResult ArrayComposer::drop_volatiles_recurse(
1035 const Composer::DropVolatilesArguments& args,
1036 DualPagePointer* pointer) {
1037 if (pointer->volatile_pointer_.is_null()) {
1038 return Composer::DropResult(args);
1040 ASSERT_ND(pointer->snapshot_pointer_ == 0
1045 ArrayPage* child_page = resolve_volatile(pointer->volatile_pointer_);
1046 if (child_page->is_leaf()) {
1047 return drop_volatiles_leaf(args, pointer, child_page);
1049 return drop_volatiles_intermediate(args, pointer, child_page);
1053 Composer::DropResult ArrayComposer::drop_volatiles_intermediate(
1054 const Composer::DropVolatilesArguments& args,
1055 DualPagePointer* pointer,
1056 ArrayPage* volatile_page) {
1057 ASSERT_ND(!volatile_page->header().snapshot_);
1059 Composer::DropResult result(args);
1065 DualPagePointer& child_pointer = volatile_page->get_interior_record(i);
1066 result.combine(drop_volatiles_recurse(args, &child_pointer));
1069 if (result.dropped_all_) {
1070 if (is_to_keep_volatile(volatile_page->get_level())) {
1071 DVLOG(2) <<
"Exempted";
1072 result.dropped_all_ =
false;
1074 args.drop(engine_, pointer->volatile_pointer_);
1075 pointer->volatile_pointer_.clear();
1078 DVLOG(1) <<
"Couldn't drop an intermediate page that has a recent modification in child";
1080 ASSERT_ND(!result.dropped_all_ || pointer->volatile_pointer_.is_null());
1084 inline Composer::DropResult ArrayComposer::drop_volatiles_leaf(
1085 const Composer::DropVolatilesArguments& args,
1086 DualPagePointer* pointer,
1087 ArrayPage* volatile_page) {
1088 ASSERT_ND(!volatile_page->header().snapshot_);
1090 Composer::DropResult result(args);
1091 if (is_to_keep_volatile(volatile_page->get_level())) {
1092 DVLOG(2) <<
"Exempted";
1093 result.dropped_all_ =
false;
1098 const ArrayRange& range = volatile_page->get_array_range();
1099 ASSERT_ND(range.end_ <= range.begin_ + volatile_page->get_leaf_record_count());
1100 ASSERT_ND(range.end_ == range.begin_ + volatile_page->get_leaf_record_count()
1102 uint16_t records = range.end_ - range.begin_;
1103 for (uint16_t i = 0; i < records; ++i) {
1104 Record* record = volatile_page->get_leaf_record(i, payload_size);
1107 result.on_rec_observed(epoch);
1109 if (result.dropped_all_) {
1110 args.drop(engine_, pointer->volatile_pointer_);
1111 pointer->volatile_pointer_.clear();
1115 inline bool ArrayComposer::is_to_keep_volatile(uint16_t level) {
1117 uint16_t array_levels = storage_.
get_levels();
1123 return threshold + level >= array_levels;
const Page *const * root_info_pages_
Root info pages output by compose()
ArrayComposeContext(Engine *engine, snapshot::MergeSort *merge_sort, snapshot::SnapshotWriter *snapshot_writer, cache::SnapshotFileSet *previous_snapshot_files, Page *root_info_page)
ArrayComposeContext methods.
bool is_ended_all() const
void drop_root_volatile(const Composer::DropVolatilesArguments &args)
thread::ThreadGroupId PartitionId
As partition=NUMA node, this is just a synonym of foedus::thread::ThreadGroupId.
Represents one record in our key-value store.
Represents a pointer to another page (usually a child page).
const SortEntry * get_sort_entries() const __attribute__((always_inline))
Epoch base_epoch_
All log entries in this inputs are assured to be after this epoch.
SnapshotLocalPageId extract_local_page_id_from_snapshot_pointer(SnapshotPagePointer pointer)
ErrorCode read_page(storage::SnapshotPagePointer page_id, void *out)
bool partitionable_
if false, every record goes to node-0.
ErrorCode expand_intermediate_memory(uint32_t required_pages, bool retain_content)
Expands intermediate_memory_ in case it is too small.
Declares all log types used in this storage type.
uint32_t fetch_logs(uint32_t sort_pos, uint32_t count, log::RecordLogType const **out) const
To reduce L1 cache miss stall, we prefetch some number of position entries and the pointed log entrie...
Represents a logic to compose a new version of data pages for one storage.
ArrayComposer's compose() implementation separated from the class itself.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
const ArrayMetadata * get_array_metadata() const
void initialize_snapshot_page(Epoch initial_epoch, StorageId storage_id, SnapshotPagePointer page_id, uint16_t payload_size, uint8_t level, const ArrayRange &array_range)
Called only when this page is initialized.
uint64_t ArrayOffset
The only key type in array storage.
const GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert volatile page ID to page pointer.
double elapsed_ms() const
Epoch valid_until_epoch_
This snapshot contains all the logs until this epoch.
const uint8_t kMaxLevels
Code in array storage assumes this number as the maximum number of levels.
ErrorStack compose(const Composer::ComposeArguments &args)
PartitionId bucket_owners_[kInteriorFanout]
partition of each bucket.
ArrayOffset array_size_
Size of the entire array.
uint16_t get_payload_size() const
Returns byte size of one record in this array storage without internal overheads. ...
memory::PagePoolOffset get_page_size() const __attribute__((always_inline))
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
Brings error stacktrace information as return value of functions.
Represents one data page in Array Storage.
ErrorCode dump_intermediates(memory::PagePoolOffset from_page, uint32_t count)
Write out pages that are contiguous in the sub intermediate page pool.
XctId xct_id_
the second 64bit: Persistent status part of TID.
cache::SnapshotFileSet * previous_snapshot_files_
To read existing snapshots.
Holds a set of read-only file objects for snapshot files.
ArrayOffset bucket_size_
bucket = offset / bucket_size_.
const Metadata * get_metadata() const
Returns the metadata of this storage.
Output of one compose() call, which are then combined in construct_root().
const log::RecordLogType * resolve_sort_position(uint32_t sort_pos) const __attribute__((always_inline))
ArrayComposer(Composer *parent)
ArrayComposer methods.
Declares common log types used in all packages.
ErrorStack next_batch()
Executes merge-sort on several thousands of logs and provides the result as a batch.
0x0023 : foedus::storage::array::ArrayIncrementLogType .
std::string to_string() const
0x0022 : foedus::storage::array::ArrayOverwriteLogType .
#define LIKELY(x)
Hints that x is highly likely true.
uint32_t log_streams_count_
Number of sorted runs.
void apply_record(thread::Thread *context, StorageId storage_id, xct::RwLockableXctId *owner_id, char *payload) const __attribute__((always_inline))
bool is_master() const
Returns if this engine object is a master instance.
ArrayOffset get_array_size() const
Returns the size of this array.
SnapshotPagePointer pointers_[kInteriorFanout]
Pointers to direct children of root.
savepoint::SavepointManager * get_savepoint_manager() const
See Savepoint Manager.
ErrorStack construct_root(const Composer::ConstructRootArguments &args)
SnapshotId get_snapshot_id() const
A base class for ArrayOverwriteLogType/ArrayIncrementLogType.
ArrayOffset begin_
Inclusive beginning of the offset range.
VolatilePagePointer volatile_pointer_
Log type of array-storage's overwrite operation.
const DualPagePointer & get_interior_record(uint16_t record) const __attribute__((always_inline))
const ArrayRange & get_array_range() const
char payload_[8]
Arbitrary payload given by the user.
ArrayOffset end_
Exclusive end of the offset range.
const Record * get_leaf_record(uint16_t record, uint16_t payload_size) const __attribute__((always_inline))
const StorageName & get_name() const
Returns the unique name of this storage.
Receives an arbitrary number of sorted buffers and emits one fully sorted stream of logs...
bool exists() const
Returns whether this storage is already created.
Composer::DropResult drop_volatiles(const Composer::DropVolatilesArguments &args)
drop_volatiles and related methods
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
Page * root_info_page_
[OUT] Returns pointers and related information that is required to construct the root page...
MergedPosition get_current_count() const __attribute__((always_inline))
Database engine object that holds all resources and provides APIs.
uint64_t stop()
Take another current time tick.
SnapshotPagePointer snapshot_pointer_
Epoch get_epoch() const __attribute__((always_inline))
memory::PagePoolOffset get_intermediate_size() const __attribute__((always_inline))
Just a marker to denote that the memory region represents a data page.
uint16_t extract_snapshot_id_from_snapshot_pointer(SnapshotPagePointer pointer)
storage::Page * get_page_base() __attribute__((always_inline))
Retrun value of drop_volatiles()
SnapshotPagePointer * new_root_page_pointer_
[OUT] Returns pointer to new root snapshot page/
bool dropped_all_
Whether all volatile pages under the page was dropped.
uint8_t extract_numa_node_from_snapshot_pointer(SnapshotPagePointer pointer)
uint16_t SnapshotId
Unique ID of Snapshot.
bool contains(ArrayOffset offset) const
uint16_t my_partition_
if partitioned_drop_ is true, the partition this thread should drop volatile pages from ...
cache::SnapshotFileSet * previous_snapshot_files_
To read existing snapshots.
const SnapshotId kNullSnapshotId
uint8_t get_levels() const
Returns the number of levels.
storage::Page * get_intermediate_base() __attribute__((always_inline))
Represents an offset range in an array storage.
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
Packages logic and required properties to calculate LookupRoute in array storage from offset...
uint16_t get_records_in_leaf() const __attribute__((always_inline))
snapshot::SnapshotWriter * snapshot_writer_
Writes out composed pages.
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
uint16_t get_numa_node() const
snapshot::SnapshotWriter * snapshot_writer_
Writes out composed pages.
const ErrorStack kRetOk
Normal return value for no-error case.
bool partitioned_drop_
if true, one thread for each partition will invoke drop_volatiles()
int64_t int_div_ceil(int64_t dividee, int64_t dividor)
Efficient ceil(dividee/dividor) for integer.
Epoch get_initial_durable_epoch() const
uint64_t get_key() const __attribute__((always_inline))
uint32_t root_info_pages_count_
Number of root info pages.
Base class for log type of record-wise operation.
void combine(const DropResult &other)
#define UNLIKELY(x)
Hints that x is highly likely false.
const uint16_t kInteriorFanout
Max number of entries in an interior page of array storage.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
A high-resolution stop watch.
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
ErrorCode dump_pages(memory::PagePoolOffset from_page, uint32_t count)
Write out pages that are contiguous in the main page pool.
snapshot::SortedBuffer *const * log_streams_
Sorted runs.
CONTROL_BLOCK * get_control_block() const
snapshot::Snapshot snapshot_
The new snapshot.
void drop(Engine *engine, VolatilePagePointer pointer) const
Returns (might cache) the given pointer to volatile pool.
memory::AlignedMemory * work_memory_
Working memory to be used in this method.
storage::SnapshotPagePointer get_next_page_id() const
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
Entries we actually sort.
ErrorCode
Enum of error codes defined in error_code.xmacro.
xct::RwLockableXctId owner_id_
This indicates the transaction that most recently modified this record.
Arguments for drop_volatiles()
Writes out one snapshot file for all data pages in one reducer.
Arguments for construct_root()