libfoedus-core
FOEDUS Core Library
foedus::log::ThreadLogBuffer Class Referencefinal

A thread-local log buffer. More...

Detailed Description

A thread-local log buffer.

This is a private implementation-details of Log Manager, thus file name ends with _impl. Do not include this header from a client program unless you know what you are doing.

Circular buffers
See also
foedus::log::ThreadLogBufferMeta
Epoch marks
See also
foedus::log::ThreadEpockMark

Definition at line 266 of file thread_log_buffer.hpp.

#include <thread_log_buffer.hpp>

Inheritance diagram for foedus::log::ThreadLogBuffer:
Collaboration diagram for foedus::log::ThreadLogBuffer:

Classes

struct  OffsetRange
 

Public Member Functions

 ThreadLogBuffer (Engine *engine, thread::ThreadId thread_id)
 
ErrorStack initialize_once () override
 
ErrorStack uninitialize_once () override
 
 ThreadLogBuffer ()=delete
 
 ThreadLogBuffer (const ThreadLogBuffer &other)=delete
 
ThreadLogBufferoperator= (const ThreadLogBuffer &other)=delete
 
void assert_consistent () const
 Only for Debug-assertion. More...
 
thread::ThreadId get_thread_id () const
 
char * reserve_new_log (uint16_t log_length) __attribute__((always_inline))
 Reserves a space for a new (uncommitted) log entry at the tail. More...
 
uint64_t head_to_tail_distance () const __attribute__((always_inline))
 
void publish_committed_log (Epoch commit_epoch) __attribute__((always_inline))
 Called when the current transaction is successfully committed. More...
 
void discard_current_xct_log ()
 Called when the current transaction aborts. More...
 
Epoch get_last_epoch () const
 
void wait_for_space (uint16_t required_space)
 Called when we have to wait till offset_head_ advances so that we can put new logs. More...
 
uint64_t get_offset_head () const
 This marks the position where log entries start. More...
 
uint64_t get_offset_durable () const
 This marks the position upto which the log writer durably wrote out to log files. More...
 
uint64_t get_offset_committed () const
 This marks the position upto which transaction logs are committed by the thread. More...
 
uint64_t get_offset_tail () const
 The current cursor to which next log will be written. More...
 
const ThreadLogBufferMetaget_meta () const
 Returns the state of this buffer. More...
 
const char * get_buffer () const
 
OffsetRange get_logs_to_write (Epoch written_epoch)
 Returns begin/end offsets of logs in the given epoch. More...
 
void on_log_written (Epoch written_epoch)
 Called when the logger wrote out all logs in the given epoch, advancing oldest_mark_index_. More...
 
- Public Member Functions inherited from foedus::DefaultInitializable
 DefaultInitializable ()
 
virtual ~DefaultInitializable ()
 
 DefaultInitializable (const DefaultInitializable &)=delete
 
DefaultInitializableoperator= (const DefaultInitializable &)=delete
 
ErrorStack initialize () override final
 Typical implementation of Initializable::initialize() that provides initialize-once semantics. More...
 
ErrorStack uninitialize () override final
 Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics. More...
 
bool is_initialized () const override final
 Returns whether the object has been already initialized or not. More...
 
- Public Member Functions inherited from foedus::Initializable
virtual ~Initializable ()
 

Static Public Member Functions

static uint64_t distance (uint64_t buffer_size, uint64_t from, uint64_t to) __attribute__((always_inline))
 Subtract operator, considering wrapping around. More...
 
static void advance (uint64_t buffer_size, uint64_t *target, uint64_t amount) __attribute__((always_inline))
 Addition operator, considering wrapping around. More...
 

Friends

class Logger
 
std::ostream & operator<< (std::ostream &o, const ThreadLogBuffer &v)
 

Constructor & Destructor Documentation

foedus::log::ThreadLogBuffer::ThreadLogBuffer ( Engine engine,
thread::ThreadId  thread_id 
)

Definition at line 104 of file thread_log_buffer.cpp.

References foedus::log::ThreadLogBufferMeta::thread_id_.

