20 #include <glog/logging.h>
37 namespace sequential {
40 if (!latest_snapshot_epoch.
is_valid()) {
43 if (from_epoch > latest_snapshot_epoch) {
46 return latest_snapshot_epoch;
60 xct_(&context->get_current_xct()),
61 engine_(context->get_engine()),
62 resolver_(engine_->get_memory_manager()->get_global_volatile_page_resolver()),
65 from_epoch.is_valid() ? from_epoch : engine_->get_savepoint_manager()->get_earliest_epoch()),
67 to_epoch.is_valid() ? to_epoch : engine_->get_xct_manager()->get_current_grace_epoch()),
68 latest_snapshot_epoch_(engine_->get_snapshot_manager()->get_snapshot_epoch()),
70 node_filter_(node_filter),
71 node_count_(engine_->get_soc_count()),
72 order_mode_(order_mode),
74 buffer_size_(buffer_size),
78 finished_snapshots_ =
false;
79 finished_safe_volatiles_ =
false;
80 finished_unsafe_volatiles_ =
false;
89 || (latest_snapshot_epoch_.
is_valid() && to_epoch_ <= latest_snapshot_epoch_)) {
90 snapshot_only_ =
true;
91 safe_epoch_only_ =
true;
92 finished_safe_volatiles_ =
true;
93 finished_unsafe_volatiles_ =
true;
95 snapshot_only_ =
false;
96 if (to_epoch_ <= grace_epoch_) {
97 safe_epoch_only_ =
true;
105 safe_epoch_only_ =
false;
109 if (!latest_snapshot_epoch_.
is_valid() || latest_snapshot_epoch_ < from_epoch_) {
110 finished_snapshots_ =
true;
118 SequentialCursor::NodeState::NodeState(uint16_t node_id) : node_id_(node_id) {
119 volatile_cur_core_ = 0;
120 snapshot_cur_head_ = 0;
121 snapshot_cur_buffer_ = 0;
122 snapshot_buffered_pages_ = 0;
123 snapshot_buffer_begin_ = 0;
125 SequentialCursor::NodeState::~NodeState() {}
133 cur_record_length_ = 0;
136 stat_skipped_records_ = 0;
144 from_epoch_(from_epoch),
146 record_count_(batch->get_record_count()) {
152 stat_skipped_records_ = 0;
154 ++stat_skipped_records_;
161 if (states_.empty()) {
166 if (!finished_snapshots_) {
171 DVLOG(1) <<
"Finished reading snapshot pages:";
174 refresh_grace_epoch();
178 if (!finished_safe_volatiles_) {
183 DVLOG(1) <<
"Finished reading safe volatile pages:";
186 refresh_grace_epoch();
190 if (!finished_unsafe_volatiles_) {
195 DVLOG(1) <<
"Finished reading unsafe volatile pages:";
205 ErrorCode SequentialCursor::init_states() {
206 DVLOG(0) <<
"Initializing states...";
208 for (uint16_t node_id = 0; node_id < node_count_; ++node_id) {
209 states_.emplace_back(node_id);
216 DVLOG(0) <<
"truncate_epoch_=" << truncate_epoch_;
217 if (truncate_epoch_ > from_epoch_) {
218 LOG(INFO) <<
"Overwrote from_epoch (" << from_epoch_ <<
") with"
219 <<
" truncate_epoch(" << truncate_epoch_ <<
")";
220 from_epoch_ = truncate_epoch_;
224 if (!finished_snapshots_) {
229 uint64_t too_old_pointers = 0;
230 uint64_t too_new_pointers = 0;
231 uint64_t node_filtered_pointers = 0;
232 uint64_t added_pointers = 0;
233 uint32_t page_count = 0;
237 SequentialRootPage* page;
240 reinterpret_cast<Page**>(&page)));
241 for (uint16_t i = 0; i < page->get_pointer_count(); ++i) {
242 const HeadPagePointer& pointer = page->get_pointers()[i];
243 ASSERT_ND(pointer.from_epoch_.is_valid());
246 if (pointer.from_epoch_ >= to_epoch_) {
249 }
else if (pointer.to_epoch_ <= from_epoch_) {
252 }
else if (node_filter_ >= 0 && numa_node != static_cast<uint32_t>(node_filter_)) {
253 ++node_filtered_pointers;
259 states_[node_id].snapshot_heads_.push_back(pointer);
262 next_page_id = page->get_next_page();
265 DVLOG(0) <<
"Read " << page_count <<
" root snapshot pages. added_pointers=" << added_pointers
266 <<
", too_old_pointers=" << too_old_pointers <<
", too_new_pointers=" << too_new_pointers
267 <<
", node_filtered_pointers=" << node_filtered_pointers;
268 if (added_pointers == 0) {
269 finished_snapshots_ =
true;
274 if (finished_safe_volatiles_ && finished_unsafe_volatiles_) {
280 uint64_t empty_threads = 0;
281 for (uint16_t node_id = 0; node_id < node_count_; ++node_id) {
282 if (node_filter_ >= 0 && node_id != static_cast<uint32_t>(node_filter_)) {
285 NodeState& state = states_[node_id];
286 for (uint16_t thread_ordinal = 0; thread_ordinal < thread_per_node; ++thread_ordinal) {
295 state.volatile_cur_pages_.push_back(
nullptr);
298 VolatilePagePointer pointer;
299 pointer.set(node_id, offset);
300 SequentialPage* page = resolve_volatile(pointer);
301 if (page->get_record_count() > 0 && page->get_first_record_epoch() >= to_epoch_) {
304 state.volatile_cur_pages_.push_back(
nullptr);
309 state.volatile_cur_pages_.push_back(page);
311 ASSERT_ND(state.volatile_cur_pages_.size() == thread_per_node);
313 DVLOG(0) <<
"Initialized volatile head pages. empty_threads=" << empty_threads;
316 DVLOG(0) <<
"Initialized states.";
321 ErrorCode SequentialCursor::next_batch_snapshot(
322 SequentialRecordIterator* out,
329 while (current_node_ < node_count_) {
330 NodeState& state = states_[current_node_];
331 if (state.snapshot_cur_buffer_ >= state.snapshot_buffered_pages_) {
334 if (state.snapshot_cur_buffer_ >= state.snapshot_buffered_pages_) {
341 ASSERT_ND(state.snapshot_cur_buffer_ < state.snapshot_buffered_pages_);
342 *out = SequentialRecordIterator(buffer_ + state.snapshot_cur_buffer_, from_epoch_, to_epoch_);
344 ++state.snapshot_cur_buffer_;
349 finished_snapshots_ =
true;
351 DVLOG(0) <<
"Finished reading snapshot pages: ";
356 ErrorCode SequentialCursor::buffer_snapshot_pages(uint16_t node) {
357 NodeState& state = states_[current_node_];
358 if (state.snapshot_cur_head_ == state.snapshot_heads_.size()) {
359 DVLOG(1) <<
"Node-" << node <<
" doesn't have any more snapshot pages:";
365 while (state.snapshot_buffer_begin_ + state.snapshot_cur_buffer_
366 >= state.get_cur_head().page_count_) {
367 DVLOG(1) <<
"Completed node-" << node <<
"'s head-"
368 << state.snapshot_cur_head_ <<
": ";
370 ++state.snapshot_cur_head_;
371 state.snapshot_cur_buffer_ = 0;
372 state.snapshot_buffer_begin_ = 0;
373 state.snapshot_buffered_pages_ = 0;
374 if (state.snapshot_cur_head_ == state.snapshot_heads_.size()) {
375 DVLOG(1) <<
"Completed node-" << node <<
"'s all heads: ";
381 const HeadPagePointer& head = state.get_cur_head();
382 ASSERT_ND(state.snapshot_cur_buffer_ == state.snapshot_buffered_pages_);
383 ASSERT_ND(state.snapshot_buffer_begin_ + state.snapshot_cur_buffer_ < head.page_count_);
384 uint32_t remaining = head.page_count_ - state.snapshot_buffer_begin_ - state.snapshot_cur_buffer_;
385 uint32_t to_read = std::min<uint32_t>(buffer_pages_, remaining);
387 DVLOG(1) <<
"Buffering " << to_read <<
" pages. ";
393 uint32_t new_begin = state.snapshot_buffer_begin_ + state.snapshot_cur_buffer_;
397 state.snapshot_buffer_begin_ = new_begin;
398 state.snapshot_cur_buffer_ = 0;
399 state.snapshot_buffered_pages_ = to_read;
403 for (uint32_t i = 0; i < to_read; ++i) {
404 const SequentialRecordBatch* p = buffer_ + i;
405 ASSERT_ND(p->header_.page_id_ == page_id_begin + i);
408 ASSERT_ND(p->next_page_.volatile_pointer_.is_null());
410 if (i + state.snapshot_buffer_begin_ + 1U == head.page_count_) {
411 ASSERT_ND(p->next_page_.snapshot_pointer_ == 0);
413 ASSERT_ND(p->next_page_.snapshot_pointer_ == page_id_begin + i + 1U);
420 SequentialCursor::VolatileCheckPageResult SequentialCursor::next_batch_safe_volatiles_check_page(
421 const SequentialPage* page)
const {
422 if (page ==
nullptr) {
423 DVLOG(1) <<
"Skipped empty core. node=" << current_node_ <<
", core="
424 << states_[current_node_].volatile_cur_core_ <<
".: ";
430 VolatilePagePointer next_pointer = page->next_page().volatile_pointer_;
431 if (next_pointer.is_null()) {
433 DVLOG(1) <<
"Skipped tail page. node=" << current_node_ <<
", core="
434 << states_[current_node_].volatile_cur_core_ <<
".: ";
439 if (page->get_record_count() == 0) {
442 LOG(INFO) <<
"Interesting. Empty non-tail page. node=" << current_node_ <<
", core="
443 << states_[current_node_].volatile_cur_core_ <<
". page= " << page->header();
448 Epoch epoch = page->get_first_record_epoch();
449 if (epoch >= to_epoch_) {
450 DVLOG(1) <<
"Reached to_epoch. node=" << current_node_ <<
", core="
451 << states_[current_node_].volatile_cur_core_ <<
".: ";
454 }
else if (epoch >= grace_epoch_) {
455 DVLOG(1) <<
"Reached unsafe page. node=" << current_node_ <<
", core="
456 << states_[current_node_].volatile_cur_core_ <<
".: ";
459 }
else if (latest_snapshot_epoch_.
is_valid() && epoch <= latest_snapshot_epoch_) {
460 LOG(INFO) <<
"Interesting. Records in this volatile page are already snapshotted,"
461 <<
" but this page is not dropped yet. This can happen during snapshotting."
462 <<
" node=" << current_node_ <<
", core="
463 << states_[current_node_].volatile_cur_core_ <<
". page= " << page->header();
468 if (epoch < from_epoch_) {
469 DVLOG(2) <<
"Skipping too-old epoch. node=" << current_node_ <<
", core="
470 << states_[current_node_].volatile_cur_core_ <<
".: ";
479 ErrorCode SequentialCursor::next_batch_safe_volatiles(
480 SequentialRecordIterator* out,
484 while (current_node_ < node_count_) {
485 if (node_filter_ >= 0 && current_node_ != static_cast<uint32_t>(node_filter_)) {
489 NodeState& state = states_[current_node_];
490 while (state.volatile_cur_core_ < state.volatile_cur_pages_.size()) {
491 SequentialPage* page = state.volatile_cur_pages_[state.volatile_cur_core_];
492 VolatileCheckPageResult check_result = next_batch_safe_volatiles_check_page(page);
493 if (check_result == kNextCore) {
494 ++state.volatile_cur_core_;
496 ASSERT_ND(check_result == kValidPage || check_result == kNextPage);
497 VolatilePagePointer next_pointer = page->next_page().volatile_pointer_;
498 SequentialPage* next_page = resolve_volatile(next_pointer);
499 state.volatile_cur_pages_[state.volatile_cur_core_] = next_page;
500 if (check_result == kValidPage) {
501 *out = SequentialRecordIterator(
502 reinterpret_cast<SequentialRecordBatch*>(page),
503 from_epoch_volatile_,
511 DVLOG(0) <<
"Finished reading all safe epochs in node-" << current_node_ <<
": ";
517 finished_safe_volatiles_ =
true;
518 for (uint16_t node = 0; node < node_count_; ++node) {
519 states_[node].volatile_cur_core_ = 0;
522 DVLOG(0) <<
"Finished reading safe volatile pages: ";
527 SequentialCursor::VolatileCheckPageResult SequentialCursor::next_batch_unsafe_volatiles_check_page(
528 const SequentialPage* page)
const {
529 if (page ==
nullptr) {
530 DVLOG(1) <<
"Skipped empty core. node=" << current_node_ <<
", core="
531 << states_[current_node_].volatile_cur_core_ <<
".: ";
536 if (
UNLIKELY(page->get_record_count() == 0)) {
543 Epoch epoch = page->get_first_record_epoch();
544 if (epoch >= to_epoch_) {
545 DVLOG(1) <<
"Reached to_epoch. node=" << current_node_ <<
", core="
546 << states_[current_node_].volatile_cur_core_ <<
".: ";
554 ErrorCode SequentialCursor::next_batch_unsafe_volatiles(
555 SequentialRecordIterator* out,
564 while (current_node_ < node_count_) {
565 if (node_filter_ >= 0 && current_node_ != static_cast<uint32_t>(node_filter_)) {
569 NodeState& state = states_[current_node_];
570 while (state.volatile_cur_core_ < state.volatile_cur_pages_.size()) {
571 SequentialPage* page = state.volatile_cur_pages_[state.volatile_cur_core_];
572 VolatileCheckPageResult check_result = next_batch_unsafe_volatiles_check_page(page);
573 if (check_result == kNextCore) {
574 ++state.volatile_cur_core_;
580 VolatilePagePointer next_pointer = page->next_page().volatile_pointer_;
581 bool tail_page = next_pointer.is_null();
582 uint16_t record_count = page->get_record_count();
586 PageVersionStatus observed = page->header().page_version_.status_;
588 if ((tail_page && !page->next_page().volatile_pointer_.is_null())
589 || page->get_record_count() != record_count) {
590 LOG(INFO) <<
"Wow, super rare. just installed next page or added a new record!";
599 &page->header().page_version_,
606 if (serializable && record_count > 0) {
607 Epoch epoch = page->get_first_record_epoch();
608 if (epoch > grace_epoch_) {
609 uint16_t offset = page->get_record_offset(record_count - 1);
610 xct::RwLockableXctId* owner_id = page->owner_id_from_offset(offset);
615 if (next_pointer.is_null()) {
616 state.volatile_cur_pages_[state.volatile_cur_core_] =
nullptr;
618 SequentialPage* next_page = resolve_volatile(next_pointer);
619 state.volatile_cur_pages_[state.volatile_cur_core_] = next_page;
622 *out = SequentialRecordIterator(
623 reinterpret_cast<SequentialRecordBatch*>(page),
624 from_epoch_volatile_,
631 DVLOG(0) <<
"Finished reading all unsafe epochs in node-" << current_node_ <<
": ";
637 finished_unsafe_volatiles_ =
true;
642 SequentialPage* SequentialCursor::resolve_volatile(VolatilePagePointer pointer)
const {
643 return reinterpret_cast<SequentialPage*
>(resolver_.
resolve_offset(pointer));
646 void SequentialCursor::refresh_grace_epoch() {
648 ASSERT_ND(new_grace_epoch >= grace_epoch_);
649 grace_epoch_ = new_grace_epoch;
653 o <<
"<SequentialCursor>" << std::endl;
655 o <<
" <from_epoch>" << v.
get_from_epoch() <<
"</from_epoch>" << std::endl;
656 o <<
" <to_epoch>" << v.
get_to_epoch() <<
"</to_epoch>" << std::endl;
657 o <<
" <order_mode>" << v.order_mode_ <<
"</order_mode>" << std::endl;
658 o <<
" <node_filter>" << v.node_filter_ <<
"</node_filter>" << std::endl;
659 o <<
" <snapshot_only_>" << v.snapshot_only_ <<
"</snapshot_only_>" << std::endl;
660 o <<
" <safe_epoch_only_>" << v.safe_epoch_only_ <<
"</safe_epoch_only_>" << std::endl;
661 o <<
" <buffer_>" << v.buffer_ <<
"</buffer_>" << std::endl;
662 o <<
" <buffer_size>" << v.buffer_size_ <<
"</buffer_size>" << std::endl;
663 o <<
" <buffer_pages_>" << v.buffer_pages_ <<
"</buffer_pages_>" << std::endl;
664 o <<
" <current_node_>" << v.current_node_ <<
"</current_node_>" << std::endl;
665 o <<
" <finished_snapshots_>" << v.finished_snapshots_ <<
"</finished_snapshots_>" << std::endl;
666 o <<
" <finished_safe_volatiles_>" << v.finished_safe_volatiles_
667 <<
"</finished_safe_volatiles_>" << std::endl;
668 o <<
" <finished_unsafe_volatiles_>" << v.finished_unsafe_volatiles_
669 <<
"</finished_unsafe_volatiles_>" << std::endl;
670 o <<
"</SequentialCursor>";
A cursor interface to read tuples from a sequential storage.
ErrorCode find_or_read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page **out)
Find the given page in snapshot cache, reading it if not found.
Epoch get_from_epoch() const
OrderMode
The order this cursor returns tuples.
std::ostream & operator<<(std::ostream &o, const SequentialCursor &v)
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Represents one thread running on one NUMA core.
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
ErrorCode on_record_read(bool intended_for_write, RwLockableXctId *tid_address, XctId *observed_xid, ReadXctAccess **read_set_address, bool no_readset_if_moved=false, bool no_readset_if_next_layer=false)
The general logic invoked for every record read.
Epoch get_current_grace_epoch() const
Returns the current grace-period epoch (global epoch - 1), the epoch some transaction might be still ...
Returns as many records as possible from node-0's core-0, core-1, and so on, then does the same for node-1...
void next() __attribute__((always_inline))
Snapshot isolation (SI), meaning the transaction reads a consistent and complete image of the databas...
const Metadata * get_metadata() const
Returns the metadata of this storage.
const EngineOptions & get_options() const
ThreadLocalOrdinal thread_count_per_group_
Number of threads in each ThreadGroup.
Epoch get_epoch_from_offset(uint16_t offset) const
bool in_epoch_range(Epoch epoch) const __attribute__((always_inline))
Represents an append/scan-only store.
Epoch max_from_epoch_snapshot_epoch(Epoch from_epoch, Epoch latest_snapshot_epoch)
ErrorCode next_batch(SequentialRecordIterator *out)
Returns a batch of records as an iterator.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
uint8_t extract_numa_node_from_snapshot_pointer(SnapshotPagePointer pointer)
const sequential::SequentialStorage & get_storage() const
ThreadId compose_thread_id(ThreadGroupId node, ThreadLocalOrdinal local_core)
Returns a globally unique ID of Thread (core) for the given node and ordinal in the node...
xct::XctManager * get_xct_manager() const
See Transaction Manager.
Epoch get_to_epoch() const
ErrorCode optimistic_read_truncate_epoch(thread::Thread *context, Epoch *out) const
Obtains the current value of truncate-epoch in an OCC-fashion.
SequentialRecordIterator()
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
thread::ThreadOptions thread_
IsolationLevel get_isolation_level() const
Returns the level of isolation for this transaction.
const Epoch INVALID_EPOCH
A constant epoch object that represents an invalid epoch.
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
ErrorCode add_to_page_version_set(const storage::PageVersion *version_address, storage::PageVersionStatus observed)
Add the given page version to the page version set of this transaction.
void memory_fence_consume()
Equivalent to std::atomic_thread_fence(std::memory_order_consume).
Iterator for one SequentialRecordBatch, or a page.
SequentialCursor(thread::Thread *context, const sequential::SequentialStorage &storage, void *buffer, uint64_t buffer_size, OrderMode order_mode=kNodeFirstMode, Epoch from_epoch=INVALID_EPOCH, Epoch to_epoch=INVALID_EPOCH, int32_t node_filter=-1)
Constructs a cursor to read tuples from this storage.
#define UNLIKELY(x)
Hints that x is highly likely to be false.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
uint16_t get_record_length(uint16_t index) const
A chunk of records returned by SequentialCursor.
CONTROL_BLOCK * get_control_block() const
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
ErrorCode
Enum of error codes defined in error_code.xmacro.
ErrorCode read_snapshot_pages(storage::SnapshotPagePointer page_id_begin, uint32_t page_count, storage::Page *buffer)
Read contiguous pages in one shot.
Protects against all anomalies in all situations.