libfoedus-core
FOEDUS Core Library
thread_log_buffer.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #ifndef FOEDUS_LOG_THREAD_LOG_BUFFER_HPP_
19 #define FOEDUS_LOG_THREAD_LOG_BUFFER_HPP_
20 #include <stdint.h>
21 
22 #include <iosfwd>
23 
24 #include "foedus/cxx11.hpp"
25 #include "foedus/epoch.hpp"
26 #include "foedus/fwd.hpp"
27 #include "foedus/initializable.hpp"
28 #include "foedus/log/fwd.hpp"
31 
32 namespace foedus {
33 namespace log {
66  uint64_t offset_begin_;
74  uint64_t offset_end_;
75 
77  old_epoch_.reset();
78  new_epoch_.reset();
79  offset_begin_ = 0;
80  offset_end_ = 0;
81  }
/**
 * Constructs a mark from explicit values.
 * @param[in] old_epoch stored to old_epoch_ (the value of new_epoch_ of the previous mark)
 * @param[in] new_epoch stored to new_epoch_ (the epoch of log entries this mark represents)
 * @param[in] offset_begin stored to offset_begin_ (where the new epoch starts)
 */
82  ThreadEpockMark(Epoch old_epoch, Epoch new_epoch, uint64_t offset_begin) {
83  old_epoch_ = old_epoch;
84  new_epoch_ = new_epoch;
85  offset_begin_ = offset_begin;
// The end offset of the new epoch is not supplied here, so it is initialized to zero.
86  offset_end_ = 0;
87  }
88  friend std::ostream& operator<<(std::ostream& o, const ThreadEpockMark& v);
89 };
90 
148  enum Constants {
158  };
159 
161 
167  void assert_consistent() const;
168  friend std::ostream& operator<<(std::ostream& o, const ThreadLogBufferMeta& v);
169 
/**
 * Returns the array index that follows the given one, wrapping back to 0 once
 * index + 1 reaches kMaxNonDurableEpochs.
 * @param[in] index an index in the circular array of epoch marks
 */
170  static uint32_t increment_mark_index(uint32_t index) {
171  return (index + 1) % kMaxNonDurableEpochs;
172  }
173 
175  uint16_t padding1_;
176  uint32_t padding2_;
177 
179  uint64_t buffer_size_;
190 
200  uint64_t offset_head_;
201 
210  uint64_t offset_durable_;
211 
222 
231  uint64_t offset_tail_;
232 
252 };
253 
267  public:
268  friend class Logger;
/**
 * Subtract operator, considering wrapping around.
 * @param[in] buffer_size size of the circular buffer
 * @param[in] from starting offset, asserted to be less than buffer_size
 * @param[in] to ending offset, asserted to be less than buffer_size
 * @return to - from when to >= from, otherwise to + buffer_size - from
 */
274  static uint64_t distance(uint64_t buffer_size, uint64_t from, uint64_t to) ALWAYS_INLINE {
275  ASSERT_ND(from < buffer_size);
276  ASSERT_ND(to < buffer_size);
277  if (to >= from) {
278  return to - from;
279  } else {
// "to" is numerically behind "from", so measure through the end of the buffer.
280  return to + buffer_size - from; // wrap around
281  }
282  }
/**
 * Addition operator, considering wrapping around.
 * @param[in] buffer_size size of the circular buffer
 * @param[in,out] target offset to move forward, asserted to be less than buffer_size
 * @param[in] amount how far to move, asserted to be less than buffer_size
 */
284  static void advance(uint64_t buffer_size, uint64_t *target, uint64_t amount) ALWAYS_INLINE {
285  ASSERT_ND(*target < buffer_size);
286  ASSERT_ND(amount < buffer_size);
287  *target += amount;
// Both operands were below buffer_size, so the sum is below 2 * buffer_size and
// a single subtraction is enough to bring the offset back into range.
288  if (*target >= buffer_size) {
289  *target -= buffer_size;
290  }
291  }
292 
293  ThreadLogBuffer(Engine* engine, thread::ThreadId thread_id);
296 
298  ThreadLogBuffer(const ThreadLogBuffer &other) CXX11_FUNC_DELETE;
299  ThreadLogBuffer& operator=(const ThreadLogBuffer &other) CXX11_FUNC_DELETE;
300 
/**
 * Only for Debug-assertion.
 * Forwards to meta_.assert_consistent() unless NDEBUG is defined, in which case
 * this function has an empty body.
 */