105  : engine_(engine), meta_() {
106  meta_.thread_id_ = thread_id;
107  buffer_ = nullptr;
108 }
foedus::log::ThreadLogBuffer::ThreadLogBuffer ( )
delete
foedus::log::ThreadLogBuffer::ThreadLogBuffer ( const ThreadLogBuffer other)
delete

Member Function Documentation

static void foedus::log::ThreadLogBuffer::advance ( uint64_t  buffer_size,
uint64_t *  target,
uint64_t  amount 
)
inlinestatic

Addition operator, considering wrapping around.

Definition at line 284 of file thread_log_buffer.hpp.

References ASSERT_ND.

Referenced by reserve_new_log().

284  {
285  ASSERT_ND(*target < buffer_size);
286  ASSERT_ND(amount < buffer_size);
287  *target += amount;
288  if (*target >= buffer_size) {
289  *target -= buffer_size;
290  }
291  }
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72

Here is the caller graph for this function:

void foedus::log::ThreadLogBuffer::assert_consistent ( ) const
inline

Only for Debug-assertion.

This method must be called by the worker thread itself. When logger calls it, it must make sure the worker is not modifying anything concurrently.

Definition at line 306 of file thread_log_buffer.hpp.

References foedus::log::ThreadLogBufferMeta::assert_consistent().

Referenced by wait_for_space().

306  {
307 #ifndef NDEBUG
308  meta_.assert_consistent();
309 #endif // NDEBUG
310  }
void assert_consistent() const
Only for Debug-assertion.

Here is the call graph for this function:

Here is the caller graph for this function:

void foedus::log::ThreadLogBuffer::discard_current_xct_log ( )
inline

Called when the current transaction aborts.

Definition at line 358 of file thread_log_buffer.hpp.

References foedus::log::ThreadLogBufferMeta::offset_committed_, and foedus::log::ThreadLogBufferMeta::offset_tail_.

Referenced by foedus::xct::XctManagerPimpl::abort_xct(), and foedus::xct::XctManagerPimpl::precommit_xct_readwrite().

358  {
359  meta_.offset_tail_ = meta_.offset_committed_;
360  }
uint64_t offset_committed_
This marks the position upto which transaction logs are committed by the thread.
uint64_t offset_tail_
The current cursor to which next log will be written.

Here is the caller graph for this function:

static uint64_t foedus::log::ThreadLogBuffer::distance ( uint64_t  buffer_size,
uint64_t  from,
uint64_t  to 
)
inlinestatic

Subtract operator, considering wrapping around.

Attention
Be careful. As this is a circular buffer, from and to are NOT commutative. actually, distance(from, to) = buffer_size - distance(to, from).

Definition at line 274 of file thread_log_buffer.hpp.

References ASSERT_ND.

Referenced by head_to_tail_distance().

274  {
275  ASSERT_ND(from < buffer_size);
276  ASSERT_ND(to < buffer_size);
277  if (to >= from) {
278  return to - from;
279  } else {
280  return to + buffer_size - from; // wrap around
281  }
282  }
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72

Here is the caller graph for this function:

const char* foedus::log::ThreadLogBuffer::get_buffer ( ) const
inline

Definition at line 383 of file thread_log_buffer.hpp.

383 { return buffer_; }
Epoch foedus::log::ThreadLogBuffer::get_last_epoch ( ) const
inline

Definition at line 361 of file thread_log_buffer.hpp.

References foedus::log::ThreadLogBufferMeta::current_mark_index_, foedus::log::ThreadEpockMark::new_epoch_, and foedus::log::ThreadLogBufferMeta::thread_epoch_marks_.

Referenced by publish_committed_log().

361  {
363  }
uint32_t current_mark_index_
Array index in thread_epoch_marks_ that indicates the current epoch being appended by the thread...
Epoch new_epoch_
The epoch of log entries this mark represents.
ThreadEpockMark thread_epoch_marks_[kMaxNonDurableEpochs]
Circular array of epoch marks of a thread log buffer.

Here is the caller graph for this function:

ThreadLogBuffer::OffsetRange foedus::log::ThreadLogBuffer::get_logs_to_write ( Epoch  written_epoch)

Returns begin/end offsets of logs in the given epoch.

It might be empty (0/0).

