libfoedus-core
FOEDUS Core Library
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
meta_logger_impl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <algorithm>
23 #include <cstring>
24 #include <ostream>
25 
26 #include "foedus/assert_nd.hpp"
32 
33 namespace foedus {
34 namespace log {
35 
37  ASSERT_ND(engine_->is_master());
38  control_block_ = engine_->get_soc_manager()->get_shared_memory_repo()
40  control_block_->initialize();
41  std::memset(control_block_->buffer_, 0, sizeof(control_block_->buffer_));
42 
44  if (!fs::exists(path.parent_path())) {
45  fs::create_directories(path.parent_path());
46  }
47 
49  &control_block_->oldest_offset_,
50  &control_block_->durable_offset_);
51 
52  // Open log file
53  current_file_ = new fs::DirectIoFile(path, engine_->get_options().log_.emulation_);
54  WRAP_ERROR_CODE(current_file_->open(true, true, true, true));
55  CHECK_ERROR(truncate_non_durable(engine_->get_savepoint_manager()->get_saved_durable_epoch()));
56  ASSERT_ND(control_block_->durable_offset_ == current_file_->get_current_offset());
57 
58  stop_requested_ = false;
59  logger_thread_ = std::move(std::thread(&MetaLogger::meta_logger_main, this));
60  return kRetOk;
61 }
62 
63 
65  const log::LogHeader* entry,
66  Epoch durable_epoch,
67  uint64_t offset) {
68  std::stringstream ss;
69  ss << " <Log offset=\"" << assorted::Hex(offset) << "\""
70  << " len=\"" << assorted::Hex(entry->log_length_) << "\""
71  << " type=\"" << assorted::Hex(entry->log_type_code_) << "\""
72  << " storage_id=\"" << assorted::Hex(entry->storage_id_) << "\"";
73  if (entry->log_length_ >= 8U) {
74  ss << " xct_id_epoch=\"" << entry->xct_id_.get_epoch_int() << "\"";
75  ss << " xct_id_ordinal=\"" << entry->xct_id_.get_ordinal() << "\"";
76  }
77  ss << ">";
78  log::invoke_ostream(entry, &ss);
79  ss << "</Log>";
80  LOG(WARNING) << "Found a meta log that is not in durable epoch (" << durable_epoch
81  << "). Probably the last run didn't invoke wait_for_commit(). The operation is discarded."
82  << " Log content:" << std::endl << ss.str();
83 }
84 
85 ErrorStack MetaLogger::truncate_non_durable(Epoch saved_durable_epoch) {
86  ASSERT_ND(saved_durable_epoch.is_valid());
87  const uint64_t from_offset = control_block_->oldest_offset_;
88  const uint64_t to_offset = control_block_->durable_offset_;
89  ASSERT_ND(from_offset <= to_offset);
90  LOG(INFO) << "Truncating non-durable meta logs, if any. Right now meta logger's"
91  << " oldest_offset_=" << from_offset
92  << ", (meta logger's local) durable_offset_=" << to_offset
93  << ", global saved_durable_epoch=" << saved_durable_epoch;
94  ASSERT_ND(current_file_->is_opened());
95 
96  // Currently, we need to read everything from oldest_offset_ to see from where
97  // we have non-durable logs, whose epochs are larger than the _globally_ durable epoch.
98  // TASK(Hideaki) We should change SavepointManager to emit globally_durable_offset_. later.
99  const uint64_t read_size = to_offset - from_offset;
100  if (read_size > 0) {
101  memory::AlignedMemory buffer;
102  buffer.alloc(read_size, 1U << 12, memory::AlignedMemory::kNumaAllocOnnode, 0);
103  WRAP_ERROR_CODE(current_file_->seek(from_offset, fs::DirectIoFile::kDirectIoSeekSet));
104  WRAP_ERROR_CODE(current_file_->read_raw(read_size, buffer.get_block()));
105 
106  char* buf = reinterpret_cast<char*>(buffer.get_block());
107  uint64_t cur = 0;
108  uint64_t first_non_durable_at = read_size;
109  while (cur < read_size) {
110  log::BaseLogType* entry = reinterpret_cast<log::BaseLogType*>(buf + cur);
112  const uint32_t log_length = entry->header_.log_length_;
113  log::LogCode type = entry->header_.get_type();
115  if (type == log::kLogCodeFiller || type == log::kLogCodeEpochMarker) {
116  // Skip filler/marker. These don't have XID
117  } else {
118  Epoch epoch = entry->header_.xct_id_.get_epoch();
119  if (epoch <= saved_durable_epoch) {
120  // Mostly this case.
121  } else {
122  // Ok, found a non-durable entry!
123  const uint64_t raw_offset = from_offset + cur;
124  on_non_durable_meta_log_found(&entry->header_, saved_durable_epoch, raw_offset);
125  ASSERT_ND(first_non_durable_at == read_size || first_non_durable_at < cur);
126  first_non_durable_at = std::min(first_non_durable_at, cur);
127  // We can break here, but let's read all and warn all of them. meta log should be tiny
128  }
129  }
130  cur += log_length;
131  }
132 
133  if (first_non_durable_at < read_size) {
134  // NOTE: This happens. Although the meta logger itself immediately flushes all logs
135  // to durable storages, the global durable_epoch is min(all_logger_durable_epoch).
136  // Thus, when the user didn't invoke wait_on_commit, we might have to discard
137  // some meta logs that are "durable by itself" but "non-durable regarding the whole database"
138  LOG(WARNING) << "Found some meta logs that are not in durable epoch (" << saved_durable_epoch
139  << "). We will truncate non-durable regions. new durable_offset=" << first_non_durable_at;
140  control_block_->durable_offset_ = first_non_durable_at;
141  engine_->get_savepoint_manager()->change_meta_logger_durable_offset(first_non_durable_at);
142  }
143  } else {
144  if (control_block_->durable_offset_ < current_file_->get_current_offset()) {
145  // Even if all locally-durable regions are globally durable,
146  // there still could be locally-non-durable regions (=not yet fsynced).
147  // Will truncate such regions.
148  LOG(ERROR) << "Meta log file has a non-durable region. Probably there"
149  << " was a crash. Will truncate";
150  }
151  }
152 
153  const uint64_t new_offset = control_block_->durable_offset_;
154  if (new_offset < current_file_->get_current_offset()) {
155  LOG(WARNING) << "Truncating meta log file to " << new_offset
156  << " from " << current_file_->get_current_offset();
157  WRAP_ERROR_CODE(current_file_->truncate(new_offset, true));
158  }
159  WRAP_ERROR_CODE(current_file_->seek(new_offset, fs::DirectIoFile::kDirectIoSeekSet));
160  return kRetOk;
161 }
162 
163 
165  ASSERT_ND(engine_->is_master());
166  if (logger_thread_.joinable()) {
167  {
168  stop_requested_ = true;
169  control_block_->logger_wakeup_.signal();
170  }
171  logger_thread_.join();
172  }
173  control_block_->uninitialize();
174  if (current_file_) {
175  current_file_->close();
176  delete current_file_;
177  current_file_ = nullptr;
178  }
179  return kRetOk;
180 }
181 
182 void MetaLogger::meta_logger_main() {
183  LOG(INFO) << "Meta-logger started";
184  while (!stop_requested_) {
185  {
186  uint64_t demand = control_block_->logger_wakeup_.acquire_ticket();
187  if (!stop_requested_ && !control_block_->has_waiting_log()) {
188  VLOG(0) << "Meta-logger going to sleep";
189  control_block_->logger_wakeup_.timedwait(demand, 100000ULL);
190  }
191  }
192  VLOG(0) << "Meta-logger woke up";
193  if (stop_requested_) {
194  break;
195  } else if (!control_block_->has_waiting_log()) {
196  continue;
197  }
198 
199  // we observed buffer_used_ > 0 BEFORE the fence. Now we can read buffer_ safely.
201  ASSERT_ND(control_block_->buffer_used_ > 0);
202  uint32_t write_size = sizeof(control_block_->buffer_);
203  ASSERT_ND(control_block_->buffer_used_ <= write_size);
204  LOG(INFO) << "Meta-logger got a log (" << control_block_->buffer_used_ << " b) to write out";
205  FillerLogType* filler = reinterpret_cast<FillerLogType*>(
206  control_block_->buffer_ + control_block_->buffer_used_);
207  filler->populate(write_size - control_block_->buffer_used_);
208  ErrorCode er = current_file_->write_raw(write_size, control_block_->buffer_);
209  if (er != kErrorCodeOk) {
210  LOG(FATAL) << "Meta-logger couldn't write a log. error=" << get_error_message(er)
211  << ", os_error=" << assorted::os_error();
212  }
213 
214  // also fsync on the file and parent. every, single, time.
215  // we don't care performance on metadata logger. just make it simple and robust.
216  bool synced = fs::fsync(current_file_->get_path(), true);
217  if (!synced) {
218  LOG(FATAL) << "Meta-logger couldn't fsync. os_error=" << assorted::os_error();
219  }
221  control_block_->buffer_used_ = 0;
223  control_block_->durable_offset_ += write_size;
224  }
225  LOG(INFO) << "Meta-logger terminated";
226 }
227 
228 std::ostream& operator<<(std::ostream& o, const MetaLogger& v) {
229  o << "<MetaLogger>"
230  << "<current_file_>";
231  if (v.current_file_) {
232  o << *v.current_file_;
233  } else {
234  o << "nullptr";
235  }
236  o << "</current_file_>"
237  << "</MetaLogger>";
238  return o;
239 }
240 
241 } // namespace log
242 } // namespace foedus
LogCodeKind get_kind() const __attribute__((always_inline))
Convenience method to get LogCodeKind.
LogCode
A unique identifier of all log types.
Definition: log_type.hpp:87
ErrorCode truncate(uint64_t new_length, bool sync=false)
Discard the content of the file after the given offset.
numa_alloc_onnode() and numa_free().
soc::SharedPolling logger_wakeup_
the logger sleeps on this variable
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
GlobalMemoryAnchors * get_global_memory_anchors()
Epoch::EpochInteger get_epoch_int() const __attribute__((always_inline))
Definition: xct_id.hpp:966
char buffer_[1<< 12]
The content of current log buffer.
bool close()
Close the file if not yet closed.
record targetted logs
Definition: log_type.hpp:103
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
Represents a time epoch.
Definition: epoch.hpp:61
uint32_t get_ordinal() const __attribute__((always_inline))
Definition: xct_id.hpp:976
void alloc(uint64_t size, uint64_t alignment, AllocType alloc_type, int numa_node) noexcept
Allocate a memory, releasing the current memory if exists.
ErrorCode open(bool read, bool write, bool append, bool create)
Tries to open the file for the specified volume.
uint64_t get_current_offset() const
log::MetaLogControlBlock * meta_logger_memory_
Tiny memory for metadata logger.
Declares common log types used in all packages.
void change_meta_logger_durable_offset(uint64_t durable_offset)
Rewrites meta logger's durable_offset.
void on_non_durable_meta_log_found(const log::LogHeader *entry, Epoch durable_epoch, uint64_t offset)
A log writer for metadata operation.
const EngineOptions & get_options() const
Definition: engine.cpp:39
ErrorCode read_raw(uint64_t desired_bytes, void *buffer)
A version that receives a raw pointer that has to be aligned (be careful to use this ver)...
bool is_master() const
Returns if this engine object is a master instance.
Definition: engine.cpp:68
savepoint::SavepointManager * get_savepoint_manager() const
See Savepoint Manager.
Definition: engine.cpp:53
Base class for log type.
uint64_t acquire_ticket() const
Gives the ticket to.
0 means no-error.
Definition: error_code.hpp:87
Analogue of boost::filesystem::path.
Definition: path.hpp:37
bool is_opened() const
Whether the file is already and successfully opened.
Epoch get_epoch() const __attribute__((always_inline))
Definition: xct_id.hpp:964
ErrorCode seek(uint64_t offset, SeekType seek_type)
Sets the position of the next byte to be written/extracted from/to the stream.
xct::XctId xct_id_
Epoch and in-epoch ordinal of this log.
A common header part for all log types.
bool create_directories(const Path &p, bool sync=false)
Recursive mkdir (mkdirs).
Definition: filesystem.cpp:89
bool exists(const Path &p)
Returns if the file exists.
Definition: filesystem.hpp:128
uint16_t log_type_code_
Actually of LogCode defined in the X-Macro, but we want to make sure the type size is 2 bytes...
void * get_block() const
Returns the memory block.
uint16_t log_length_
Byte size of this log entry including this header itself and everything.
ErrorCode write_raw(uint64_t desired_bytes, const void *buffer)
A version that receives a raw pointer that has to be aligned (be careful to use this ver)...
uint64_t durable_offset_
Offset upto which log entries are fsynced.
std::ostream & operator<<(std::ostream &o, const LogHeader &v)
std::string construct_meta_log_path() const
metadata log file is placed in node-0/logger-0 folder
Definition: log_options.cpp:51
bool is_valid() const
Definition: epoch.hpp:96
std::string os_error()
Thread-safe strerror(errno).
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
Represents an I/O stream on one file without filesystem caching.
ErrorStack initialize_once() override
Represents one memory block aligned to actual OS/hardware pages.
foedus::fs::DeviceEmulationOptions emulation_
Settings to emulate slower logging device.
const ErrorStack kRetOk
Normal return value for no-error case.
0 is reserved as a non-existing log type.
Definition: log_type.hpp:89
soc::SocManager * get_soc_manager() const
See SOC and IPC.
Definition: engine.cpp:59
void invoke_ostream(const void *buffer, std::ostream *ptr)
Invokes the ostream operator for the given log type defined in log_type.xmacro.
const char * get_error_message(ErrorCode code)
Returns the error messages corresponding to ErrorCode enum defined in error_code.xmacro.
Definition: error_code.hpp:120
Convenient way of writing hex integers to stream.
storage::StorageId storage_id_
The storage this loggable operation mainly affects.
0x3001 : foedus::log::FillerLogType .
Definition: log_type.hpp:111
LogCode get_type() const __attribute__((always_inline))
Convenience method to cast into LogCode.
void get_meta_logger_offsets(uint64_t *oldest_offset, uint64_t *durable_offset) const
Returns the saved information of metadata logger in lateset savepoint.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
ErrorStack uninitialize_once() override
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
uint64_t oldest_offset_
Offset from which log entries are not gleaned yet.
bool fsync(const Path &path, bool sync_parent_directory=false)
Makes the content and metadata of the file durable all the way up to devices.
Definition: filesystem.cpp:203
The offset is set to offset bytes.
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
bool timedwait(uint64_t demanded_ticket, uint64_t timeout_microsec, uint64_t polling_spins=kDefaultPollingSpins, uint64_t max_interval_us=kDefaultPollingMaxIntervalUs) const
Wait for signal up to the given timeout.
void signal()
Signal it to let waiters exit.
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
Definition: soc_manager.cpp:38
0x3002 : foedus::log::EpochMarkerLogType .
Definition: log_type.hpp:112
void memory_fence_acq_rel()
Equivalent to std::atomic_thread_fence(std::memory_order_acq_rel).