306  void assert_consistent() const {
307 #ifndef NDEBUG
308  meta_.assert_consistent();
309 #endif // NDEBUG
310  }
/** Returns the thread ID recorded in the metadata (meta_.thread_id_). */
311  thread::ThreadId get_thread_id() const { return meta_.thread_id_; }
312 
/**
 * Reserves a space for a new (uncommitted) log entry at the tail.
 * @param[in] log_length length of the log entry to reserve
 * @return pointer into buffer_ at the tail offset where the reserved region starts
 * @details
 * If the entry would reach or cross the end of the buffer, fillup_tail() is called first,
 * after which the tail is asserted to be back at offset zero. If the head-to-tail distance
 * plus log_length would reach buffer_size_safe_, wait_for_space() is called before reserving.
 * Finally the tail offset is advanced by log_length.
 */
320  char* reserve_new_log(uint16_t log_length) ALWAYS_INLINE {
321  if (UNLIKELY(log_length + meta_.offset_tail_ >= meta_.buffer_size_)) {
322  // now we need wrap around. to simplify, let's avoid having a log entry spanning the
323  // end of the buffer. put a filler log to fill the rest.
324  fillup_tail();
325  ASSERT_ND(meta_.offset_tail_ == 0);
326  }
327  // also make sure tail isn't too close to head (full). in that case, we wait for loggers
328  if (UNLIKELY(
329  head_to_tail_distance() + log_length >= meta_.buffer_size_safe_)) {
330  wait_for_space(log_length);
331  ASSERT_ND(head_to_tail_distance() + log_length < meta_.buffer_size_safe_);
332  }
333  ASSERT_ND(head_to_tail_distance() + log_length < meta_.buffer_size_safe_);
// Capture the start of the reserved region before the tail offset moves forward.
334  char *buffer = buffer_ + meta_.offset_tail_;
335  advance(meta_.buffer_size_, &meta_.offset_tail_, log_length);
336  return buffer;
337  }
339  return distance(meta_.buffer_size_, meta_.offset_head_, meta_.offset_tail_);
340  }
341 
346  Epoch last_epoch = get_last_epoch();
347  ASSERT_ND(commit_epoch >= last_epoch);
348  if (UNLIKELY(commit_epoch > last_epoch)) {
349  on_new_epoch_observed(commit_epoch); // epoch switches!
350  } else if (UNLIKELY(commit_epoch < last_epoch)) {
351  // This MUST not happen because it means an already durable epoch received a new log!
352  crash_stale_commit_epoch(commit_epoch);
353  }
354  meta_.offset_committed_ = meta_.offset_tail_;
355  }
356 
359  meta_.offset_tail_ = meta_.offset_committed_;
360  }
363  }
364 
366  void wait_for_space(uint16_t required_space);
367 
/** Returns meta_.offset_head_, which marks the position where log entries start. */
369  uint64_t get_offset_head() const { return meta_.offset_head_; }
370 
/**
 * Returns meta_.offset_durable_, which marks the position up to which the log writer
 * durably wrote out to log files.
 */
372  uint64_t get_offset_durable() const { return meta_.offset_durable_; }
373 
/**
 * Returns meta_.offset_committed_, which marks the position up to which transaction logs
 * are committed by the thread.
 */
375  uint64_t get_offset_committed() const { return meta_.offset_committed_; }
376 
/** Returns meta_.offset_tail_, the current cursor to which the next log will be written. */
378  uint64_t get_offset_tail() const { return meta_.offset_tail_; }
379 
380 
/** Returns the state of this buffer as a read-only reference to meta_. */
382  const ThreadLogBufferMeta& get_meta() const { return meta_; }
/** Returns a read-only pointer to the log buffer (buffer_). */
383  const char* get_buffer() const { return buffer_; }
384 
/**
 * A pair of begin/end offsets; this is the return type of get_logs_to_write().
 */