Definition at line 234 of file thread_log_buffer.cpp.

References ASSERT_ND, foedus::log::ThreadLogBuffer::OffsetRange::begin_, foedus::log::ThreadLogBufferMeta::current_mark_index_, foedus::log::ThreadLogBuffer::OffsetRange::end_, foedus::log::ThreadLogBufferMeta::increment_mark_index(), foedus::log::ThreadLogBufferMeta::kMaxNonDurableEpochs, foedus::assorted::memory_fence_acq_rel(), foedus::assorted::memory_fence_acquire(), foedus::log::ThreadEpockMark::new_epoch_, foedus::log::ThreadEpockMark::offset_begin_, foedus::log::ThreadLogBufferMeta::offset_committed_, foedus::log::ThreadLogBufferMeta::offset_durable_, foedus::log::ThreadEpockMark::offset_end_, foedus::log::ThreadLogBufferMeta::oldest_mark_index_, and foedus::log::ThreadLogBufferMeta::thread_epoch_marks_.

234  {
235  // See ThreadLogBufferMeta's class comment about tricky cases (the thread being idle).
236  // assert_consistent(); this verification assumes the worker is not working. we can't use it here
237  OffsetRange ret;
238  ThreadEpockMark& target = meta_.thread_epoch_marks_[meta_.oldest_mark_index_];
239  if (target.new_epoch_ > written_epoch) {
240  // Case 1) no logs in this epoch.
241  ASSERT_ND(target.offset_begin_ == meta_.offset_durable_);
242  ret.begin_ = 0;
243  ret.end_ = 0;
244  } else if (target.new_epoch_ == written_epoch) {
245  // Case 2) Is it 2-a? or 2-b?
246  ASSERT_ND(target.offset_begin_ == meta_.offset_durable_);
247  ret.begin_ = target.offset_begin_;
248  if (meta_.oldest_mark_index_ != meta_.current_mark_index_) {
249  // 2-a, easy.
250  ret.end_ = target.offset_end_;
251  } else {
252  // 2-b, now we have to populate target.offset_end_ ourselves.
253  VLOG(0) << "This guy seems sleeping for a while.." << *this;
254  // We can safely populate target.offset_end_, but be careful on reading offset_committed_.
255  // If the thread now resumes working, it adds a new mark *and then* increments
256  // offset_committed_. So, it's safe as far as we take appropriate fences.
257 
258  // well, a bit redundant fences, but this part is not executed so often. no issue.
260  uint64_t committed_copy = meta_.offset_committed_;
262  bool still_current = meta_.oldest_mark_index_ == meta_.current_mark_index_;
264  if (still_current) {
265  target.offset_end_ = committed_copy;
266  VLOG(0) << "Okay, the logger populated the offset_end on behalf." << *this;
267  } else {
268  // This is super-rare.
269  LOG(INFO) << "Interesting. The thread has now awaken and added a new mark. offset_end"
270  << " should be now populated. "<< *this;
271  }
272 
274  ret.end_ = target.offset_end_;
275  }
276  } else {
277  // Case 3) First, consume stale marks as much as possible
278  // (note, in this case "target.offset_begin_ == meta_.offset_durable_" might not hold
279  ASSERT_ND(target.new_epoch_ < written_epoch);
280  if (meta_.oldest_mark_index_ != meta_.current_mark_index_) {
281  VLOG(0) << "Advancing oldest mark index. before=" << *this;
282  while (meta_.oldest_mark_index_ != meta_.current_mark_index_
283  && meta_.thread_epoch_marks_[meta_.oldest_mark_index_].new_epoch_ < written_epoch) {
284  meta_.oldest_mark_index_
288  LOG(FATAL) << "meta_.oldest_mark_index_ out of range. we must have waited.";
289  }
290  }
291  VLOG(0) << "Advanced oldest mark index. after=" << *this;
292  // Then re-evaluate. It might be now Case 1 or 2. recursion, but essentially a retry.
293  ret = get_logs_to_write(written_epoch);
294  } else {
295  VLOG(1) << "Heh, this guy is still sleeping.." << *this;
296  ret.begin_ = 0;
297  ret.end_ = 0;
298  }
299  }
300  return ret;
301 }
static uint32_t increment_mark_index(uint32_t index)
uint32_t current_mark_index_
Array index in thread_epoch_marks_ that indicates the current epoch being appended by the thread...
Each thread can have at most this number of epochs in this log buffer.
uint64_t offset_committed_
This marks the position upto which transaction logs are committed by the thread.
Epoch new_epoch_
The epoch of log entries this mark represents.
ThreadEpockMark thread_epoch_marks_[kMaxNonDurableEpochs]
Circular array of epoch marks of a thread log buffer.
OffsetRange get_logs_to_write(Epoch written_epoch)
Returns begin/end offsets of logs in the given epoch.
uint64_t offset_durable_
This marks the position upto which the log writer durably wrote out to log files. ...
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
uint32_t oldest_mark_index_
Array index in thread_epoch_marks_ that indicates the oldest epoch mark to be consumed by the logger...
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
void memory_fence_acq_rel()
Equivalent to std::atomic_thread_fence(std::memory_order_acq_rel).

