libfoedus-core
FOEDUS Core Library
sequential_cursor.cpp
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #include "foedus/storage/sequential/sequential_cursor.hpp"
19 
20 #include <glog/logging.h>
21 
22 #include <algorithm>
23 #include <ostream>
24 
25 #include "foedus/assert_nd.hpp"
26 #include "foedus/engine.hpp"
31 #include "foedus/thread/thread.hpp"
32 #include "foedus/xct/xct.hpp"
34 
35 namespace foedus {
36 namespace storage {
37 namespace sequential {
38 
39 Epoch max_from_epoch_snapshot_epoch(Epoch from_epoch, Epoch latest_snapshot_epoch) {
40  if (!latest_snapshot_epoch.is_valid()) {
41  return from_epoch;
42  }
43  if (from_epoch > latest_snapshot_epoch) {
44  return from_epoch;
45  } else {
46  return latest_snapshot_epoch;
47  }
48 }
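// Added commentary (not in the original file): a quick worked example of the helper above.
// With from_epoch=10 and latest_snapshot_epoch=15 it returns 15; with from_epoch=20 and
// latest_snapshot_epoch=15 it returns 20; with an invalid latest_snapshot_epoch it returns
// from_epoch unchanged. The cursor uses this value as from_epoch_volatile_, because epochs
// up to the latest snapshot epoch are read from snapshot pages rather than volatile pages.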
49 
50 SequentialCursor::SequentialCursor(
51  thread::Thread* context,
52  const SequentialStorage& storage,
53  void* buffer,
54  uint64_t buffer_size,
55  OrderMode order_mode,
56  Epoch from_epoch,
57  Epoch to_epoch,
58  int32_t node_filter)
59  : context_(context),
60  xct_(&context->get_current_xct()),
61  engine_(context->get_engine()),
62  resolver_(engine_->get_memory_manager()->get_global_volatile_page_resolver()),
63  storage_(storage),
64  from_epoch_(
65  from_epoch.is_valid() ? from_epoch : engine_->get_savepoint_manager()->get_earliest_epoch()),
66  to_epoch_(
67  to_epoch.is_valid() ? to_epoch : engine_->get_xct_manager()->get_current_grace_epoch()),
68  latest_snapshot_epoch_(engine_->get_snapshot_manager()->get_snapshot_epoch()),
69  from_epoch_volatile_(max_from_epoch_snapshot_epoch(from_epoch_, latest_snapshot_epoch_)),
70  node_filter_(node_filter),
71  node_count_(engine_->get_soc_count()),
72  order_mode_(order_mode),
73  buffer_(reinterpret_cast<SequentialRecordBatch*>(buffer)),
74  buffer_size_(buffer_size),
75  buffer_pages_(buffer_size / kPageSize) {
76  ASSERT_ND(buffer_size >= kPageSize);
77  current_node_ = 0;
78  finished_snapshots_ = false;
79  finished_safe_volatiles_ = false;
80  finished_unsafe_volatiles_ = false;
81  states_.clear();
82 
83  grace_epoch_ = engine_->get_xct_manager()->get_current_grace_epoch();
84  ASSERT_ND(from_epoch_.is_valid());
85  ASSERT_ND(to_epoch_.is_valid());
86  ASSERT_ND(from_epoch_ <= to_epoch_);
87 
88  if (xct_->get_isolation_level() == xct::kSnapshot
89  || (latest_snapshot_epoch_.is_valid() && to_epoch_ <= latest_snapshot_epoch_)) {
90  snapshot_only_ = true;
91  safe_epoch_only_ = true;
92  finished_safe_volatiles_ = true;
93  finished_unsafe_volatiles_ = true;
94  } else {
95  snapshot_only_ = false;
96  if (to_epoch_ <= grace_epoch_) {
97  safe_epoch_only_ = true;
98  // We do NOT rule out reading unsafe pages yet.
99  // Even a safe page might be conservatively deemed unsafe in our logic,
100  // so we must make sure we go on to the unsafe-volatile phase too.
101  // The real check happens in next_batch_unsafe_volatiles().
102  // finished_unsafe_volatiles_ = true;
103  } else {
104  // only in this case, we have to take a lock
105  safe_epoch_only_ = false;
106  }
107  }
108 
109  if (!latest_snapshot_epoch_.is_valid() || latest_snapshot_epoch_ < from_epoch_) {
110  finished_snapshots_ = true;
111  }
112 }
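// Added commentary (not in the original file): a worked example of the flags set above,
// assuming latest_snapshot_epoch_=30 and grace_epoch_=50 under kSerializable isolation:
//   to_epoch_=25 (<= snapshot epoch): snapshot_only_=true, safe_epoch_only_=true;
//     only the snapshot phase runs.
//   to_epoch_=40 (<= grace epoch):    safe_epoch_only_=true, but the unsafe-volatile phase
//     is still visited because a page may be conservatively deemed unsafe.
//   to_epoch_=60 (> grace epoch):     safe_epoch_only_=false; unsafe pages may require
//     page-version sets and read-sets for serializability.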
113 
114 SequentialCursor::~SequentialCursor() {
115  states_.clear();
116 }
117 
118 SequentialCursor::NodeState::NodeState(uint16_t node_id) : node_id_(node_id) {
119  volatile_cur_core_ = 0;
120  snapshot_cur_head_ = 0;
121  snapshot_cur_buffer_ = 0;
122  snapshot_buffered_pages_ = 0;
123  snapshot_buffer_begin_ = 0;
124 }
125 SequentialCursor::NodeState::~NodeState() {}
126 
127 SequentialRecordIterator::SequentialRecordIterator()
128  : batch_(nullptr),
129  from_epoch_(INVALID_EPOCH),
130  to_epoch_(INVALID_EPOCH),
131  record_count_(0) {
132  cur_record_ = 0;
133  cur_record_length_ = 0;
134  cur_offset_ = 0;
135  cur_record_epoch_ = INVALID_EPOCH;
136  stat_skipped_records_ = 0;
137 }
138 
139 SequentialRecordIterator::SequentialRecordIterator(
140  const SequentialRecordBatch* batch,
141  Epoch from_epoch,
142  Epoch to_epoch)
143  : batch_(batch),
144  from_epoch_(from_epoch),
145  to_epoch_(to_epoch),
146  record_count_(batch->get_record_count()) {
147  cur_record_ = 0;
148  cur_record_length_ = batch_->get_record_length(0);
149  cur_offset_ = 0;
150  cur_record_epoch_ = batch->get_epoch_from_offset(0);
151  ASSERT_ND(cur_record_epoch_.is_valid());
152  stat_skipped_records_ = 0;
153  if (!in_epoch_range(cur_record_epoch_)) {
154  ++stat_skipped_records_;
155  next();
156  }
157 }
158 
159 ErrorCode SequentialCursor::next_batch(SequentialRecordIterator* out) {
160  out->reset();
161  if (states_.empty()) {
162  CHECK_ERROR_CODE(init_states());
163  }
164 
165  bool found = false;
166  if (!finished_snapshots_) {
167  CHECK_ERROR_CODE(next_batch_snapshot(out, &found));
168  if (found) {
169  return kErrorCodeOk;
170  } else {
171  DVLOG(1) << "Finished reading snapshot pages:";
172  DVLOG(2) << *this;
173  ASSERT_ND(finished_snapshots_);
174  refresh_grace_epoch();
175  }
176  }
177 
178  if (!finished_safe_volatiles_) {
179  CHECK_ERROR_CODE(next_batch_safe_volatiles(out, &found));
180  if (found) {
181  return kErrorCodeOk;
182  } else {
183  DVLOG(1) << "Finished reading safe volatile pages:";
184  DVLOG(2) << *this;
185  ASSERT_ND(finished_safe_volatiles_);
186  refresh_grace_epoch();
187  }
188  }
189 
190  if (!finished_unsafe_volatiles_) {
191  CHECK_ERROR_CODE(next_batch_unsafe_volatiles(out, &found));
192  if (found) {
193  return kErrorCodeOk;
194  } else {
195  DVLOG(1) << "Finished reading unsafe volatile pages:";
196  DVLOG(2) << *this;
197  ASSERT_ND(finished_unsafe_volatiles_);
198  }
199  }
200 
201  ASSERT_ND(!is_valid());
202  return kErrorCodeOk;
203 }
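// Added commentary (not in the original file): an illustrative sketch of how a caller
// might drive next_batch(). Iterator accessors such as is_valid(), get_cur_record_length(),
// and get_cur_record_raw() are assumed from the cursor header and may differ;
// process_record() is a hypothetical callback.
//
//   SequentialCursor cursor(context, storage, buffer, buffer_size);  // defaults: kNodeFirstMode, full epoch range
//   SequentialRecordIterator it;
//   while (cursor.is_valid()) {
//     CHECK_ERROR_CODE(cursor.next_batch(&it));   // runs snapshot, then safe-volatile, then unsafe-volatile phases
//     while (it.is_valid()) {
//       process_record(it.get_cur_record_raw(), it.get_cur_record_length());
//       it.next();
//     }
//   }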
204 
205 ErrorCode SequentialCursor::init_states() {
206  DVLOG(0) << "Initializing states...";
207  DVLOG(1) << *this;
208  for (uint16_t node_id = 0; node_id < node_count_; ++node_id) {
209  states_.emplace_back(node_id);
210  }
211 
212  // Ignore records that are before the truncate epoch.
213  // This is trivially done by overwriting from_epoch with truncate epoch.
214  CHECK_ERROR_CODE(storage_.optimistic_read_truncate_epoch(context_, &truncate_epoch_));
215  ASSERT_ND(truncate_epoch_.is_valid());
216  DVLOG(0) << "truncate_epoch_=" << truncate_epoch_;
217  if (truncate_epoch_ > from_epoch_) {
218  LOG(INFO) << "Overwrote from_epoch (" << from_epoch_ << ") with"
219  << " truncate_epoch(" << truncate_epoch_ << ")";
220  from_epoch_ = truncate_epoch_;
221  }
222 
223  // initialize snapshot page status
224  if (!finished_snapshots_) {
225  ASSERT_ND(latest_snapshot_epoch_.is_valid());
226  SnapshotPagePointer root_snapshot_page_id = storage_.get_metadata()->root_snapshot_page_id_;
227 
228  // read all entries from all root pages
229  uint64_t too_old_pointers = 0;
230  uint64_t too_new_pointers = 0;
231  uint64_t node_filtered_pointers = 0;
232  uint64_t added_pointers = 0;
233  uint32_t page_count = 0;
234  for (SnapshotPagePointer next_page_id = root_snapshot_page_id; next_page_id != 0;) {
235  ASSERT_ND(next_page_id != 0);
236  ++page_count;
237  SequentialRootPage* page;
238  CHECK_ERROR_CODE(context_->find_or_read_a_snapshot_page(
239  next_page_id,
240  reinterpret_cast<Page**>(&page)));
241  for (uint16_t i = 0; i < page->get_pointer_count(); ++i) {
242  const HeadPagePointer& pointer = page->get_pointers()[i];
243  ASSERT_ND(pointer.from_epoch_.is_valid());
244  ASSERT_ND(pointer.to_epoch_.is_valid());
245  uint16_t numa_node = extract_numa_node_from_snapshot_pointer(pointer.page_id_);
246  if (pointer.from_epoch_ >= to_epoch_) {
247  ++too_new_pointers;
248  continue;
249  } else if (pointer.to_epoch_ <= from_epoch_) {
250  ++too_old_pointers;
251  continue;
252  } else if (node_filter_ >= 0 && numa_node != static_cast<uint32_t>(node_filter_)) {
253  ++node_filtered_pointers;
254  continue;
255  } else {
256  ++added_pointers;
257  uint16_t node_id = extract_numa_node_from_snapshot_pointer(pointer.page_id_);
258  ASSERT_ND(node_id < node_count_);
259  states_[node_id].snapshot_heads_.push_back(pointer);
260  }
261  }
262  next_page_id = page->get_next_page();
263  }
264 
265  DVLOG(0) << "Read " << page_count << " root snapshot pages. added_pointers=" << added_pointers
266  << ", too_old_pointers=" << too_old_pointers << ", too_new_pointers=" << too_new_pointers
267  << ", node_filtered_pointers=" << node_filtered_pointers;
268  if (added_pointers == 0) {
269  finished_snapshots_ = true;
270  }
271  }
272 
273  // initialize volatile page status
274  if (finished_safe_volatiles_ && finished_unsafe_volatiles_) {
275  ASSERT_ND(safe_epoch_only_);
276  } else {
277  SequentialStoragePimpl pimpl(engine_, storage_.get_control_block());
278  uint16_t thread_per_node = engine_->get_options().thread_.thread_count_per_group_;
279 
280  uint64_t empty_threads = 0;
281  for (uint16_t node_id = 0; node_id < node_count_; ++node_id) {
282  if (node_filter_ >= 0 && node_id != static_cast<uint32_t>(node_filter_)) {
283  continue;
284  }
285  NodeState& state = states_[node_id];
286  for (uint16_t thread_ordinal = 0; thread_ordinal < thread_per_node; ++thread_ordinal) {
287  thread::ThreadId thread_id = thread::compose_thread_id(node_id, thread_ordinal);
288  memory::PagePoolOffset offset = *pimpl.get_head_pointer(thread_id);
289  if (offset == 0) {
290  // TASK(Hideaki) This should install the head pointer so as not to overlook concurrent
291  // insertions. Currently we will miss records in such a case. We need
292  // a lock for head-installation. Overhead is not an issue because this rarely happens,
293  // but we still need to implement it later.
294  ++empty_threads;
295  state.volatile_cur_pages_.push_back(nullptr);
296  continue;
297  }
298  VolatilePagePointer pointer;
299  pointer.set(node_id, offset);
300  SequentialPage* page = resolve_volatile(pointer);
301  if (page->get_record_count() > 0 && page->get_first_record_epoch() >= to_epoch_) {
302  // Even the first record has a too-new epoch, so there is no chance; safe to ignore this thread.
303  ++empty_threads;
304  state.volatile_cur_pages_.push_back(nullptr);
305  continue;
306  }
307 
308  // from_epoch_ doesn't matter. the thread might be inserting a new record right now.
309  state.volatile_cur_pages_.push_back(page);
310  }
311  ASSERT_ND(state.volatile_cur_pages_.size() == thread_per_node);
312  }
313  DVLOG(0) << "Initialized volatile head pages. empty_threads=" << empty_threads;
314  }
315 
316  DVLOG(0) << "Initialized states.";
317  DVLOG(1) << *this;
318  return kErrorCodeOk;
319 }
320 
321 ErrorCode SequentialCursor::next_batch_snapshot(
322  SequentialRecordIterator* out,
323  bool* found) {
324  ASSERT_ND(!finished_snapshots_);
325  ASSERT_ND(order_mode_ == kNodeFirstMode); // TASK(Hideaki) implement other modes
326  // When we implement epoch_first mode, remember that we have to split the buffer across nodes.
327  // In the worst case we have to read one page at a time...
328  // The code below assumes node-first mode, so we can fully use the buffer for each node.
329  while (current_node_ < node_count_) {
330  NodeState& state = states_[current_node_];
331  if (state.snapshot_cur_buffer_ >= state.snapshot_buffered_pages_) {
332  // need to buffer more
333  CHECK_ERROR_CODE(buffer_snapshot_pages(current_node_));
334  if (state.snapshot_cur_buffer_ >= state.snapshot_buffered_pages_) {
335  ++current_node_;
336  continue;
337  }
338  }
339 
340  // okay, we have a page to return
341  ASSERT_ND(state.snapshot_cur_buffer_ < state.snapshot_buffered_pages_);
342  *out = SequentialRecordIterator(buffer_ + state.snapshot_cur_buffer_, from_epoch_, to_epoch_);
343  *found = true;
344  ++state.snapshot_cur_buffer_;
345  return kErrorCodeOk;
346  }
347 
348  ASSERT_ND(*found == false);
349  finished_snapshots_ = true;
350  current_node_ = 0;
351  DVLOG(0) << "Finished reading snapshot pages: ";
352  DVLOG(1) << *this;
353  return kErrorCodeOk;
354 }
355 
356 ErrorCode SequentialCursor::buffer_snapshot_pages(uint16_t node) {
357  NodeState& state = states_[current_node_];
358  if (state.snapshot_cur_head_ == state.snapshot_heads_.size()) {
359  DVLOG(1) << "Node-" << node << " doesn't have any more snapshot pages:";
360  DVLOG(2) << *this;
361  return kErrorCodeOk;
362  }
363 
364  // do we have to switch to next linked-list?
365  while (state.snapshot_buffer_begin_ + state.snapshot_cur_buffer_
366  >= state.get_cur_head().page_count_) {
367  DVLOG(1) << "Completed node-" << node << "'s head-"
368  << state.snapshot_cur_head_ << ": ";
369  DVLOG(2) << *this;
370  ++state.snapshot_cur_head_;
371  state.snapshot_cur_buffer_ = 0;
372  state.snapshot_buffer_begin_ = 0;
373  state.snapshot_buffered_pages_ = 0;
374  if (state.snapshot_cur_head_ == state.snapshot_heads_.size()) {
375  DVLOG(1) << "Completed node-" << node << "'s all heads: ";
376  DVLOG(2) << *this;
377  return kErrorCodeOk;
378  }
379  }
380 
381  const HeadPagePointer& head = state.get_cur_head();
382  ASSERT_ND(state.snapshot_cur_buffer_ == state.snapshot_buffered_pages_);
383  ASSERT_ND(state.snapshot_buffer_begin_ + state.snapshot_cur_buffer_ < head.page_count_);
384  uint32_t remaining = head.page_count_ - state.snapshot_buffer_begin_ - state.snapshot_cur_buffer_;
385  uint32_t to_read = std::min<uint32_t>(buffer_pages_, remaining);
386  ASSERT_ND(to_read > 0);
387  DVLOG(1) << "Buffering " << to_read << " pages. ";
388  DVLOG(2) << *this;
389 
390  // Here, we read to_read contiguous pages in one shot.
391  // This means we always bypass the snapshot cache, but that shouldn't be
392  // an issue considering that we are probably reading millions of pages.
393  uint32_t new_begin = state.snapshot_buffer_begin_ + state.snapshot_cur_buffer_;
394  SnapshotPagePointer page_id_begin = head.page_id_ + new_begin;
395  CHECK_ERROR_CODE(
396  context_->read_snapshot_pages(page_id_begin, to_read, reinterpret_cast<Page*>(buffer_)));
397  state.snapshot_buffer_begin_ = new_begin;
398  state.snapshot_cur_buffer_ = 0;
399  state.snapshot_buffered_pages_ = to_read;
400 
401 #ifndef NDEBUG
402  // sanity checks
403  for (uint32_t i = 0; i < to_read; ++i) {
404  const SequentialRecordBatch* p = buffer_ + i;
405  ASSERT_ND(p->header_.page_id_ == page_id_begin + i);
406  ASSERT_ND(p->header_.snapshot_);
407  ASSERT_ND(p->header_.get_page_type() == kSequentialPageType);
408  ASSERT_ND(p->next_page_.volatile_pointer_.is_null());
409  // Q: "Why +1?" A: For example, consider the case where page_count_ == 1.
410  if (i + state.snapshot_buffer_begin_ + 1U == head.page_count_) {
411  ASSERT_ND(p->next_page_.snapshot_pointer_ == 0);
412  } else {
413  ASSERT_ND(p->next_page_.snapshot_pointer_ == page_id_begin + i + 1U);
414  }
415  }
416 #endif // NDEBUG
417  return kErrorCodeOk;
418 }
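// Added commentary (not in the original file): illustrative numbers for the buffering above,
// assuming the 4KB kPageSize. A 4MB buffer gives buffer_pages_ = 1024. If the current head's
// linked list has page_count_ = 2500, the first call reads snapshot pages
// [head.page_id_, head.page_id_ + 1024) in one read_snapshot_pages() call, the second call
// reads the next 1024, and the third reads the remaining 452 before moving on to the next head.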
419 
420 SequentialCursor::VolatileCheckPageResult SequentialCursor::next_batch_safe_volatiles_check_page(
421  const SequentialPage* page) const {
422  if (page == nullptr) {
423  DVLOG(1) << "Skipped empty core. node=" << current_node_ << ", core="
424  << states_[current_node_].volatile_cur_core_ << ".: ";
425  DVLOG(2) << *this;
426  return kNextCore;
427  }
428  // Even if we have a volatile page, is it safe to read from?
429  // If not, we skip reading here and will resume in the unsafe part.
430  VolatilePagePointer next_pointer = page->next_page().volatile_pointer_;
431  if (next_pointer.is_null()) {
432  // Tail page is always unsafe. We don't know when next-pointer will be installed.
433  DVLOG(1) << "Skipped tail page. node=" << current_node_ << ", core="
434  << states_[current_node_].volatile_cur_core_ << ".: ";
435  DVLOG(2) << *this;
436  return kNextCore;
437  }
438 
439  if (page->get_record_count() == 0) {
440  // If it's not the tail, even an empty page (which should be rare) is safe;
441  // we just move on to the next page.
442  LOG(INFO) << "Interesting. Empty non-tail page. node=" << current_node_ << ", core="
443  << states_[current_node_].volatile_cur_core_ << ". page= " << page->header();
444  return kNextPage;
445  }
446 
447  // All records in this page have this epoch.
448  Epoch epoch = page->get_first_record_epoch();
449  if (epoch >= to_epoch_) {
450  DVLOG(1) << "Reached to_epoch. node=" << current_node_ << ", core="
451  << states_[current_node_].volatile_cur_core_ << ".: ";
452  DVLOG(2) << *this;
453  return kNextCore;
454  } else if (epoch >= grace_epoch_) {
455  DVLOG(1) << "Reached unsafe page. node=" << current_node_ << ", core="
456  << states_[current_node_].volatile_cur_core_ << ".: ";
457  DVLOG(2) << *this;
458  return kNextCore;
459  } else if (latest_snapshot_epoch_.is_valid() && epoch <= latest_snapshot_epoch_) {
460  LOG(INFO) << "Interesting. Records in this volatile page are already snapshotted,"
461  << " but this page is not dropped yet. This can happen during snapshotting."
462  << " node=" << current_node_ << ", core="
463  << states_[current_node_].volatile_cur_core_ << ". page= " << page->header();
464  return kNextPage;
465  }
466 
467  // okay, this page is safe to read!
468  if (epoch < from_epoch_) {
469  DVLOG(2) << "Skipping too-old epoch. node=" << current_node_ << ", core="
470  << states_[current_node_].volatile_cur_core_ << ".: ";
471  DVLOG(3) << *this;
472  return kNextPage;
473  }
474 
475  // okay, safe and not too old.
476  return kValidPage;
477 }
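// Added commentary (not in the original file): the checks above boil down to the following,
// where E is the epoch shared by all records in the page:
//   no page installed for this core           -> kNextCore
//   tail page (next pointer not installed)    -> kNextCore  (revisited in the unsafe phase)
//   empty non-tail page                       -> kNextPage
//   E >= to_epoch_  or  E >= grace_epoch_     -> kNextCore  (too new / not yet safe)
//   snapshot epoch valid and E <= it          -> kNextPage  (already snapshotted)
//   E < from_epoch_                           -> kNextPage  (too old)
//   otherwise                                 -> kValidPage (safe to read now)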
478 
479 ErrorCode SequentialCursor::next_batch_safe_volatiles(
480  SequentialRecordIterator* out,
481  bool* found) {
482  ASSERT_ND(!finished_safe_volatiles_);
483  ASSERT_ND(order_mode_ == kNodeFirstMode); // TASK(Hideaki) implement other modes
484  while (current_node_ < node_count_) {
485  if (node_filter_ >= 0 && current_node_ != static_cast<uint32_t>(node_filter_)) {
486  ++current_node_;
487  continue;
488  }
489  NodeState& state = states_[current_node_];
490  while (state.volatile_cur_core_ < state.volatile_cur_pages_.size()) {
491  SequentialPage* page = state.volatile_cur_pages_[state.volatile_cur_core_];
492  VolatileCheckPageResult check_result = next_batch_safe_volatiles_check_page(page);
493  if (check_result == kNextCore) {
494  ++state.volatile_cur_core_;
495  } else {
496  ASSERT_ND(check_result == kValidPage || check_result == kNextPage);
497  VolatilePagePointer next_pointer = page->next_page().volatile_pointer_;
498  SequentialPage* next_page = resolve_volatile(next_pointer);
499  state.volatile_cur_pages_[state.volatile_cur_core_] = next_page;
500  if (check_result == kValidPage) {
501  *out = SequentialRecordIterator(
502  reinterpret_cast<SequentialRecordBatch*>(page),
503  from_epoch_volatile_,
504  to_epoch_);
505  *found = true;
506  return kErrorCodeOk;
507  }
508  }
509  }
510 
511  DVLOG(0) << "Finished reading all safe epochs in node-" << current_node_ << ": ";
512  DVLOG(1) << *this;
513  ++current_node_;
514  }
515 
516  ASSERT_ND(*found == false);
517  finished_safe_volatiles_ = true;
518  for (uint16_t node = 0; node < node_count_; ++node) {
519  states_[node].volatile_cur_core_ = 0;
520  }
521  current_node_ = 0;
522  DVLOG(0) << "Finished reading safe volatile pages: ";
523  DVLOG(1) << *this;
524  return kErrorCodeOk;
525 }
526 
527 SequentialCursor::VolatileCheckPageResult SequentialCursor::next_batch_unsafe_volatiles_check_page(
528  const SequentialPage* page) const {
529  if (page == nullptr) {
530  DVLOG(1) << "Skipped empty core. node=" << current_node_ << ", core="
531  << states_[current_node_].volatile_cur_core_ << ".: ";
532  DVLOG(2) << *this;
533  return kNextCore;
534  }
535 
536  if (UNLIKELY(page->get_record_count() == 0)) {
537  // This is most likely a tail page. The caller just makes sure it's safe
538  // by taking a page-version set if that is the case.
539  return kValidPage;
540  }
541 
542  ASSERT_ND(page->get_record_count() > 0);
543  Epoch epoch = page->get_first_record_epoch();
544  if (epoch >= to_epoch_) {
545  DVLOG(1) << "Reached to_epoch. node=" << current_node_ << ", core="
546  << states_[current_node_].volatile_cur_core_ << ".: ";
547  DVLOG(2) << *this;
548  return kNextCore;
549  }
550 
551  return kValidPage;
552 }
553 
554 ErrorCode SequentialCursor::next_batch_unsafe_volatiles(
555  SequentialRecordIterator* out,
556  bool* found) {
557  ASSERT_ND(!finished_unsafe_volatiles_);
558  // The order mode doesn't matter when we are reading unsafe epochs; we just read them all one by one.
559 
560  // If a record is in the current global epoch, we have to take it as a read-set for serializability.
561  // Records in the grace epoch are fine: this transaction will surely be in the current global epoch
562  // or later, so the dependency is trivially met.
563  bool serializable = xct_->get_isolation_level() == xct::kSerializable;
564  while (current_node_ < node_count_) {
565  if (node_filter_ >= 0 && current_node_ != static_cast<uint32_t>(node_filter_)) {
566  ++current_node_;
567  continue;
568  }
569  NodeState& state = states_[current_node_];
570  while (state.volatile_cur_core_ < state.volatile_cur_pages_.size()) {
571  SequentialPage* page = state.volatile_cur_pages_[state.volatile_cur_core_];
572  VolatileCheckPageResult check_result = next_batch_unsafe_volatiles_check_page(page);
573  if (check_result == kNextCore) {
574  ++state.volatile_cur_core_;
575  } else {
576  ASSERT_ND(check_result == kValidPage);
577  // In an unsafe page, we need to be careful to tell kValidPage/kNextPage apart,
578  // so we do it here.
579 
580  VolatilePagePointer next_pointer = page->next_page().volatile_pointer_;
581  bool tail_page = next_pointer.is_null();
582  uint16_t record_count = page->get_record_count();
583  if (serializable) {
584  // Let's protect the above two pieces of information with the page version.
585  assorted::memory_fence_consume();
586  PageVersionStatus observed = page->header().page_version_.status_;
587  assorted::memory_fence_consume();
588  if ((tail_page && !page->next_page().volatile_pointer_.is_null())
589  || page->get_record_count() != record_count) {
590  LOG(INFO) << "Wow, super rare. A concurrent thread just installed the next page or added a new record!";
591  continue; // retry. concurrent thread has now installed it!
592  }
593 
594  // If not tail page, this page is already safe.
595  // Otherwise, we have to protect the fact that this was a tail page by
596  // taking a page-version set if the transaction is serializable.
597  if (tail_page) {
598  CHECK_ERROR_CODE(xct_->add_to_page_version_set(
599  &page->header().page_version_,
600  observed));
601  }
602  }
603 
604  // Because of the way each thread appends, the last record in this page
605  // always has the largest in-epoch ordinal, so we just need that one for the read-set.
606  if (serializable && record_count > 0) {
607  Epoch epoch = page->get_first_record_epoch();
608  if (epoch > grace_epoch_) {
609  uint16_t offset = page->get_record_offset(record_count - 1);
610  xct::RwLockableXctId* owner_id = page->owner_id_from_offset(offset);
611  CHECK_ERROR_CODE(xct_->on_record_read(false, owner_id));
612  }
613  }
614 
615  if (next_pointer.is_null()) {
616  state.volatile_cur_pages_[state.volatile_cur_core_] = nullptr;
617  } else {
618  SequentialPage* next_page = resolve_volatile(next_pointer);
619  state.volatile_cur_pages_[state.volatile_cur_core_] = next_page;
620  }
621 
622  *out = SequentialRecordIterator(
623  reinterpret_cast<SequentialRecordBatch*>(page),
624  from_epoch_volatile_,
625  to_epoch_);
626  *found = true;
627  return kErrorCodeOk;
628  }
629  }
630 
631  DVLOG(0) << "Finished reading all unsafe epochs in node-" << current_node_ << ": ";
632  DVLOG(1) << *this;
633  ++current_node_;
634  }
635 
636  ASSERT_ND(*found == false);
637  finished_unsafe_volatiles_ = true;
638 
639  return kErrorCodeOk;
640 }
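// Added commentary (not in the original file): for serializable transactions, the loop above
// protects an unsafe tail page with the usual optimistic protocol: fence, observe the page
// version, re-read the next-pointer and record count, retry if either changed, and register
// the observed version in the page-version set so that a record appended to this tail page
// after the scan causes the transaction to fail verification at precommit. Records in the
// current global epoch additionally go into the read-set via on_record_read().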
641 
642 SequentialPage* SequentialCursor::resolve_volatile(VolatilePagePointer pointer) const {
643  return reinterpret_cast<SequentialPage*>(resolver_.resolve_offset(pointer));
644 }
645 
646 void SequentialCursor::refresh_grace_epoch() {
647  Epoch new_grace_epoch = engine_->get_xct_manager()->get_current_grace_epoch();
648  ASSERT_ND(new_grace_epoch >= grace_epoch_);
649  grace_epoch_ = new_grace_epoch;
650 }
651 
652 std::ostream& operator<<(std::ostream& o, const SequentialCursor& v) {
653  o << "<SequentialCursor>" << std::endl;
654  o << " " << v.get_storage() << std::endl;
655  o << " <from_epoch>" << v.get_from_epoch() << "</from_epoch>" << std::endl;
656  o << " <to_epoch>" << v.get_to_epoch() << "</to_epoch>" << std::endl;
657  o << " <order_mode>" << v.order_mode_ << "</order_mode>" << std::endl;
658  o << " <node_filter>" << v.node_filter_ << "</node_filter>" << std::endl;
659  o << " <snapshot_only_>" << v.snapshot_only_ << "</snapshot_only_>" << std::endl;
660  o << " <safe_epoch_only_>" << v.safe_epoch_only_ << "</safe_epoch_only_>" << std::endl;
661  o << " <buffer_>" << v.buffer_ << "</buffer_>" << std::endl;
662  o << " <buffer_size>" << v.buffer_size_ << "</buffer_size>" << std::endl;
663  o << " <buffer_pages_>" << v.buffer_pages_ << "</buffer_pages_>" << std::endl;
664  o << " <current_node_>" << v.current_node_ << "</current_node_>" << std::endl;
665  o << " <finished_snapshots_>" << v.finished_snapshots_ << "</finished_snapshots_>" << std::endl;
666  o << " <finished_safe_volatiles_>" << v.finished_safe_volatiles_
667  << "</finished_safe_volatiles_>" << std::endl;
668  o << " <finished_unsafe_volatiles_>" << v.finished_unsafe_volatiles_
669  << "</finished_unsafe_volatiles_>" << std::endl;
670  o << "</SequentialCursor>";
671  return o;
672 }
673 
674 
675 } // namespace sequential
676 } // namespace storage
677 } // namespace foedus