logger_impl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/log/logger_impl.hpp"

#include <glog/logging.h>

#include <algorithm>
#include <chrono>
#include <cstring>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "foedus/assert_nd.hpp"
#include "foedus/engine.hpp"
#include "foedus/engine_options.hpp"
#include "foedus/epoch.hpp"
#include "foedus/error_stack_batch.hpp"
#include "foedus/assorted/atomic_fences.hpp"
#include "foedus/debugging/stop_watch.hpp"
#include "foedus/fs/direct_io_file.hpp"
#include "foedus/fs/filesystem.hpp"
#include "foedus/log/common_log_types.hpp"
#include "foedus/log/log_manager.hpp"
#include "foedus/log/log_type.hpp"
#include "foedus/log/thread_log_buffer.hpp"
#include "foedus/memory/engine_memory.hpp"
#include "foedus/memory/numa_node_memory.hpp"
#include "foedus/savepoint/savepoint.hpp"
#include "foedus/savepoint/savepoint_manager.hpp"
#include "foedus/storage/storage_manager.hpp"
#include "foedus/thread/numa_thread_scope.hpp"
#include "foedus/thread/thread.hpp"
#include "foedus/thread/thread_id.hpp"
#include "foedus/thread/thread_pool.hpp"
#include "foedus/thread/thread_pool_pimpl.hpp"
#include "foedus/xct/xct.hpp"
#include "foedus/xct/xct_manager.hpp"
namespace foedus {
namespace log {

inline bool is_log_aligned(uint64_t offset) {
  return offset % FillerLogType::kLogWriteUnitSize == 0;
}
inline uint64_t align_log_ceil(uint64_t offset) {
  return assorted::align< uint64_t, FillerLogType::kLogWriteUnitSize >(offset);
}
inline uint64_t align_log_floor(uint64_t offset) {
  if (offset % FillerLogType::kLogWriteUnitSize == 0) {
    return offset;
  } else {
    return assorted::align< uint64_t, FillerLogType::kLogWriteUnitSize >(offset)
      - FillerLogType::kLogWriteUnitSize;
  }
}
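
// Illustration (editor's addition, not from the original source), assuming
// FillerLogType::kLogWriteUnitSize == 4096 as the "4kb" comments below suggest:
//   is_log_aligned(8192)  -> true
//   align_log_ceil(5000)  -> 8192  (round up to the next write-unit boundary)
//   align_log_floor(5000) -> 4096  (round down to the previous boundary)
//   align_log_floor(8192) -> 8192  (already aligned, returned as-is)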

ErrorStack Logger::initialize_once() {
  control_block_->initialize();
  // clear all variables
  current_file_ = nullptr;
  LOG(INFO) << "Initializing Logger-" << id_ << ". assigned " << assigned_thread_ids_.size()
    << " threads, starting from " << assigned_thread_ids_[0] << ", numa_node_="
    << static_cast<int>(numa_node_);

  // Initialize the values from the latest savepoint.
  savepoint::LoggerSavepointInfo info
    = engine_->get_savepoint_manager()->get_logger_savepoint(id_);
  // durable epoch from initial savepoint
  control_block_->durable_epoch_
    = engine_->get_savepoint_manager()->get_initial_durable_epoch().value();
  control_block_->marked_epoch_ = Epoch(control_block_->durable_epoch_);
  control_block_->oldest_ordinal_ = info.oldest_log_file_;  // ordinal/length too
  control_block_->current_ordinal_ = info.current_log_file_;
  control_block_->current_file_durable_offset_ = info.current_log_file_offset_durable_;
  control_block_->oldest_file_offset_begin_ = info.oldest_log_file_offset_begin_;
  current_file_path_ = engine_->get_options().log_.construct_suffixed_log_path(
    numa_node_,
    id_,
    control_block_->current_ordinal_);
  // open the log file
  current_file_ = new fs::DirectIoFile(
    current_file_path_,
    engine_->get_options().log_.emulation_);
  WRAP_ERROR_CODE(current_file_->open(true, true, true, true));
  if (control_block_->current_file_durable_offset_ < current_file_->get_current_offset()) {
    // there is a non-durable region as an incomplete remnant of the previous execution.
    // probably there was a crash. in this case, we discard the non-durable region.
    LOG(ERROR) << "Logger-" << id_ << "'s log file has a non-durable region. Probably there"
      << " was a crash. Will truncate it to " << control_block_->current_file_durable_offset_
      << " from " << current_file_->get_current_offset();
    WRAP_ERROR_CODE(current_file_->truncate(
      control_block_->current_file_durable_offset_,
      true));  // sync right now
  }
  ASSERT_ND(control_block_->current_file_durable_offset_ == current_file_->get_current_offset());
  LOG(INFO) << "Initialized logger: " << *this;

  // which threads are assigned to me?
  for (auto thread_id : assigned_thread_ids_) {
    assigned_threads_.push_back(
      engine_->get_thread_pool()->get_pimpl()->get_local_group()->get_thread(
        thread::decompose_numa_local_ordinal(thread_id)));
  }

  // grab a buffer to pad incomplete blocks for direct file I/O
  CHECK_ERROR(engine_->get_memory_manager()->get_local_memory()->allocate_numa_memory(
    FillerLogType::kLogWriteUnitSize, &fill_buffer_));
  ASSERT_ND(!fill_buffer_.is_null());
  ASSERT_ND(fill_buffer_.get_size() >= FillerLogType::kLogWriteUnitSize);
  ASSERT_ND(fill_buffer_.get_alignment() >= FillerLogType::kLogWriteUnitSize);
  LOG(INFO) << "Logger-" << id_ << " grabbed a padding buffer. size=" << fill_buffer_.get_size();
  CHECK_ERROR(write_dummy_epoch_mark());

  // log file and buffer prepared. let's launch the logger thread
  logger_thread_ = std::thread(&Logger::handle_logger, this);

  assert_consistent();
  return kRetOk;
}

ErrorStack Logger::uninitialize_once() {
  LOG(INFO) << "Uninitializing Logger-" << id_ << ": " << *this;
  ErrorStackBatch batch;
  if (logger_thread_.joinable()) {
    {
      control_block_->stop_requested_ = true;
      control_block_->wakeup_cond_.signal();
    }
    logger_thread_.join();
  }
  if (current_file_) {
    current_file_->close();
    delete current_file_;
    current_file_ = nullptr;
  }
  fill_buffer_.release_block();
  control_block_->uninitialize();
  return SUMMARIZE_ERROR_BATCH(batch);
}


void Logger::handle_logger() {
  LOG(INFO) << "Logger-" << id_ << " started. pin on NUMA node-" << static_cast<int>(numa_node_);
  thread::NumaThreadScope scope(numa_node_);
  // The actual logging can't start until XctManager is initialized.
  SPINLOCK_WHILE(!is_stop_requested() && !engine_->get_xct_manager()->is_initialized()) {
    assorted::memory_fence_acquire();
  }

  LOG(INFO) << "Logger-" << id_ << " now starts logging";
  while (!is_stop_requested()) {
    {
      uint64_t demand = control_block_->wakeup_cond_.acquire_ticket();
      if (!is_stop_requested()) {
        control_block_->wakeup_cond_.timedwait(demand, 10000ULL);
      }
    }
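    // Editor's note (assumption, not in the original): the wait above is a
    // ticket-based condition variable. Re-checking is_stop_requested() after
    // acquiring the ticket avoids missing a stop request (and the signal()
    // issued by uninitialize_once()) that arrives just before the wait, and
    // the bounded timedwait guarantees periodic wakeups even without a signal.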
    const int kMaxIterations = 100;
    int iterations = 0;

    while (!is_stop_requested()) {
      assert_consistent();
      Epoch current_epoch = engine_->get_xct_manager()->get_current_global_epoch();
      Epoch durable_epoch = get_durable_epoch();
      ASSERT_ND(durable_epoch < current_epoch);
      Epoch next_durable = durable_epoch.one_more();
      if (next_durable == current_epoch.one_less()) {
        DVLOG(2) << "Logger-" << id_ << " is well caught up. will sleep.";
        break;
      }
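      // Editor's note (assumption, not in the original): the break above keeps
      // the logger from writing out epochs that transactions may still be
      // appending to. Only epochs at least two behind the current global epoch
      // are written, matching the write_epoch.one_more() < current_global_epoch
      // assertion in write_one_epoch() below.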

      // just for debug out
      debugging::StopWatch watch;
      uint64_t before_offset = (current_file_ ? current_file_->get_current_offset() : 0);

      COERCE_ERROR(write_one_epoch(next_durable));
      ASSERT_ND(get_durable_epoch() == next_durable);
      COERCE_ERROR(switch_file_if_required());

      watch.stop();
      uint64_t after_offset = (current_file_ ? current_file_->get_current_offset() : 0);
      // LOG(INFO) was too noisy
      if (after_offset != before_offset) {
        VLOG(0) << "Logger-" << id_ << " wrote out " << (after_offset - before_offset)
          << " bytes for epoch-" << next_durable << " in " << watch.elapsed_ms() << " ms";
      }

      if (((++iterations) % kMaxIterations) == 0) {
        LOG(WARNING) << "Logger-" << id_ << " has been working without sleep for a long time"
          << " (" << iterations << " iterations). Either too few loggers or potentially a bug? "
          << *this;
      } else {
        VLOG(0) << "Logger-" << id_ << " has more to do. keep working. " << iterations;
        DVLOG(1) << *this;
      }
    }
  }
  LOG(INFO) << "Logger-" << id_ << " ended. " << *this;
}

ErrorStack Logger::update_durable_epoch(Epoch new_durable_epoch, bool had_any_log) {
  DVLOG(1) << "Checked all loggers. new_durable_epoch=" << new_durable_epoch;
  if (had_any_log) {
    VLOG(0) << "Logger-" << id_ << " updating durable_epoch_ from " << get_durable_epoch()
      << " to " << new_durable_epoch;

    // BEFORE updating the epoch, fsync the file AND the parent folder
    if (!fs::fsync(current_file_path_, true)) {
      return ERROR_STACK_MSG(kErrorCodeFsSyncFailed, to_string().c_str());
    }
    control_block_->current_file_durable_offset_ = current_file_->get_current_offset();
    VLOG(0) << "Logger-" << id_ << " fsynced the current file ("
      << control_block_->current_file_durable_offset_ << " bytes so far) and its folder";
    DVLOG(0) << "Before: " << *this;
    assorted::memory_fence_release();  // announce it only AFTER the above
  } else {
    VLOG(0) << "Logger-" << id_ << " had no log in this epoch. not writing an epoch mark."
      << " durable ep=" << get_durable_epoch() << ", new_epoch=" << new_durable_epoch
      << " marked ep=" << control_block_->marked_epoch_;
    ASSERT_ND(control_block_->current_file_durable_offset_ == current_file_->get_current_offset());
  }

  ASSERT_ND(new_durable_epoch >= Epoch(control_block_->durable_epoch_));
  if (new_durable_epoch > Epoch(control_block_->durable_epoch_)) {
    // This must be the only place to set durable_epoch_
    control_block_->durable_epoch_ = new_durable_epoch.value();

    // finally, let the log manager re-calculate the global durable epoch.
    // this may or may not result in a new global durable epoch.
    CHECK_ERROR(engine_->get_log_manager()->refresh_global_durable_epoch());

    if (had_any_log) {
      DVLOG(0) << "After: " << *this;
    }
  }

  assert_consistent();
  return kRetOk;
}
ErrorStack Logger::write_dummy_epoch_mark() {
  CHECK_ERROR(log_epoch_switch(get_durable_epoch()));
  LOG(INFO) << "Logger-" << id_ << " wrote out a dummy epoch marker at the beginning";
  CHECK_ERROR(update_durable_epoch(get_durable_epoch(), true));  // flush the epoch mark immediately
  return kRetOk;
}
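// Editor's note (assumption, not in the original): because this runs right
// after a log file is opened, every file begins with an epoch marker, so a
// reader scanning any file can establish the epoch context before it reaches
// the first real log record.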

ErrorStack Logger::log_epoch_switch(Epoch new_epoch) {
  ASSERT_ND(control_block_->marked_epoch_ <= new_epoch);
  VLOG(0) << "Writing epoch marker for Logger-" << id_
    << ". marked_epoch_=" << control_block_->marked_epoch_ << " new_epoch=" << new_epoch;
  DVLOG(1) << *this;

  // Use the fill buffer to write out the epoch mark log
  std::lock_guard<std::mutex> guard(epoch_switch_mutex_);
  char* buf = reinterpret_cast<char*>(fill_buffer_.get_block());
  EpochMarkerLogType* epoch_marker = reinterpret_cast<EpochMarkerLogType*>(buf);
  epoch_marker->populate(
    control_block_->marked_epoch_,
    new_epoch,
    numa_node_,
    in_node_ordinal_,
    id_,
    control_block_->current_ordinal_,
    current_file_->get_current_offset());

  // Fill it up to 4kb and write. A bit wasteful, but this happens only once per epoch.
  FillerLogType* filler_log = reinterpret_cast<FillerLogType*>(buf
    + sizeof(EpochMarkerLogType));
  filler_log->populate(fill_buffer_.get_size() - sizeof(EpochMarkerLogType));
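
  // Editor's illustration (not from the original source) of the single
  // kLogWriteUnitSize (4kb) block written below:
  //   +---------------------------+------------------------------------------+
  //   | EpochMarkerLogType        | FillerLogType padding                    |
  //   | (old epoch -> new epoch)  | (covers the rest of the 4kb write unit)  |
  //   +---------------------------+------------------------------------------+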

  WRAP_ERROR_CODE(current_file_->write(fill_buffer_.get_size(), fill_buffer_));
  control_block_->marked_epoch_ = new_epoch;
  add_epoch_history(*epoch_marker);

  assert_consistent();
  return kRetOk;
}

ErrorStack Logger::switch_file_if_required() {
  ASSERT_ND(current_file_);
  if (current_file_->get_current_offset()
      < (static_cast<uint64_t>(engine_->get_options().log_.log_file_size_mb_) << 20)) {
    return kRetOk;
  }

  LOG(INFO) << "Logger-" << id_ << " moving on to next file. " << *this;

  // Close the current one. Immediately call fsync on it AND the parent folder.
  current_file_->close();
  delete current_file_;
  current_file_ = nullptr;
  control_block_->current_file_durable_offset_ = 0;
  if (!fs::fsync(current_file_path_, true)) {
    return ERROR_STACK_MSG(kErrorCodeFsSyncFailed, to_string().c_str());
  }

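  // Per construct_suffixed_log_path()'s documentation, the file is named
  // "<log_folder>/<logger-id>_<ordinal>.log", so bumping current_ordinal_
  // below starts a fresh file rather than growing the old one.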
  current_file_path_ = engine_->get_options().log_.construct_suffixed_log_path(
    numa_node_,
    id_,
    ++control_block_->current_ordinal_);
  LOG(INFO) << "Logger-" << id_ << " next file=" << current_file_path_;
  current_file_ = new fs::DirectIoFile(
    current_file_path_,
    engine_->get_options().log_.emulation_);
  WRAP_ERROR_CODE(current_file_->open(true, true, true, true));
  ASSERT_ND(current_file_->get_current_offset() == 0);
  LOG(INFO) << "Logger-" << id_ << " moved on to next file. " << *this;
  CHECK_ERROR(write_dummy_epoch_mark());
  return kRetOk;
}

ErrorStack Logger::write_one_epoch(Epoch write_epoch) {
  ASSERT_ND(get_durable_epoch().one_more() == write_epoch);
  ASSERT_ND(write_epoch.one_more() < engine_->get_xct_manager()->get_current_global_epoch());
  bool had_any_log = false;
  for (thread::Thread* the_thread : assigned_threads_) {
    ThreadLogBuffer& buffer = the_thread->get_thread_log_buffer();
    ThreadLogBuffer::OffsetRange range = buffer.get_logs_to_write(write_epoch);
    ASSERT_ND(range.begin_ <= buffer.get_meta().buffer_size_);
    ASSERT_ND(range.end_ <= buffer.get_meta().buffer_size_);
    if (range.begin_ > buffer.get_meta().buffer_size_
      || range.end_ > buffer.get_meta().buffer_size_) {
      LOG(FATAL) << "Logger-" << id_ << " reported an invalid buffer range for epoch-"
        << write_epoch << ". begin=" << range.begin_ << ", end=" << range.end_
        << " while log buffer size=" << buffer.get_meta().buffer_size_
        << ". " << *this;
    }

    if (!range.is_empty()) {
      if (had_any_log == false) {
        // First log for this epoch. Now we write out an epoch mark.
        // If no buffer has any log, we don't even bother writing out an epoch mark.
        VLOG(1) << "Logger-" << id_ << " has a non-empty epoch-" << write_epoch;
        had_any_log = true;
        CHECK_ERROR(log_epoch_switch(write_epoch));
      }

      if (range.begin_ < range.end_) {
        CHECK_ERROR(write_one_epoch_piece(buffer, write_epoch, range.begin_, range.end_));
      } else {
        // oh, it wraps around.
        // we write up to the end of the circular buffer, then from its beginning.
        // we can write out logs up to the end without worrying about a log entry
        // spanning the end of the circular buffer, because ThreadLogBuffer avoids
        // that (see reserve_new_log()). So we handle the two regions as separate
        // calls to write_one_epoch_piece(), each of which adds padding as needed.
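        //
        // Editor's illustration (not from the original source):
        //   0          end_                  begin_             capacity
        //   |--piece 2--|   (not in epoch)     |-----piece 1-----|
        // piece 1 = [begin_, capacity) is written first, then piece 2 = [0, end_).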
        VLOG(0) << "Wraps around. from_offset=" << range.begin_ << ", upto_offset=" << range.end_;
        uint64_t capacity = buffer.get_meta().buffer_size_;
        CHECK_ERROR(write_one_epoch_piece(buffer, write_epoch, range.begin_, capacity));
        CHECK_ERROR(write_one_epoch_piece(buffer, write_epoch, 0, range.end_));
      }
    }
    buffer.on_log_written(write_epoch);
  }
  CHECK_ERROR(update_durable_epoch(write_epoch, had_any_log));
  return kRetOk;
}

ErrorStack Logger::write_one_epoch_piece(
  const ThreadLogBuffer& buffer,
  Epoch write_epoch,
  uint64_t from_offset,
  uint64_t upto_offset) {
  assert_consistent();
  ASSERT_ND(from_offset <= upto_offset);
  if (from_offset == upto_offset) {
    return kRetOk;
  }

  VLOG(0) << "Writing out Thread-" << buffer.get_thread_id() << "'s log. from_offset="
    << from_offset << ", upto_offset=" << upto_offset << ", write_epoch=" << write_epoch;
  DVLOG(1) << *this;

  const char* raw_buffer = buffer.get_buffer();
  assert_written_logs(write_epoch, raw_buffer + from_offset, upto_offset - from_offset);

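  // Editor's illustration (not from the original source): the range is written
  // as up to three pieces so every write() is a multiple of kLogWriteUnitSize:
  //   [ pad | head logs ]   [ fully aligned middle ]   [ tail logs | pad ]
  //     one 4kb unit          write_raw() directly        one 4kb unit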
  // 1) First 4kb. Do we have to pad at the beginning?
  if (!is_log_aligned(from_offset)) {
    VLOG(1) << "padding at beginning needed.";
    char* buf = reinterpret_cast<char*>(fill_buffer_.get_block());

    // pad up to from_offset
    uint64_t begin_fill_size = from_offset - align_log_floor(from_offset);
    ASSERT_ND(begin_fill_size < FillerLogType::kLogWriteUnitSize);
    FillerLogType* begin_filler_log = reinterpret_cast<FillerLogType*>(buf);
    begin_filler_log->populate(begin_fill_size);
    buf += begin_fill_size;

    // then copy the log content, which is at most one page
    uint64_t copy_size;
    if (upto_offset <= align_log_ceil(from_offset)) {
      VLOG(1) << "whole log in less than one page.";
      copy_size = upto_offset - from_offset;
    } else {
      VLOG(1) << "one page or more.";
      copy_size = align_log_ceil(from_offset) - from_offset;
    }
    std::memcpy(buf, raw_buffer + from_offset, copy_size);
    buf += copy_size;

    // pad at the end, if needed
    uint64_t end_fill_size = FillerLogType::kLogWriteUnitSize - (begin_fill_size + copy_size);
    // logs are all 8-byte aligned.
    // note that FillerLogType (16 bytes) is fully populated. We use only the first 8 bytes of it.
    ASSERT_ND(end_fill_size % 8 == 0);
    if (end_fill_size > 0) {
      FillerLogType* end_filler_log = reinterpret_cast<FillerLogType*>(buf);
      end_filler_log->populate(end_fill_size);
    }
    WRAP_ERROR_CODE(current_file_->write(FillerLogType::kLogWriteUnitSize, fill_buffer_));
    from_offset += copy_size;
  }
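
  // Worked example (editor's illustration, assuming kLogWriteUnitSize == 4096):
  // from_offset=5000, upto_offset=20000.
  //   align_log_floor(5000) == 4096, so begin_fill_size == 904.
  //   align_log_ceil(5000) == 8192 < upto_offset, so we copy only up to the
  //   page boundary: copy_size == 8192 - 5000 == 3192.
  //   end_fill_size == 4096 - (904 + 3192) == 0, one full 4kb unit is written,
  //   and from_offset advances to 8192, aligned for the middle region below.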

  if (from_offset >= upto_offset) {  // if it's now larger, the entire range fit in one page.
    return kRetOk;
  }

  // from here, "from" is assured to be aligned
  ASSERT_ND(is_log_aligned(from_offset));

  // 2) Middle region where everything is aligned. easy.
  uint64_t middle_size = align_log_floor(upto_offset) - from_offset;
  if (middle_size > 0) {
    // debugging::StopWatch watch;
    VLOG(1) << "Writing middle region: " << middle_size << " bytes from " << from_offset;
    WRAP_ERROR_CODE(current_file_->write_raw(middle_size, raw_buffer + from_offset));
    // watch.stop();
    // mm, in fact too noisy... Maybe VLOG(0). but we need this information for the paper
    // LOG(INFO) << "Wrote middle regions of " << middle_size << " bytes in "
    //   << watch.elapsed_ms() << "ms";
    from_offset += middle_size;
  }

  ASSERT_ND(is_log_aligned(from_offset));
  ASSERT_ND(from_offset <= upto_offset);
  if (from_offset == upto_offset) {
    return kRetOk;  // if upto_offset is luckily aligned, we exit here.
  }

  ASSERT_ND(from_offset < upto_offset);
  ASSERT_ND(!is_log_aligned(upto_offset));

  // 3) the last 4kb
  VLOG(1) << "padding at end needed.";
  char* buf = reinterpret_cast<char*>(fill_buffer_.get_block());

  uint64_t copy_size = upto_offset - from_offset;
  std::memcpy(buf, raw_buffer + from_offset, copy_size);
  buf += copy_size;

  // pad the rest of the write unit
  const uint64_t fill_size = FillerLogType::kLogWriteUnitSize - copy_size;
  FillerLogType* filler_log = reinterpret_cast<FillerLogType*>(buf);
  filler_log->populate(fill_size);

  WRAP_ERROR_CODE(current_file_->write(FillerLogType::kLogWriteUnitSize, fill_buffer_));
  return kRetOk;
}

void Logger::assert_written_logs(Epoch write_epoch, const char* logs, uint64_t bytes) const {
  ASSERT_ND(write_epoch.is_valid());
  ASSERT_ND(logs);
  ASSERT_ND(bytes);
#ifndef NDEBUG
  // all logs should be in this epoch. we can do this kind of sanity check thanks to epoch marker.
  storage::StorageId largest_storage_id
    = engine_->get_storage_manager()->get_largest_storage_id();
  uint64_t cur;
  uint32_t previous_ordinal = 0;
  for (cur = 0; cur < bytes;) {
    const LogHeader* header = reinterpret_cast<const LogHeader*>(logs + cur);
    cur += header->log_length_;
    ASSERT_ND(header->log_length_ > 0);
    ASSERT_ND(header->get_type() != kLogCodeEpochMarker);
    if (header->get_type() == log::kLogCodeFiller) {
      DLOG(INFO) << "Found a filler log in assert_written_logs: size=" << header->log_length_;
      continue;
    }
    // These are logs from individual threads, so must be non-meta, non-marker logs.
    ASSERT_ND(header->get_kind() == log::kRecordLogs);
    reinterpret_cast<const RecordLogType*>(header)->assert_valid_generic();
    ASSERT_ND(header->storage_id_ <= largest_storage_id);
    Epoch record_epoch = header->xct_id_.get_epoch();
    ASSERT_ND(record_epoch.is_valid());
    ASSERT_ND(write_epoch == record_epoch);
    uint32_t record_ordinal = header->xct_id_.get_ordinal();
    ASSERT_ND(previous_ordinal <= record_ordinal);
    previous_ordinal = record_ordinal;
  }
  ASSERT_ND(cur == bytes);
#endif  // NDEBUG
}

void Logger::assert_consistent() {
#ifndef NDEBUG
  ASSERT_ND(get_durable_epoch().is_valid());
  ASSERT_ND(control_block_->marked_epoch_.is_valid());
  ASSERT_ND(control_block_->marked_epoch_ <= get_durable_epoch().one_more());
  ASSERT_ND(is_log_aligned(control_block_->oldest_file_offset_begin_));
  ASSERT_ND(current_file_ == nullptr || is_log_aligned(current_file_->get_current_offset()));
  ASSERT_ND(is_log_aligned(control_block_->current_file_durable_offset_));
  ASSERT_ND(current_file_ == nullptr
    || control_block_->current_file_durable_offset_ <= current_file_->get_current_offset());
#endif  // NDEBUG
}

std::string Logger::to_string() const {
  std::stringstream stream;
  stream << *this;
  return stream.str();
}
std::ostream& operator<<(std::ostream& o, const Logger& v) {
  o << "<Logger>"
    << "<id_>" << v.id_ << "</id_>"
    << "<numa_node_>" << static_cast<int>(v.numa_node_) << "</numa_node_>"
    << "<in_node_ordinal_>" << static_cast<int>(v.in_node_ordinal_) << "</in_node_ordinal_>"
    << "<log_folder_>" << v.log_folder_ << "</log_folder_>";
  o << "<assigned_thread_ids_>";
  for (auto thread_id : v.assigned_thread_ids_) {
    o << "<thread_id>" << thread_id << "</thread_id>";
  }
  o << "</assigned_thread_ids_>";
  o << "<durable_epoch_>" << v.get_durable_epoch() << "</durable_epoch_>"
    << "<marked_epoch_>" << v.control_block_->marked_epoch_ << "</marked_epoch_>"
    << "<oldest_ordinal_>" << v.control_block_->oldest_ordinal_ << "</oldest_ordinal_>"
    << "<oldest_file_offset_begin_>" << v.control_block_->oldest_file_offset_begin_
    << "</oldest_file_offset_begin_>"
    << "<current_ordinal_>" << v.control_block_->current_ordinal_ << "</current_ordinal_>";

  o << "<current_file_>";
  if (v.current_file_) {
    o << *v.current_file_;
  } else {
    o << "nullptr";
  }
  o << "</current_file_>";

  o << "<current_file_path_>" << v.current_file_path_ << "</current_file_path_>";

  o << "<current_file_length_>";
  if (v.current_file_) {
    o << v.current_file_->get_current_offset();
  } else {
    o << "nullptr";
  }
  o << "</current_file_length_>";

  o << "<epoch_history_head>"
    << v.control_block_->epoch_history_head_ << "</epoch_history_head>";
  o << "<epoch_history_count>"
    << v.control_block_->epoch_history_count_ << "</epoch_history_count>";
  o << "</Logger>";
  return o;
}

}  // namespace log
}  // namespace foedus