Here is the call graph for this function:

const ThreadLogBufferMeta& foedus::log::ThreadLogBuffer::get_meta ( ) const
inline

Returns the state of this buffer.

Definition at line 382 of file thread_log_buffer.hpp.

382 { return meta_; }
uint64_t foedus::log::ThreadLogBuffer::get_offset_committed ( ) const
inline

This marks the position upto which transaction logs are committed by the thread.

Log writers can safely read log entries and write them to log files up to this place. When the transaction commits, this value is advanced by the thread. The only possible update pattern to this variable is advance by this thread. Thus, the log writer can safely read this variable without any fence or lock thanks to regular (either old value or new value, never garbage) read of 64-bit.

Definition at line 375 of file thread_log_buffer.hpp.

References foedus::log::ThreadLogBufferMeta::offset_committed_.

Referenced by foedus::xct::XctManagerPimpl::begin_xct(), and foedus::xct::XctManagerPimpl::precommit_xct_readonly().

375 { return meta_.offset_committed_; }
uint64_t offset_committed_
This marks the position upto which transaction logs are committed by the thread.

Here is the caller graph for this function:

uint64_t foedus::log::ThreadLogBuffer::get_offset_durable ( ) const
inline

This marks the position upto which the log writer durably wrote out to log files.

Everything after this position must not be discarded because they are not yet durable. When the log writer reads log entries after here, writes them to log file, and calls fsync, this variable is advanced by the log writer. This variable is read by this thread to check the end of the circular buffer.

Definition at line 372 of file thread_log_buffer.hpp.

References foedus::log::ThreadLogBufferMeta::offset_durable_.

372 { return meta_.offset_durable_; }
uint64_t offset_durable_
This marks the position upto which the log writer durably wrote out to log files. ...
uint64_t foedus::log::ThreadLogBuffer::get_offset_head ( ) const
inline

This marks the position where log entries start.

This private log buffer is a circular buffer where the head is eaten by log gleaner. However, log gleaner is okay to get behind, reading from log file instead (but slower). Thus, offset_head_ is advanced either by log gleaner or this thread. If the latter happens, log gleaner has to give up using in-memory logs and instead read from log files.

Definition at line 369 of file thread_log_buffer.hpp.

References foedus::log::ThreadLogBufferMeta::offset_head_.

369 { return meta_.offset_head_; }
uint64_t offset_head_
This marks the position where log entries start.
uint64_t foedus::log::ThreadLogBuffer::get_offset_tail ( ) const
inline

The current cursor to which next log will be written.

This is the location the current transaction of this thread is writing to before commit. When the transaction commits, offset_committed_ catches up with this. When the transaction aborts, this value rolls back to offset_committed_. Only this thread reads/writes to this variable. No other threads access this.

Definition at line 378 of file thread_log_buffer.hpp.

References foedus::log::ThreadLogBufferMeta::offset_tail_.

Referenced by foedus::xct::XctManagerPimpl::begin_xct(), and foedus::xct::XctManagerPimpl::precommit_xct_readonly().

378 { return meta_.offset_tail_; }
uint64_t offset_tail_
The current cursor to which next log will be written.

