libfoedus-core
FOEDUS Core Library
thread_log_buffer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <chrono>
23 #include <cstring>
24 #include <list>
25 #include <ostream>
26 #include <thread>
27 
28 #include "foedus/assert_nd.hpp"
29 #include "foedus/engine.hpp"
40 
41 namespace foedus {
42 namespace log {
43 
// ThreadLogBufferMeta's constructor body. Its signature (original line 44)
// and lines 47/51/55-56 were dropped by the doc extraction — presumably the
// remaining offset/index fields are zeroed on the missing lines too (TODO
// confirm against the real source). Zero-initializes all POD metadata,
// including the circular epoch-mark array.
45  thread_id_ = 0;
46  buffer_size_ = 0;
48 
49  offset_head_ = 0;
50  offset_durable_ = 0;
52  offset_tail_ = 0;
53 
54  std::memset(thread_epoch_marks_, 0, sizeof(thread_epoch_marks_));
57 }
58 
// ThreadLogBufferMeta::assert_consistent() — debug-only sanity checks over
// the circular-buffer offsets and the epoch-mark ring. The signature
// (original line 59) and the three if-conditions (lines 73/76/79) that feed
// violation_count were dropped by the doc extraction; they presumably
// compare the head/durable/committed/tail offset ordering — TODO confirm.
60 #ifndef NDEBUG
61  // These assertions might have a false positive unless run in a 1-threaded situation.
62  // But, it would be negligibly rare, and anyway it is wiped out in release mode.
63  // So, if you believe you are hitting assertion here because of sheer luck, ignore.
64  // In 99% cases these assertions fire for a valid reason, though.
65  // Q: Making it thread-safe? A: That means even non-debug codepath becomes superslow.
66  // Note, we thus use this method only from the worker thread itself. Then it's safe.
71  // because of wrap around, *at most* one of them does not hold
72  int violation_count = 0;
74  ++violation_count;
75  }
77  ++violation_count;
78  }
80  ++violation_count;
81  }
82  ASSERT_ND(violation_count <= 1);
83  ASSERT_ND(oldest_mark_index_ < static_cast<uint32_t>(kMaxNonDurableEpochs));
84  ASSERT_ND(current_mark_index_ < static_cast<uint32_t>(kMaxNonDurableEpochs));
85 
// Marks from oldest to current must form a contiguous chain: each mark's
// new_epoch_/offset_end_ equals the next mark's old_epoch_/offset_begin_.
// (Lines 87-90 declaring `mark`/`next` were dropped by the extraction.)
86  for (uint32_t i = oldest_mark_index_; i != current_mark_index_; i = increment_mark_index(i)) {
91  ASSERT_ND(mark.old_epoch_ < mark.new_epoch_);
92  ASSERT_ND(mark.new_epoch_ == next.old_epoch_);
93  ASSERT_ND(mark.offset_end_ == next.offset_begin_);
94  }
// The current mark (`cur`, declaration dropped at lines 95-97) must itself
// span a valid epoch transition.
98  ASSERT_ND(cur.old_epoch_ < cur.new_epoch_);
100 #endif // NDEBUG
101 }
102 
103 
// ThreadLogBuffer's constructor (its signature, original line 104, was
// dropped by the doc extraction): records the owning engine and thread id.
// The actual buffer is acquired later, in initialize_once().
105  : engine_(engine), meta_() {
106  meta_.thread_id_ = thread_id;
107  buffer_ = nullptr;
108 }
109 
// ThreadLogBuffer::initialize_once(): grabs this thread's pre-allocated log
// buffer from its NumaCoreMemory, resets all offsets to zero, and seeds the
// first epoch mark from the savepoint manager's initial durable/current
// epochs. The signature (original line 110) and line 112 — presumably the
// lookup that yields `memory` — were dropped by the doc extraction.
111  memory::NumaCoreMemory *memory
113  memory::AlignedMemorySlice buffer_memory = memory->get_log_buffer_memory();
114  buffer_ = reinterpret_cast<char*>(buffer_memory.get_block());
115  meta_.buffer_size_ = buffer_memory.get_size();
// 64-byte safety margin; wait_for_space/fillup_tail compare against this so
// the tail never quite reaches the head.
116  meta_.buffer_size_safe_ = meta_.buffer_size_ - 64;
117 
118  meta_.offset_head_ = 0;
119  meta_.offset_durable_ = 0;
120  meta_.offset_committed_ = 0;
121  meta_.offset_tail_ = 0;
122 
123  Epoch initial_current = engine_->get_savepoint_manager()->get_initial_current_epoch();
124  Epoch initial_durable = engine_->get_savepoint_manager()->get_initial_durable_epoch();
125  meta_.current_mark_index_ = 0;
126  meta_.oldest_mark_index_ = 0;
127  meta_.thread_epoch_marks_[0] = ThreadEpockMark(initial_durable, initial_current, 0);
128  return kRetOk;
129 }
130 
// ThreadLogBuffer::uninitialize_once() (signature dropped at original line
// 131): only forgets the buffer pointer. The memory came from
// NumaCoreMemory::get_log_buffer_memory() in initialize_once(), so it is not
// freed here.
132  buffer_ = nullptr;
133  return kRetOk;
134 }
135 
// Blocks until the circular buffer has at least required_space free bytes
// between tail and head. Original lines 137/142/147/151/153/163 were dropped
// by the doc extraction — presumably assert_consistent() calls and memory
// fences (the file's symbol index lists memory_fence_* helpers); TODO
// confirm against the real source.
136 void ThreadLogBuffer::wait_for_space(uint16_t required_space) {
138  LOG(INFO) << "Thread-" << meta_.thread_id_ << " waiting for space to write logs..";
139  if (engine_->get_options().log_.emulation_.null_device_) {
140  // logging disabled
// With a null log device nothing is ever written out, so simply declare
// everything up to offset_committed_ consumed.
141  meta_.offset_head_ = meta_.offset_durable_ = meta_.offset_committed_;
143  return;
144  }
145  // @spinlock, but with a sleep (not in critical path, usually).
146  while (head_to_tail_distance() + required_space >= meta_.buffer_size_safe_) {
148  if (meta_.offset_durable_ != meta_.offset_head_) {
149  // TASK(Hideaki) actually we should kick axx of log gleaner in this case.
150  LOG(INFO) << "Thread-" << meta_.thread_id_ << " moving head to durable: " << *this;
// Reclaim the region the logger already made durable.
152  meta_.offset_head_ = meta_.offset_durable_;
154  } else {
155  LOG(WARNING) << "Thread-" << meta_.thread_id_ << " logger is getting behind. sleeping "
156  << " for a while.." << *this;
157  engine_->get_log_manager()->wakeup_loggers();
158  // TASK(Hideaki) this duration should be configurable.
159  std::this_thread::sleep_for(std::chrono::milliseconds(20));
160  }
161  }
162  ASSERT_ND(head_to_tail_distance() + required_space < meta_.buffer_size_safe_);
164 }
165 
// Pads the remainder of the circular buffer with a single filler log so the
// next log entry starts at offset 0; advance() wraps the tail around, which
// the final assertion verifies. Original lines 167/172/177 were dropped by
// the doc extraction.
166 void ThreadLogBuffer::fillup_tail() {
168  uint64_t len = meta_.buffer_size_ - meta_.offset_tail_;
169  if (head_to_tail_distance() + len >= meta_.buffer_size_safe_) {
170  wait_for_space(len);
171  }
173  FillerLogType *filler = reinterpret_cast<FillerLogType*>(buffer_ + meta_.offset_tail_);
174  filler->populate(len);
175  advance(meta_.buffer_size_, &meta_.offset_tail_, len);
176  ASSERT_ND(meta_.offset_tail_ == 0);
178 }
179 
// Called when this thread writes its first log entry in a new epoch: closes
// the current epoch mark (populating offset_end_) and installs a new one.
// Original line 200 — which defines `new_index`, presumably via
// ThreadLogBufferMeta::increment_mark_index(meta_.current_mark_index_)
// (TODO confirm) — plus lines 181/207/216/218/220/223 (likely asserts and
// memory fences) were dropped by the doc extraction.
180 void ThreadLogBuffer::on_new_epoch_observed(Epoch commit_epoch) {
182  ThreadEpockMark& last_mark = meta_.thread_epoch_marks_[meta_.current_mark_index_];
183  Epoch last_epoch = last_mark.new_epoch_;
184  ASSERT_ND(commit_epoch > last_epoch);
185  VLOG(0) << "Thread-" << meta_.thread_id_ << " is writing out the first log entry in epoch-"
186  << commit_epoch
187  << " at offset " << meta_.offset_committed_ << ". old epoch=" << last_epoch;
188  DVLOG(0) << "Before: " << *this;
189  ThreadEpockMark new_mark(last_epoch, commit_epoch, meta_.offset_committed_);
190 
191  // we will close the current mark, populating its offset_end_. It might be already
192  // set by the logger while this thread was idle. In that case, the value must match.
193  if (last_mark.offset_end_ != 0) {
194  LOG(INFO) << "Interesting. Thread-" << meta_.thread_id_ << "'s last epoch marker was"
195  << " already populated by the logger. This thread was probably idle for a while.";
196  ASSERT_ND(last_mark.offset_end_ == meta_.offset_committed_);
197  }
198  last_mark.offset_end_ = meta_.offset_committed_;
199 
// If the mark ring is full (new_index would collide with the oldest,
// not-yet-consumed mark), sleep-spin until the logger consumes one.
201  if (meta_.oldest_mark_index_ == new_index) {
202  LOG(INFO) << "Thread-" << meta_.thread_id_ << " has to wait until the logger catches up."
203  << " If this often happens, you should increase the number of loggers.";
204  while (true) {
205  // this doesn't happen often. So simply sleep for a while.
206  std::this_thread::sleep_for(std::chrono::milliseconds(10));
208  if (meta_.oldest_mark_index_ != new_index) {
209  break;
210  }
211  VLOG(0) << "Thread-" << meta_.thread_id_ << " still waiting until the logger catches up...";
212  }
213  }
214  ASSERT_ND(meta_.oldest_mark_index_ != new_index);
215 
217  meta_.thread_epoch_marks_[new_index] = new_mark;
219  meta_.current_mark_index_ = new_index;
221 
222  DVLOG(0) << "After: " << *this;
224 }
225 
// Deliberately crashes the process (LOG(FATAL)) when a log-publication
// request carries a commit_epoch older than the last observed epoch — that
// indicates a caller-side bug, not a recoverable condition. Lines 227/231
// were dropped by the doc extraction.
226 void ThreadLogBuffer::crash_stale_commit_epoch(Epoch commit_epoch) {
228  LOG(FATAL) << "Received a log-publication request with commit_epoch=" << commit_epoch
229  << ", which is older than the last epoch=" << get_last_epoch() << ", this is a BUG!"
230  << std::endl << " Buffer=" << *this;
232 }
233 
// ThreadLogBuffer::get_logs_to_write(written_epoch): called by the logger to
// obtain the [begin_, end_) buffer offsets of this thread's logs for the
// given epoch. The signature (original line 234) and the declaration of
// `target` (line 238 — presumably a reference to the oldest epoch mark, TODO
// confirm) were dropped by the doc extraction, as were the fence calls
// around lines 259/261/263/273 and the oldest_mark_index_ advance/guard at
// lines 285-287.
235  // See ThreadLogBufferMeta's class comment about tricky cases (the thread being idle).
236  // assert_consistent(); this verification assumes the worker is not working. we can't use it here
237  OffsetRange ret;
239  if (target.new_epoch_ > written_epoch) {
240  // Case 1) no logs in this epoch.
241  ASSERT_ND(target.offset_begin_ == meta_.offset_durable_);
242  ret.begin_ = 0;
243  ret.end_ = 0;
244  } else if (target.new_epoch_ == written_epoch) {
245  // Case 2) Is it 2-a? or 2-b?
246  ASSERT_ND(target.offset_begin_ == meta_.offset_durable_);
247  ret.begin_ = target.offset_begin_;
248  if (meta_.oldest_mark_index_ != meta_.current_mark_index_) {
249  // 2-a, easy.
250  ret.end_ = target.offset_end_;
251  } else {
252  // 2-b, now we have to populate target.offset_end_ ourselves.
253  VLOG(0) << "This guy seems sleeping for a while.." << *this;
254  // We can safely populate target.offset_end_, but be careful on reading offset_committed_.
255  // If the thread now resumes working, it adds a new mark *and then* increments
256  // offset_committed_. So, it's safe as far as we take appropriate fences.
257 
258  // well, a bit redundant fences, but this part is not executed so often. no issue.
260  uint64_t committed_copy = meta_.offset_committed_;
262  bool still_current = meta_.oldest_mark_index_ == meta_.current_mark_index_;
264  if (still_current) {
265  target.offset_end_ = committed_copy;
266  VLOG(0) << "Okay, the logger populated the offset_end on behalf." << *this;
267  } else {
268  // This is super-rare.
269  LOG(INFO) << "Interesting. The thread has now awaken and added a new mark. offset_end"
270  << " should be now populated. "<< *this;
271  }
272 
274  ret.end_ = target.offset_end_;
275  }
276  } else {
277  // Case 3) First, consume stale marks as much as possible
278  // (note, in this case "target.offset_begin_ == meta_.offset_durable_" might not hold
279  ASSERT_ND(target.new_epoch_ < written_epoch);
280  if (meta_.oldest_mark_index_ != meta_.current_mark_index_) {
281  VLOG(0) << "Advancing oldest mark index. before=" << *this;
282  while (meta_.oldest_mark_index_ != meta_.current_mark_index_
283  && meta_.thread_epoch_marks_[meta_.oldest_mark_index_].new_epoch_ < written_epoch) {
284  meta_.oldest_mark_index_
288  LOG(FATAL) << "meta_.oldest_mark_index_ out of range. we must have waited.";
289  }
290  }
291  VLOG(0) << "Advanced oldest mark index. after=" << *this;
292  // Then re-evaluate. It might be now Case 1 or 2. recursion, but essentially a retry.
293  ret = get_logs_to_write(written_epoch);
294  } else {
295  VLOG(1) << "Heh, this guy is still sleeping.." << *this;
296  ret.begin_ = 0;
297  ret.end_ = 0;
298  }
299  }
300  return ret;
301 }
302 
// ThreadLogBuffer::on_log_written(written_epoch): logger callback after it
// durably wrote out all logs of the given epoch; advances offset_durable_
// and, in case 2-a, the oldest mark index. The signature (original line
// 303), the declaration of `target` (line 307 — presumably the oldest epoch
// mark, TODO confirm), and the expression completing the increment begun on
// line 317 (line 318, likely increment_mark_index) were dropped by the doc
// extraction.
304  // See ThreadLogBufferMeta's class comment about tricky cases (the thread being idle).
305  // assert_consistent(); this verification assumes the worker is not working. we can't use it here
306 
308  if (target.new_epoch_ > written_epoch) {
309  // Case 1) no thing to do.
310  ASSERT_ND(target.offset_begin_ == meta_.offset_durable_);
311  } else if (target.new_epoch_ == written_epoch) {
312  // Case 2) Is it 2-a? or 2-b?
313  ASSERT_ND(target.offset_begin_ == meta_.offset_durable_);
314  meta_.offset_durable_ = target.offset_end_;
315  if (meta_.oldest_mark_index_ != meta_.current_mark_index_) {
316  // 2-a, go to next mark.
317  meta_.oldest_mark_index_
319  } else {
320  // 2-b, remains here. Will be case-3 next time.
321  }
322  } else {
323  // Case 3 (even after consuming stale marks). Do nothing.
324  ASSERT_ND(target.new_epoch_ < written_epoch);
325  }
326  DVLOG(0) << "Logger has written out all logs in epoch-" << written_epoch << ". " << *this;
327  // assert_consistent(); this verification assumes the worker is not working. we can't use it here
328 }
329 
// Debug-print: a ThreadLogBuffer prints as its metadata only; the log buffer
// content itself is not dumped.
330 std::ostream& operator<<(std::ostream& o, const ThreadLogBuffer& v) {
331  o << v.meta_;
332  return o;
333 }
334 
// Debug-print of the metadata as pseudo-XML: thread id, buffer size, the
// four offsets, then every epoch mark — the loop covers [oldest, current)
// and the current entry is emitted separately afterwards. The loop's
// increment expression (original line 351, presumably
// increment_mark_index(i)) and part of the current-entry output (line 356)
// were dropped by the doc extraction.
335 std::ostream& operator<<(std::ostream& o, const ThreadLogBufferMeta& v) {
336  o << "<ThreadLogBuffer>";
337  o << "<thread_id_>" << v.thread_id_ << "</thread_id_>";
338  o << "<buffer_size_>" << v.buffer_size_ << "</buffer_size_>";
339  o << "<offset_head_>" << v.offset_head_ << "</offset_head_>";
340  o << "<offset_durable_>" << v.offset_durable_ << "</offset_durable_>";
341  o << "<offset_committed_>" << v.offset_committed_ << "</offset_committed_>";
342  o << "<offset_tail_>" << v.offset_tail_ << "</offset_tail_>";
343  o << "<thread_epoch_marks"
344  << " oldest=\"" << v.oldest_mark_index_ << "\""
345  << " current=\"" << v.current_mark_index_ << "\""
346  << ">";
347 
348  // from oldest (inclusive) to current (exclusive).
349  for (uint32_t i = v.oldest_mark_index_;
350  i != v.current_mark_index_;
352  o << "<Entry index=\"" << i << "\">" << v.thread_epoch_marks_[i] << "</Entry>";
353  }
354  // also write out current entry
355  o << "<Entry index=\"" << v.current_mark_index_ << "\">"
357  << "</Entry>";
358  o << "</thread_epoch_marks>";
359  o << "</ThreadLogBuffer>";
360  return o;
361 }
362 
// Debug-print of a single epoch mark; buffer offsets are rendered in hex
// via assorted::Hex for easier comparison against other dumps.
363 std::ostream& operator<<(std::ostream& o, const ThreadEpockMark& v) {
364  o << "<ThreadEpockMark "
365  << " old=\"" << v.old_epoch_ << "\""
366  << " new=\"" << v.new_epoch_ << "\""
367  << " offset_begin=\"" << assorted::Hex(v.offset_begin_) << "\""
368  << " offset_end=\"" << assorted::Hex(v.offset_end_) << "\""
369  << " />";
370  return o;
371 }
372 
373 } // namespace log
374 } // namespace foedus
uint64_t head_to_tail_distance() const __attribute__((always_inline))
static uint32_t increment_mark_index(uint32_t index)
uint32_t current_mark_index_
Array index in thread_epoch_marks_ that indicates the current epoch being appended by the thread...
void wakeup_loggers()
Wake up loggers if they are sleeping.
Definition: log_manager.cpp:35
NumaCoreMemory * get_core_memory(foedus::thread::ThreadId id) const
A thread-local log buffer.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
void wait_for_space(uint16_t required_space)
Called when we have to wait till offset_head_ advances so that we can put new logs.
uint64_t buffer_size_
Size of the buffer assigned to this thread.
Each thread can have at most this number of epochs in this log buffer.
uint64_t offset_end_
Where the new epoch ends.
uint64_t offset_committed_
This marks the position up to which transaction logs are committed by the thread.
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
Epoch old_epoch_
The value of new_epoch_ of the previous mark.
Metadata part of ThreadLogBuffer, without the actual buffer which is way way larger.
Represents a time epoch.
Definition: epoch.hpp:61
Epoch new_epoch_
The epoch of log entries this mark represents.
Declares common log types used in all packages.
const EngineOptions & get_options() const
Definition: engine.cpp:39
Repository of memories dynamically acquired within one CPU core (thread).
savepoint::SavepointManager * get_savepoint_manager() const
See Savepoint Manager.
Definition: engine.cpp:53
AlignedMemorySlice get_log_buffer_memory() const
ThreadEpockMark thread_epoch_marks_[kMaxNonDurableEpochs]
Circular array of epoch marks of a thread log buffer.
uint64_t offset_head_
This marks the position where log entries start.
log::LogManager * get_log_manager() const
See Log Manager.
Definition: engine.cpp:49
static void advance(uint64_t buffer_size, uint64_t *target, uint64_t amount) __attribute__((always_inline))
Addition operator, considering wrapping around.
OffsetRange get_logs_to_write(Epoch written_epoch)
Returns begin/end offsets of logs in the given epoch.
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
uint64_t offset_begin_
Where the new epoch starts.
uint64_t offset_durable_
This marks the position up to which the log writer durably wrote out to log files. ...
A slice of foedus::memory::AlignedMemory.
A thread-buffer's epoch marker, which indicates where a thread switched an epoch. ...
void assert_consistent() const
Only for Debug-assertion.
std::ostream & operator<<(std::ostream &o, const LogHeader &v)
bool is_valid() const
Definition: epoch.hpp:96
ErrorStack initialize_once() override
NumaNodeMemory * get_local_memory() const
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
Definition: thread_id.hpp:80
uint64_t offset_tail_
The current cursor to which next log will be written.
foedus::fs::DeviceEmulationOptions emulation_
Settings to emulate slower logging device.
const ErrorStack kRetOk
Normal return value for no-error case.
void on_log_written(Epoch written_epoch)
Called when the logger wrote out all logs in the given epoch, advancing oldest_mark_index_.
void assert_consistent() const
Only for Debug-assertion.
Convenient way of writing hex integers to stream.
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
uint32_t oldest_mark_index_
Array index in thread_epoch_marks_ that indicates the oldest epoch mark to be consumed by the logger...
bool null_device_
[Experiments] as if we write out to /dev/null.
ErrorStack uninitialize_once() override
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
Definition: engine.cpp:50
uint64_t buffer_size_safe_
buffer_size_ - 64.
void memory_fence_acq_rel()
Equivalent to std::atomic_thread_fence(std::memory_order_acq_rel).