libfoedus-core
FOEDUS Core Library
sequential_cursor.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #ifndef FOEDUS_STORAGE_SEQUENTIAL_SEQUENTIAL_CURSOR_HPP_
19 #define FOEDUS_STORAGE_SEQUENTIAL_SEQUENTIAL_CURSOR_HPP_
20 
21 #include <stdint.h>
22 
23 #include <algorithm>
24 #include <cstring>
25 #include <iosfwd>
26 #include <vector>
27 
28 #include "foedus/assert_nd.hpp"
29 #include "foedus/cxx11.hpp"
30 #include "foedus/epoch.hpp"
31 #include "foedus/memory/fwd.hpp"
32 #include "foedus/storage/page.hpp"
37 #include "foedus/thread/fwd.hpp"
38 #include "foedus/xct/fwd.hpp"
39 #include "foedus/xct/xct_id.hpp"
40 
41 namespace foedus {
42 namespace storage {
43 namespace sequential {
92  public:
94  enum OrderMode {
107  };
108 
131  thread::Thread* context,
132  const sequential::SequentialStorage& storage,
133  void* buffer,
134  uint64_t buffer_size,
135  OrderMode order_mode = kNodeFirstMode,
136  Epoch from_epoch = INVALID_EPOCH,
137  Epoch to_epoch = INVALID_EPOCH,
138  int32_t node_filter = -1);
139 
141 
142  thread::Thread* get_context() const { return context_;}
143  const sequential::SequentialStorage& get_storage() const { return storage_; }
144 
146  Epoch get_from_epoch() const { return from_epoch_; }
148  Epoch get_to_epoch() const { return to_epoch_; }
149 
159 
164  bool is_valid() const {
165  return !(finished_snapshots_ && finished_safe_volatiles_ && finished_unsafe_volatiles_);
166  }
167 
169  bool is_finished_snapshots() const { return finished_snapshots_; }
170  bool is_finished_safe_volatiles() const { return finished_safe_volatiles_; }
171  bool is_finished_unsafe_volatiles() const { return finished_unsafe_volatiles_; }
172 
173  friend std::ostream& operator<<(std::ostream& o, const SequentialCursor& v);
174 
175  private:
181  struct NodeState {
182  explicit NodeState(uint16_t node_id);
183  ~NodeState();
184 
185  const HeadPagePointer& get_cur_head() const {
186  return snapshot_heads_[snapshot_cur_head_];
187  }
188 
189  const uint16_t node_id_;
190  uint16_t volatile_cur_core_;
196  uint32_t snapshot_cur_head_;
202  uint32_t snapshot_cur_buffer_;
208  uint32_t snapshot_buffered_pages_;
213  uint64_t snapshot_buffer_begin_;
217  std::vector<HeadPagePointer> snapshot_heads_;
218 
224  std::vector<SequentialPage*> volatile_cur_pages_;
225  };
226 
// NOTE(review): presumably populates states_ (one NodeState per node); confirm in the .cpp.
ErrorCode init_states();

// One helper per reading phase. Each is given an iterator to fill (*out) and a flag (*found).
// Presumably *found == false means the phase has no more data, which matches the
// finished_snapshots_ / finished_safe_volatiles_ / finished_unsafe_volatiles_ flags.
// TODO confirm against the definitions.
ErrorCode next_batch_snapshot(SequentialRecordIterator* out, bool* found);
ErrorCode next_batch_safe_volatiles(SequentialRecordIterator* out, bool* found);
ErrorCode next_batch_unsafe_volatiles(SequentialRecordIterator* out, bool* found);

// NOTE(review): presumably reads snapshot pages of the given node into buffer_ and updates
// that node's snapshot_* fields in NodeState; confirm.
ErrorCode buffer_snapshot_pages(uint16_t node);

// Maps a volatile page pointer to the in-memory SequentialPage.
// NOTE(review): presumably via resolver_; confirm.
SequentialPage* resolve_volatile(VolatilePagePointer pointer) const;

// NOTE(review): presumably recomputes grace_epoch_; confirm.
void refresh_grace_epoch();


// Return type of the two *_check_page helpers below. The names suggest:
// the page is usable / skip to the next page / skip to the next core.
// NOTE(review): confirm these semantics against the definitions.
enum VolatileCheckPageResult {
  kValidPage,
  kNextPage,
  kNextCore,
};

// Classify a volatile page for the safe and unsafe volatile phases respectively.
VolatileCheckPageResult next_batch_safe_volatiles_check_page(const SequentialPage* page) const;
VolatileCheckPageResult next_batch_unsafe_volatiles_check_page(const SequentialPage* page) const;
257 
// Returned by get_context().
thread::Thread* const context_;
xct::Xct* const xct_;
Engine* const engine_;
// Held by reference; the resolver object must outlive the cursor.
const memory::GlobalVolatilePageResolver& resolver_;
// Held by value; get_storage() returns a reference to this copy.
sequential::SequentialStorage const storage_;
// Lower epoch bound. Unlike to_epoch_ it is not const, so presumably it is adjusted after
// construction; TODO confirm.
Epoch from_epoch_;
// Upper epoch bound, fixed at construction (the constructor's default is INVALID_EPOCH).
const Epoch to_epoch_;

const Epoch latest_snapshot_epoch_;

// NOTE(review): presumably the lower epoch bound applied to volatile pages; confirm.
const Epoch from_epoch_volatile_;

Epoch truncate_epoch_;

// The constructor's default is -1. NOTE(review): presumably -1 means "all nodes"; confirm.
const int32_t node_filter_;
const uint16_t node_count_;
const OrderMode order_mode_;
bool snapshot_only_;
bool safe_epoch_only_;

// The caller-supplied buffer (passed to the constructor as void*), typed as record batches.
SequentialRecordBatch* const buffer_;
const uint64_t buffer_size_;
// NOTE(review): presumably buffer_size_ expressed in pages; confirm.
const uint32_t buffer_pages_;


uint16_t current_node_;

// Phase-completion flags; is_valid() returns false once all three are true.
bool finished_snapshots_;
bool finished_safe_volatiles_;
bool finished_unsafe_volatiles_;

// NOTE(review): presumably maintained by refresh_grace_epoch(); confirm.
Epoch grace_epoch_;

// Per-node states (see NodeState).
std::vector<NodeState> states_;
325 };
326 
327 
// Layout annotations read "+<field size> -> <running byte offset>".
PageHeader header_; // +32 -> 32

// Number of records; bounds the index accepted by get_record_length().
uint16_t record_count_; // +2 -> 34
// NOTE(review): not read in the visible code; presumably bytes used in data_; confirm.
uint16_t used_data_bytes_; // +2 -> 36
// Padding that brings the fixed header to 40 bytes.
uint32_t filler_; // +4 -> 40
345 
351 
357 
358 
359  uint16_t get_record_count() const { return record_count_; }
360  uint16_t get_record_length(uint16_t index) const {
361  ASSERT_ND(index < record_count_);
362  return reinterpret_cast<const uint16_t*>(data_ + sizeof(data_))[-index - 1];
363  }
364  const xct::RwLockableXctId* get_owner_id_from_offset(uint16_t offset) const {
365  ASSERT_ND(offset + record_count_ * sizeof(uint16_t) <= kDataSize);
366  return reinterpret_cast<const xct::RwLockableXctId*>(data_ + offset);
367  }
368  const char* get_payload_from_offset(uint16_t offset) const {
369  ASSERT_ND(offset + record_count_ * sizeof(uint16_t) <= kDataSize);
370  return data_ + offset + sizeof(xct::RwLockableXctId);
371  }
372  Epoch get_epoch_from_offset(uint16_t offset) const {
373  return get_owner_id_from_offset(offset)->xct_id_.get_epoch();
374  }
375 };
376 
384  public:
386  SequentialRecordIterator(const SequentialRecordBatch* batch, Epoch from_epoch, Epoch to_epoch);
387 
388  void reset() {
389  std::memset(this, 0, sizeof(SequentialRecordIterator));
390  }
391 
392  bool is_valid() const ALWAYS_INLINE { return cur_record_ < record_count_; }
394  while (true) {
395  if (UNLIKELY(cur_record_ + 1U >= record_count_)) {
396  cur_record_ = record_count_;
397  break;
398  }
399 
400  ++cur_record_;
401  cur_offset_ += assorted::align8(cur_record_length_) + sizeof(xct::RwLockableXctId);
402  cur_record_length_ = batch_->get_record_length(cur_record_);
403  cur_record_epoch_ = batch_->get_epoch_from_offset(cur_offset_);
404  ASSERT_ND(cur_record_epoch_.is_valid());
405  if (LIKELY(in_epoch_range(cur_record_epoch_))) {
406  break;
407  }
408  // we have to skip this record
409  ++stat_skipped_records_;
410  }
411  }
413  ASSERT_ND(is_valid());
414  return cur_record_length_;
415  }
417  ASSERT_ND(is_valid());
418  return cur_record_epoch_;
419  }
425  void copy_cur_record(char* out, uint16_t out_size) const ALWAYS_INLINE {
426  ASSERT_ND(is_valid());
427  const char* raw = get_cur_record_raw();
428  uint16_t copy_size = std::min<uint16_t>(out_size, cur_record_length_);
429  std::memcpy(out, raw, copy_size);
430  }
436  const char* get_cur_record_raw() const ALWAYS_INLINE {
437  ASSERT_ND(is_valid());
438  return batch_->get_payload_from_offset(cur_offset_);
439  }
441  ASSERT_ND(is_valid());
442  return batch_->get_owner_id_from_offset(cur_offset_);
443  }
444  bool in_epoch_range(Epoch epoch) const ALWAYS_INLINE {
445  return epoch >= from_epoch_ && epoch < to_epoch_;
446  }
447 
449  uint16_t get_stat_skipped_records() const { return stat_skipped_records_; }
451  uint16_t get_record_count() const { return record_count_; }
452 
453  const SequentialRecordBatch* get_raw_batch() const { return batch_; }
454 
455  private:
// Layout annotations read "+<field size> -> <running byte offset>"; total 32 bytes.
// The batch being iterated.
const SequentialRecordBatch* batch_; // +8 -> 8
// Records are accepted only if in_epoch_range(), i.e. within [from_epoch_, to_epoch_).
Epoch from_epoch_; // +4 -> 12
Epoch to_epoch_; // +4 -> 16
// Epoch of the record at cur_record_; updated as the iterator advances.
Epoch cur_record_epoch_; // +4 -> 20
// Number of records in the batch; is_valid() holds while cur_record_ < record_count_.
uint16_t record_count_; // +2 -> 22
uint16_t cur_record_; // +2 -> 24
uint16_t cur_record_length_; // +2 -> 26
// Byte offset of the current record in the batch's data; advanced by
// align8(cur_record_length_) + sizeof(xct::RwLockableXctId) per record.
uint16_t cur_offset_; // +2 -> 28
// Incremented for every record skipped because its epoch is out of range.
uint16_t stat_skipped_records_; // +2 -> 30
// Padding to 32 bytes.
uint16_t filler_; // +2 -> 32
466 };
467 
469 
470 } // namespace sequential
471 } // namespace storage
472 } // namespace foedus
473 #endif // FOEDUS_STORAGE_SEQUENTIAL_SEQUENTIAL_CURSOR_HPP_
A cursor interface to read tuples from a sequential storage.
T align8(T value)
8-alignment.
const xct::RwLockableXctId * get_cur_record_owner_id() const __attribute__((always_inline))
Represents a pointer to another page (usually a child page).
Definition: storage_id.hpp:271
OrderMode
The order this cursor returns tuples.
Definitions of IDs in this package and a few related constant values.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
Represents one thread running on one NUMA core.
Definition: thread.hpp:48
Returns as many records as possible from node-0's core-0, core-1, ..., then does the same for node-1...
const xct::RwLockableXctId * get_owner_id_from_offset(uint16_t offset) const
Forward declarations of classes in transaction package.
DualPagePointer next_page_
Pointer to next page.
void copy_cur_record(char *out, uint16_t out_size) const __attribute__((always_inline))
Copies the current record to the given buffer.
Represents a time epoch.
Definition: epoch.hpp:61
XctId xct_id_
The second 64 bits: the persistent status part of the TID.
Definition: xct_id.hpp:1137
const char * get_cur_record_raw() const __attribute__((always_inline))
Directly returns a pointer to the current record.
bool is_valid() const __attribute__((always_inline))
#define LIKELY(x)
Hints that x is highly likely true.
Definition: compiler.hpp:103
Epoch get_cur_record_epoch() const __attribute__((always_inline))
Each pointer to a snapshot head page comes with a bit more information to help reading.
The MCS reader-writer lock variant of LockableXctId.
Definition: xct_id.hpp:1132
bool in_epoch_range(Epoch epoch) const __attribute__((always_inline))
const char * get_payload_from_offset(uint16_t offset) const
Represents an append/scan-only store.
ErrorCode next_batch(SequentialRecordIterator *out)
Returns a batch of records as an iterator.
Forward declarations of classes in sequential storage package.
Definitions of IDs in this package and a few related constant values.
bool is_finished_snapshots() const
The following are implementation details, used only by test cases.
#define CXX11_FINAL
Used in public headers in place of "final" of C++11.
Definition: cxx11.hpp:131
Epoch get_epoch() const __attribute__((always_inline))
Definition: xct_id.hpp:964
Just a marker to denote that a memory region represents a data page.
Definition: page.hpp:184
Definitions of IDs in this package and a few related constant values.
Forward declarations of classes in memory package.
const sequential::SequentialStorage & get_storage() const
bool is_valid() const
Definition: epoch.hpp:96
const SequentialRecordBatch * get_raw_batch() const
const Epoch INVALID_EPOCH
A constant epoch object that represents an invalid epoch.
Definition: epoch.hpp:204
char data_[kDataSize]
Dynamic data part of this page, which consists of 1) the record part growing forward, 2) the unused part...
const uint16_t kDataSize
Byte size of data region in each data page of sequential storage.
#define STATIC_SIZE_CHECK(desired, actual)
Iterator for one SequentialRecordBatch, or a page.
SequentialCursor(thread::Thread *context, const sequential::SequentialStorage &storage, void *buffer, uint64_t buffer_size, OrderMode order_mode=kNodeFirstMode, Epoch from_epoch=INVALID_EPOCH, Epoch to_epoch=INVALID_EPOCH, int32_t node_filter=-1)
Constructs a cursor to read tuples from this storage.
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
Forward declarations of classes in thread package.
A chunk of records returned by SequentialCursor.
#define ALWAYS_INLINE
A function suffix to hint that the function should always be inlined.
Definition: compiler.hpp:106
uint16_t get_cur_record_length() const __attribute__((always_inline))
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
Definition: storage_id.hpp:45
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
friend std::ostream & operator<<(std::ostream &o, const SequentialCursor &v)