Here is the caller graph for this function:

thread::ThreadId foedus::log::ThreadLogBuffer::get_thread_id ( ) const
inline

Definition at line 311 of file thread_log_buffer.hpp.

References foedus::log::ThreadLogBufferMeta::thread_id_.

311 { return meta_.thread_id_; }
uint64_t foedus::log::ThreadLogBuffer::head_to_tail_distance ( ) const
inline

Definition at line 338 of file thread_log_buffer.hpp.

References foedus::log::ThreadLogBufferMeta::buffer_size_, distance(), foedus::log::ThreadLogBufferMeta::offset_head_, and foedus::log::ThreadLogBufferMeta::offset_tail_.

Referenced by reserve_new_log(), and wait_for_space().

338  {
339  return distance(meta_.buffer_size_, meta_.offset_head_, meta_.offset_tail_);
340  }
uint64_t buffer_size_
Size of the buffer assigned to this thread.
uint64_t offset_head_
This marks the position where log entries start.
uint64_t offset_tail_
The current cursor to which next log will be written.
static uint64_t distance(uint64_t buffer_size, uint64_t from, uint64_t to) __attribute__((always_inline))
Subtract operator, considering wrapping around.

Here is the call graph for this function:

Here is the caller graph for this function:

ErrorStack foedus::log::ThreadLogBuffer::initialize_once ( )
overridevirtual

Implements foedus::DefaultInitializable.

Definition at line 110 of file thread_log_buffer.cpp.

References foedus::log::ThreadLogBufferMeta::buffer_size_, foedus::log::ThreadLogBufferMeta::buffer_size_safe_, foedus::log::ThreadLogBufferMeta::current_mark_index_, foedus::memory::AlignedMemorySlice::get_block(), foedus::memory::NumaNodeMemory::get_core_memory(), foedus::savepoint::SavepointManager::get_initial_current_epoch(), foedus::savepoint::SavepointManager::get_initial_durable_epoch(), foedus::memory::EngineMemory::get_local_memory(), foedus::memory::NumaCoreMemory::get_log_buffer_memory(), foedus::Engine::get_memory_manager(), foedus::Engine::get_savepoint_manager(), foedus::memory::AlignedMemorySlice::get_size(), foedus::kRetOk, foedus::log::ThreadLogBufferMeta::offset_committed_, foedus::log::ThreadLogBufferMeta::offset_durable_, foedus::log::ThreadLogBufferMeta::offset_head_, foedus::log::ThreadLogBufferMeta::offset_tail_, foedus::log::ThreadLogBufferMeta::oldest_mark_index_, foedus::log::ThreadLogBufferMeta::thread_epoch_marks_, and foedus::log::ThreadLogBufferMeta::thread_id_.

110  {
111  memory::NumaCoreMemory *memory
113  memory::AlignedMemorySlice buffer_memory = memory->get_log_buffer_memory();
114  buffer_ = reinterpret_cast<char*>(buffer_memory.get_block());
115  meta_.buffer_size_ = buffer_memory.get_size();
116  meta_.buffer_size_safe_ = meta_.buffer_size_ - 64;
117 
118  meta_.offset_head_ = 0;
119  meta_.offset_durable_ = 0;
120  meta_.offset_committed_ = 0;
121  meta_.offset_tail_ = 0;
122 
123  Epoch initial_current = engine_->get_savepoint_manager()->get_initial_current_epoch();
124  Epoch initial_durable = engine_->get_savepoint_manager()->get_initial_durable_epoch();
125  meta_.current_mark_index_ = 0;
126  meta_.oldest_mark_index_ = 0;
127  meta_.thread_epoch_marks_[0] = ThreadEpockMark(initial_durable, initial_current, 0);
128  return kRetOk;
129 }
uint32_t current_mark_index_
Array index in thread_epoch_marks_ that indicates the current epoch being appended by the thread...
NumaCoreMemory * get_core_memory(foedus::thread::ThreadId id) const
uint64_t buffer_size_
Size of the buffer assigned to this thread.
uint64_t offset_committed_
This marks the position upto which transaction logs are committed by the thread.
savepoint::SavepointManager * get_savepoint_manager() const
See Savepoint Manager.
Definition: engine.cpp:53
AlignedMemorySlice get_log_buffer_memory() const
ThreadEpockMark thread_epoch_marks_[kMaxNonDurableEpochs]
Circular array of epoch marks of a thread log buffer.
uint64_t offset_head_
This marks the position where log entries start.
uint64_t offset_durable_
This marks the position upto which the log writer durably wrote out to log files. ...
NumaNodeMemory * get_local_memory() const
uint64_t offset_tail_
The current cursor to which next log will be written.
const ErrorStack kRetOk
Normal return value for no-error case.
uint32_t oldest_mark_index_
Array index in thread_epoch_marks_ that indicates the oldest epoch mark to be consumed by the logger...
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
Definition: engine.cpp:50
uint64_t buffer_size_safe_
buffer_size_ - 64.