385  struct OffsetRange {
// Offset at which the range begins.
386  uint64_t begin_;
// Offset at which the range ends.
387  uint64_t end_;
// The range is considered empty when both offsets are equal.
388  bool is_empty() const { return begin_ == end_; }
389  };
391  OffsetRange get_logs_to_write(Epoch written_epoch);
393  void on_log_written(Epoch written_epoch);
394 
395 
396  friend std::ostream& operator<<(std::ostream& o, const ThreadLogBuffer& v);
397 
398  private:
403  void on_new_epoch_observed(Epoch commit_epoch);
404 
406  void crash_stale_commit_epoch(Epoch commit_epoch);
407 
409  void fillup_tail();
410 
411  Engine* const engine_;
412  ThreadLogBufferMeta meta_;
413 
414 
422  char* buffer_;
423 };
424 } // namespace log
425 } // namespace foedus
426 #endif // FOEDUS_LOG_THREAD_LOG_BUFFER_HPP_
427 
uint64_t head_to_tail_distance() const __attribute__((always_inline))
friend std::ostream & operator<<(std::ostream &o, const ThreadLogBuffer &v)
static uint32_t increment_mark_index(uint32_t index)
uint32_t current_mark_index_
Array index in thread_epoch_marks_ that indicates the current epoch being appended by the thread...
uint64_t get_offset_head() const
This marks the position where log entries start.
void publish_committed_log(Epoch commit_epoch) __attribute__((always_inline))
Called when the current transaction is successfully committed.
A thread-local log buffer.
Forward declarations of classes in log manager package.
const char * get_buffer() const
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
void wait_for_space(uint16_t required_space)
Called when we have to wait till offset_head_ advances so that we can put new logs.
uint64_t buffer_size_
Size of the buffer assigned to this thread.
Typedefs of ID types used in thread package.
Each thread can have at most this number of epochs in this log buffer.
void reset()
Clears this epoch variable so that it points to an invalid epoch.
Definition: epoch.hpp:95
uint64_t offset_end_
Where the new epoch ends.
uint64_t offset_committed_
This marks the position up to which transaction logs are committed by the thread.
Forward declarations of classes in root package.
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
Epoch old_epoch_
The value of new_epoch_ of the previous mark.
Metadata part of ThreadLogBuffer, without the actual buffer which is way way larger.
Represents a time epoch.
Definition: epoch.hpp:61
Epoch new_epoch_
The epoch of log entries this mark represents.
Typical implementation of Initializable as a skeleton base class.
const ThreadLogBufferMeta & get_meta() const
Returns the state of this buffer.
ThreadEpockMark thread_epoch_marks_[kMaxNonDurableEpochs]
Circular array of epoch marks of a thread log buffer.
uint64_t offset_head_
This marks the position where log entries start.
static void advance(uint64_t buffer_size, uint64_t *target, uint64_t amount) __attribute__((always_inline))
Addition operator, considering wrapping around.
A log writer that writes out buffered logs to stable storages.
#define CXX11_FINAL
Used in public headers in place of "final" of C++11.
Definition: cxx11.hpp:131
OffsetRange get_logs_to_write(Epoch written_epoch)
Returns begin/end offsets of logs in the given epoch.
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
uint64_t offset_begin_
Where the new epoch starts.
uint64_t offset_durable_
This marks the position up to which the log writer durably wrote out to log files. ...
A thread-buffer's epoch marker, which indicates where a thread switched an epoch. ...
void discard_current_xct_log()
Called when the current transaction aborts.
friend std::ostream & operator<<(std::ostream &o, const ThreadLogBufferMeta &v)
uint64_t get_offset_durable() const
This marks the position up to which the log writer durably wrote out to log files. ...
void assert_consistent() const
Only for Debug-assertion.
friend std::ostream & operator<<(std::ostream &o, const ThreadEpockMark &v)
#define CXX11_OVERRIDE
Used in public headers in place of "override" of C++11.
Definition: cxx11.hpp:134
ErrorStack initialize_once() override
#define CXX11_FUNC_DELETE
Used in public headers in place of " = delete" of C++11.
Definition: cxx11.hpp:128
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
Definition: thread_id.hpp:80
uint64_t offset_tail_
The current cursor to which next log will be written.
uint64_t get_offset_committed() const
This marks the position up to which transaction logs are committed by the thread.
static uint64_t distance(uint64_t buffer_size, uint64_t from, uint64_t to) __attribute__((always_inline))
Subtract operator, considering wrapping around.
void on_log_written(Epoch written_epoch)
Called when the logger wrote out all logs in the given epoch, advancing oldest_mark_index_.
void assert_consistent() const
Only for Debug-assertion.
uint32_t oldest_mark_index_
Array index in thread_epoch_marks_ that indicates the oldest epoch mark to be consumed by the logger...
ErrorStack uninitialize_once() override
uint64_t get_offset_tail() const
The current cursor to which next log will be written.
thread::ThreadId get_thread_id() const
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
#define ALWAYS_INLINE
A function suffix to hint that the function should always be inlined.
Definition: compiler.hpp:106
char * reserve_new_log(uint16_t log_length) __attribute__((always_inline))
Reserves a space for a new (uncommitted) log entry at the tail.
uint64_t buffer_size_safe_
buffer_size_ - 64.
ThreadEpockMark(Epoch old_epoch, Epoch new_epoch, uint64_t offset_begin)