Here is the call graph for this function:

void foedus::log::ThreadLogBuffer::on_log_written ( Epoch  written_epoch)

Called when the logger wrote out all logs in the given epoch, advancing oldest_mark_index_.

Definition at line 303 of file thread_log_buffer.cpp.

References ASSERT_ND, foedus::log::ThreadLogBufferMeta::current_mark_index_, foedus::log::ThreadLogBufferMeta::increment_mark_index(), foedus::log::ThreadEpockMark::new_epoch_, foedus::log::ThreadEpockMark::offset_begin_, foedus::log::ThreadLogBufferMeta::offset_durable_, foedus::log::ThreadEpockMark::offset_end_, foedus::log::ThreadLogBufferMeta::oldest_mark_index_, and foedus::log::ThreadLogBufferMeta::thread_epoch_marks_.

303  {
304  // See ThreadLogBufferMeta's class comment about tricky cases (the thread being idle).
305  // assert_consistent(); this verification assumes the worker is not working. we can't use it here
306 
307  ThreadEpockMark& target = meta_.thread_epoch_marks_[meta_.oldest_mark_index_];
308  if (target.new_epoch_ > written_epoch) {
309  // Case 1) no thing to do.
310  ASSERT_ND(target.offset_begin_ == meta_.offset_durable_);
311  } else if (target.new_epoch_ == written_epoch) {
312  // Case 2) Is it 2-a? or 2-b?
313  ASSERT_ND(target.offset_begin_ == meta_.offset_durable_);
314  meta_.offset_durable_ = target.offset_end_;
315  if (meta_.oldest_mark_index_ != meta_.current_mark_index_) {
316  // 2-a, go to next mark.
317  meta_.oldest_mark_index_
319  } else {
320  // 2-b, remains here. Will be case-3 next time.
321  }
322  } else {
323  // Case 3 (even after consuming stale marks). Do nothing.
324  ASSERT_ND(target.new_epoch_ < written_epoch);
325  }
326  DVLOG(0) << "Logger has written out all logs in epoch-" << written_epoch << ". " << *this;
327  // assert_consistent(); this verification assumes the worker is not working. we can't use it here
328 }
static uint32_t increment_mark_index(uint32_t index)
uint32_t current_mark_index_
Array index in thread_epoch_marks_ that indicates the current epoch being appended by the thread...
ThreadEpockMark thread_epoch_marks_[kMaxNonDurableEpochs]
Circular array of epoch marks of a thread log buffer.
uint64_t offset_durable_
This marks the position upto which the log writer durably wrote out to log files. ...
uint32_t oldest_mark_index_
Array index in thread_epoch_marks_ that indicates the oldest epoch mark to be consumed by the logger...
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72

Here is the call graph for this function:

ThreadLogBuffer& foedus::log::ThreadLogBuffer::operator= ( const ThreadLogBuffer other)
delete
void foedus::log::ThreadLogBuffer::publish_committed_log ( Epoch  commit_epoch)
inline

Called when the current transaction is successfully committed.

Definition at line 345 of file thread_log_buffer.hpp.

References ASSERT_ND, get_last_epoch(), foedus::log::ThreadLogBufferMeta::offset_committed_, foedus::log::ThreadLogBufferMeta::offset_tail_, and UNLIKELY.

Referenced by foedus::xct::XctManagerPimpl::precommit_xct_readwrite().

345  {
346  Epoch last_epoch = get_last_epoch();
347  ASSERT_ND(commit_epoch >= last_epoch);
348  if (UNLIKELY(commit_epoch > last_epoch)) {
349  on_new_epoch_observed(commit_epoch); // epoch switches!
350  } else if (UNLIKELY(commit_epoch < last_epoch)) {
351  // This MUST not happen because it means an already durable epoch received a new log!
352  crash_stale_commit_epoch(commit_epoch);
353  }
354  meta_.offset_committed_ = meta_.offset_tail_;
355  }
uint64_t offset_committed_
This marks the position upto which transaction logs are committed by the thread.
uint64_t offset_tail_
The current cursor to which next log will be written.
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72

Here is the call graph for this function:

Here is the caller graph for this function:

char* foedus::log::ThreadLogBuffer::reserve_new_log ( uint16_t  log_length)
inline

Reserves a space for a new (uncommitted) log entry at the tail.

Parameters
[in]log_lengthbyte size of the log. You have to give it beforehand.

If the circular buffer's tail reaches the head, this method might block. But it will be rare as we release a large region of buffer at each time.

Definition at line 320 of file thread_log_buffer.hpp.

References advance(), ASSERT_ND, foedus::log::ThreadLogBufferMeta::buffer_size_, foedus::log::ThreadLogBufferMeta::buffer_size_safe_, head_to_tail_distance(), foedus::log::ThreadLogBufferMeta::offset_tail_, UNLIKELY, and wait_for_space().

Referenced by foedus::storage::sequential::SequentialStorage::append_record(), foedus::storage::masstree::MasstreeStoragePimpl::delete_general(), foedus::storage::hash::HashStoragePimpl::delete_record(), foedus::storage::masstree::MasstreeStoragePimpl::increment_general(), foedus::storage::array::ArrayStoragePimpl::increment_record(), foedus::storage::hash::HashStoragePimpl::increment_record(), foedus::storage::array::ArrayStoragePimpl::increment_record_oneshot(), foedus::storage::masstree::MasstreeStoragePimpl::insert_general(), foedus::storage::hash::HashStoragePimpl::insert_record(), foedus::storage::masstree::MasstreeStoragePimpl::overwrite_general(), foedus::storage::array::ArrayStoragePimpl::overwrite_record(), foedus::storage::hash::HashStoragePimpl::overwrite_record(), foedus::storage::array::ArrayStoragePimpl::overwrite_record_primitive(), foedus::storage::masstree::MasstreeStoragePimpl::upsert_general(), and foedus::storage::hash::HashStoragePimpl::upsert_record().

320  {
321  if (UNLIKELY(log_length + meta_.offset_tail_ >= meta_.buffer_size_)) {
322  // now we need wrap around. to simplify, let's avoid having a log entry spanning the
323  // end of the buffer. put a filler log to fill the rest.
324  fillup_tail();
325  ASSERT_ND(meta_.offset_tail_ == 0);
326  }
327  // also make sure tail isn't too close to head (full). in that case, we wait for loggers
328  if (UNLIKELY(
329  head_to_tail_distance() + log_length >= meta_.buffer_size_safe_)) {
330  wait_for_space(log_length);
331  ASSERT_ND(head_to_tail_distance() + log_length < meta_.buffer_size_safe_);
332  }
333  ASSERT_ND(head_to_tail_distance() + log_length < meta_.buffer_size_safe_);
334  char *buffer = buffer_ + meta_.offset_tail_;
335  advance(meta_.buffer_size_, &meta_.offset_tail_, log_length);
336  return buffer;
337  }
uint64_t head_to_tail_distance() const __attribute__((always_inline))
void wait_for_space(uint16_t required_space)
Called when we have to wait till offset_head_ advances so that we can put new logs.
uint64_t buffer_size_
Size of the buffer assigned to this thread.
static void advance(uint64_t buffer_size, uint64_t *target, uint64_t amount) __attribute__((always_inline))
Addition operator, considering wrapping around.
uint64_t offset_tail_
The current cursor to which next log will be written.
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
uint64_t buffer_size_safe_
buffer_size_ - 64.

Here is the call graph for this function:

Here is the caller graph for this function:

ErrorStack foedus::log::ThreadLogBuffer::uninitialize_once ( )
overridevirtual

Implements foedus::DefaultInitializable.

Definition at line 131 of file thread_log_buffer.cpp.

References foedus::kRetOk.

131  {
132  buffer_ = nullptr;
133  return kRetOk;
134 }
const ErrorStack kRetOk
Normal return value for no-error case.
void foedus::log::ThreadLogBuffer::wait_for_space ( uint16_t  required_space)

Called when we have to wait till offset_head_ advances so that we can put new logs.

Definition at line 136 of file thread_log_buffer.cpp.

References assert_consistent(), ASSERT_ND, foedus::log::ThreadLogBufferMeta::buffer_size_safe_, foedus::log::LogOptions::emulation_, foedus::Engine::get_log_manager(), foedus::Engine::get_options(), head_to_tail_distance(), foedus::EngineOptions::log_, foedus::assorted::memory_fence_acquire(), foedus::assorted::memory_fence_release(), foedus::fs::DeviceEmulationOptions::null_device_, foedus::log::ThreadLogBufferMeta::offset_committed_, foedus::log::ThreadLogBufferMeta::offset_durable_, foedus::log::ThreadLogBufferMeta::offset_head_, foedus::log::ThreadLogBufferMeta::thread_id_, and foedus::log::LogManager::wakeup_loggers().

Referenced by reserve_new_log().

136  {
138  LOG(INFO) << "Thread-" << meta_.thread_id_ << " waiting for space to write logs..";
139  if (engine_->get_options().log_.emulation_.null_device_) {
140  // logging disabled
141  meta_.offset_head_ = meta_.offset_durable_ = meta_.offset_committed_;
143  return;
144  }
145  // @spinlock, but with a sleep (not in critical path, usually).
146  while (head_to_tail_distance() + required_space >= meta_.buffer_size_safe_) {
148  if (meta_.offset_durable_ != meta_.offset_head_) {
149  // TASK(Hideaki) actually we should kick axx of log gleaner in this case.
150  LOG(INFO) << "Thread-" << meta_.thread_id_ << " moving head to durable: " << *this;
152  meta_.offset_head_ = meta_.offset_durable_;
154  } else {
155  LOG(WARNING) << "Thread-" << meta_.thread_id_ << " logger is getting behind. sleeping "
156  << " for a while.." << *this;
157  engine_->get_log_manager()->wakeup_loggers();
158  // TASK(Hideaki) this duration should be configurable.
159  std::this_thread::sleep_for(std::chrono::milliseconds(20));
160  }
161  }
162  ASSERT_ND(head_to_tail_distance() + required_space < meta_.buffer_size_safe_);
164 }
uint64_t head_to_tail_distance() const __attribute__((always_inline))
void wakeup_loggers()
Wake up loggers if they are sleeping.
Definition: log_manager.cpp:35
uint64_t offset_committed_
This marks the position upto which transaction logs are committed by the thread.
const EngineOptions & get_options() const
Definition: engine.cpp:39
uint64_t offset_head_
This marks the position where log entries start.
log::LogManager * get_log_manager() const
See Log Manager.
Definition: engine.cpp:49
uint64_t offset_durable_
This marks the position upto which the log writer durably wrote out to log files. ...
void assert_consistent() const
Only for Debug-assertion.
foedus::fs::DeviceEmulationOptions emulation_
Settings to emulate slower logging device.
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
bool null_device_
[Experiments] as if we write out to /dev/null.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
uint64_t buffer_size_safe_
buffer_size_ - 64.

Here is the call graph for this function:

Here is the caller graph for this function:

Friends And Related Function Documentation

friend class Logger
friend

Definition at line 268 of file thread_log_buffer.hpp.

std::ostream& operator<< ( std::ostream &  o,
const ThreadLogBuffer v 
)
friend

Definition at line 330 of file thread_log_buffer.cpp.

330  {
331  o << v.meta_;
332  return o;
333 }

The documentation for this class was generated from the following files: