#include <glog/logging.h>
  return assorted::align< uint64_t, FillerLogType::kLogWriteUnitSize >(offset);
  // ...
  return assorted::align< uint64_t, FillerLogType::kLogWriteUnitSize >(offset)
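
// --- Illustrative sketch (not part of the original file) ---
// The two helpers above round a log-file offset to a multiple of
// FillerLogType::kLogWriteUnitSize, because the logger only issues writes of
// whole units. A minimal, self-contained version of the same arithmetic,
// assuming a hypothetical 4 KB unit (the real constant is defined in FillerLogType):
#include <cassert>
#include <cstdint>

constexpr uint64_t kWriteUnit = 1ULL << 12;  // assumed 4 KB write unit

uint64_t ceil_align(uint64_t offset) {   // round up, as align_log_ceil appears to do
  return (offset + kWriteUnit - 1) / kWriteUnit * kWriteUnit;
}
uint64_t floor_align(uint64_t offset) {  // round down, as align_log_floor appears to do
  return offset / kWriteUnit * kWriteUnit;
}

int main() {
  assert(ceil_align(5000) == 8192);
  assert(floor_align(5000) == 4096);
  assert(ceil_align(8192) == 8192);  // already-aligned offsets are unchanged
  return 0;
}
// --- End of sketch ---
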
  current_file_ = nullptr;
  LOG(INFO) << "Initializing Logger-" << id_ << ". assigned " << assigned_thread_ids_.size()
    << " threads, starting from " << assigned_thread_ids_[0] << ", numa_node_="
  // ...
  control_block_->current_file_durable_offset_ = info.current_log_file_offset_durable_;
  control_block_->oldest_file_offset_begin_ = info.oldest_log_file_offset_begin_;
  // ...
  LOG(ERROR) << "Logger-" << id_ << "'s log file has a non-durable region. Probably there"
    << " was a crash. Will truncate it to " << control_block_->current_file_durable_offset_
  // ...
  LOG(INFO) << "Initialized logger: " << *this;
  // ...
  for (auto thread_id : assigned_thread_ids_) {
    assigned_threads_.push_back(
  // ...
  LOG(INFO) << "Logger-" << id_ << " grabbed a padding buffer. size=" << fill_buffer_.get_size();
  // ...
  logger_thread_ = std::move(std::thread(&Logger::handle_logger, this));
  LOG(INFO) << "Uninitializing Logger-" << id_ << ": " << *this;
  // ...
  if (logger_thread_.joinable()) {
    // ...
    logger_thread_.join();
  // ...
  current_file_->close();
  delete current_file_;
  current_file_ = nullptr;
void Logger::handle_logger() {
  LOG(INFO) << "Logger-" << id_ << " started. pin on NUMA node-" << static_cast<int>(numa_node_);
  // ...
  LOG(INFO) << "Logger-" << id_ << " now starts logging";
  // ...
  const int kMaxIterations = 100;
  // ...
  ASSERT_ND(durable_epoch < current_epoch);
  Epoch next_durable = durable_epoch.one_more();
  if (next_durable == current_epoch.one_less()) {
    DVLOG(2) << "Logger-" << id_ << " is well catching up. will sleep.";
  // ...
  debugging::StopWatch watch;
  uint64_t before_offset = (current_file_ ? current_file_->get_current_offset() : 0);
  // ...
  uint64_t after_offset = (current_file_ ? current_file_->get_current_offset() : 0);
  // ...
  if (after_offset != before_offset) {
    VLOG(0) << "Logger-" << id_ << " wrote out " << (after_offset - before_offset)
      << " bytes for epoch-" << next_durable << " in " << watch.elapsed_ms() << " ms";
  // ...
  if (((++iterations) % kMaxIterations) == 0) {
    LOG(WARNING) << "Logger-" << id_ << " has been working without sleep for long time"
      << "(" << iterations << "). Either too few loggers or potentially a bug?? "
  // ...
  VLOG(0) << "Logger-" << id_ << " has more task. keep working. " << iterations;
  // ...
  LOG(INFO) << "Logger-" << id_ << " ended. " << *this;
ErrorStack Logger::update_durable_epoch(Epoch new_durable_epoch, bool had_any_log) {
  DVLOG(1) << "Checked all loggers. new_durable_epoch=" << new_durable_epoch;
  // ...
    << " to " << new_durable_epoch;
  // ...
  if (!fs::fsync(current_file_path_, true)) {
  // ...
  VLOG(0) << "Logger-" << id_ << " fsynced the current file ("
    << control_block_->current_file_durable_offset_ << " bytes so far) and its folder";
  DVLOG(0) << "Before: " << *this;
  // ...
  VLOG(0) << "Logger-" << id_ << " had no log in this epoch. not writing an epoch mark."
  // ...
  DVLOG(0) << "After: " << *this;
ErrorStack Logger::write_dummy_epoch_mark() {
  // ...
  LOG(INFO) << "Logger-" << id_ << " wrote out a dummy epoch marker at the beginning";
ErrorStack Logger::log_epoch_switch(Epoch new_epoch) {
  // ...
  VLOG(0) << "Writing epoch marker for Logger-" << id_
    << ". marked_epoch_=" << control_block_->marked_epoch_ << " new_epoch=" << new_epoch;
  // ...
  std::lock_guard<std::mutex> guard(epoch_switch_mutex_);
  char* buf = reinterpret_cast<char*>(fill_buffer_.get_block());
  EpochMarkerLogType* epoch_marker = reinterpret_cast<EpochMarkerLogType*>(buf);
  epoch_marker->populate(
  // ...
  FillerLogType* filler_log = reinterpret_cast<FillerLogType*>(buf + sizeof(EpochMarkerLogType));
  filler_log->populate(fill_buffer_.get_size() - sizeof(EpochMarkerLogType));
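
// --- Illustrative sketch (not part of the original file) ---
// The epoch marker above is written as exactly one pre-sized, aligned buffer:
// the marker record sits at the head of fill_buffer_ and a filler record is
// populated to cover the remainder, so the subsequent write stays a whole
// write unit (assuming fill_buffer_ is sized to a multiple of the unit).
// A simplified stand-alone version of that layout with hypothetical plain
// structs (the real types are EpochMarkerLogType and FillerLogType, whose
// populate() signatures are not shown in this fragment):
#include <cassert>
#include <cstdint>
#include <vector>

struct MarkerRecord { uint32_t length_; uint32_t old_epoch_; uint32_t new_epoch_; };
struct FillerRecord { uint32_t length_; };  // declares length_ bytes of padding

constexpr uint64_t kWriteUnit = 4096;  // assumed write-unit size

int main() {
  std::vector<char> fill_buffer(kWriteUnit);  // stands in for fill_buffer_
  char* buf = fill_buffer.data();

  auto* marker = reinterpret_cast<MarkerRecord*>(buf);  // marker at the head
  marker->length_ = sizeof(MarkerRecord);
  marker->old_epoch_ = 41;
  marker->new_epoch_ = 42;

  // Filler covers everything after the marker: the buffer is one whole unit.
  auto* filler = reinterpret_cast<FillerRecord*>(buf + sizeof(MarkerRecord));
  filler->length_ = kWriteUnit - sizeof(MarkerRecord);

  assert(marker->length_ + filler->length_ == kWriteUnit);
  return 0;
}
// --- End of sketch ---
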
ErrorStack Logger::switch_file_if_required() {
  // ...
  LOG(INFO) << "Logger-" << id_ << " moving on to next file. " << *this;
  // ...
  current_file_->close();
  delete current_file_;
  current_file_ = nullptr;
  // ...
  if (!fs::fsync(current_file_path_, true)) {
  // ...
  LOG(INFO) << "Logger-" << id_ << " next file=" << current_file_path_;
  current_file_ = new fs::DirectIoFile(current_file_path_,
  // ...
  LOG(INFO) << "Logger-" << id_ << " moved on to next file. " << *this;
ErrorStack Logger::write_one_epoch(Epoch write_epoch) {
  // ...
  bool had_any_log = false;
  for (thread::Thread* the_thread : assigned_threads_) {
    ThreadLogBuffer& buffer = the_thread->get_thread_log_buffer();
    ThreadLogBuffer::OffsetRange range = buffer.get_logs_to_write(write_epoch);
    ASSERT_ND(range.begin_ <= buffer.get_meta().buffer_size_);
    ASSERT_ND(range.end_ <= buffer.get_meta().buffer_size_);
    if (range.begin_ > buffer.get_meta().buffer_size_
      || range.end_ > buffer.get_meta().buffer_size_) {
      LOG(FATAL) << "Logger-" << id_ << " reported an invalid buffer range for epoch-"
        << write_epoch << ". begin=" << range.begin_ << ", end=" << range.end_
        << " while log buffer size=" << buffer.get_meta().buffer_size_
    // ...
    if (!range.is_empty()) {
      if (had_any_log == false) {
        // ...
      VLOG(1) << "Logger-" << id_ << " has a non-empty epoch-" << write_epoch;
      // ...
      if (range.begin_ < range.end_) {
        CHECK_ERROR(write_one_epoch_piece(buffer, write_epoch, range.begin_, range.end_));
      // ...
        VLOG(0) << "Wraps around. from_offset=" << range.begin_ << ", upto_offset=" << range.end_;
        uint64_t capacity = buffer.get_meta().buffer_size_;
        CHECK_ERROR(write_one_epoch_piece(buffer, write_epoch, range.begin_, capacity));
        CHECK_ERROR(write_one_epoch_piece(buffer, write_epoch, 0, range.end_));
      // ...
      buffer.on_log_written(write_epoch);
  // ...
  CHECK_ERROR(update_durable_epoch(write_epoch, had_any_log));
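
// --- Illustrative sketch (not part of the original file) ---
// When a thread's range for the epoch wraps around the circular log buffer
// (end < begin), the code above simply issues two contiguous pieces:
// [begin, capacity) followed by [0, end). A small stand-alone version of that
// split, with hypothetical names:
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

std::vector<std::pair<uint64_t, uint64_t>> split_range(
  uint64_t begin, uint64_t end, uint64_t capacity) {
  if (begin <= end) {
    return {{begin, end}};               // no wrap-around: one piece
  }
  return {{begin, capacity}, {0, end}};  // wrap-around: tail piece, then head piece
}

int main() {
  auto pieces = split_range(3000, 500, 4096);
  assert(pieces.size() == 2);
  assert(pieces[0].first == 3000 && pieces[0].second == 4096);
  assert(pieces[1].first == 0 && pieces[1].second == 500);
  assert(split_range(100, 200, 4096).size() == 1);
  return 0;
}
// --- End of sketch ---
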
ErrorStack Logger::write_one_epoch_piece(
  const ThreadLogBuffer& buffer,
  Epoch write_epoch,
  uint64_t from_offset,
  uint64_t upto_offset) {
  // ...
  if (from_offset == upto_offset) {
  // ...
  VLOG(0) << "Writing out Thread-" << buffer.get_thread_id() << "'s log. from_offset="
    << from_offset << ", upto_offset=" << upto_offset << ", write_epoch=" << write_epoch;
  // ...
  const char* raw_buffer = buffer.get_buffer();
  assert_written_logs(write_epoch, raw_buffer + from_offset, upto_offset - from_offset);
  // ...
  VLOG(1) << "padding at beginning needed. ";
  char* buf = reinterpret_cast<char*>(fill_buffer_.get_block());
  // ...
  FillerLogType* begin_filler_log = reinterpret_cast<FillerLogType*>(buf);
  begin_filler_log->populate(begin_fill_size);
  buf += begin_fill_size;
  // ...
  VLOG(1) << "whole log in less than one page.";
  copy_size = upto_offset - from_offset;
  // ...
  VLOG(1) << "one page or more.";
  // ...
  std::memcpy(buf, raw_buffer + from_offset, copy_size);
  // ...
  if (end_fill_size > 0) {
    FillerLogType* end_filler_log = reinterpret_cast<FillerLogType*>(buf);
    end_filler_log->populate(end_fill_size);
  // ...
  from_offset += copy_size;
  // ...
  if (from_offset >= upto_offset) {
  // ...
  if (middle_size > 0) {
    // ...
    VLOG(1) << "Writing middle regions: " << middle_size << " bytes from " << from_offset;
    // ...
    from_offset += middle_size;
  // ...
  if (from_offset == upto_offset) {
  // ...
  VLOG(1) << "padding at end needed.";
  char* buf = reinterpret_cast<char*>(fill_buffer_.get_block());
  // ...
  uint64_t copy_size = upto_offset - from_offset;
  // ...
  std::memcpy(buf, raw_buffer + from_offset, copy_size);
  // ...
  FillerLogType* filler_log = reinterpret_cast<FillerLogType*>(buf);
  filler_log->populate(fill_size);
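
// --- Illustrative sketch (not part of the original file) ---
// Every file write must be a whole number of write units, so only the first
// and last units of a piece go through fill_buffer_: a filler pads the head,
// the aligned middle region is written directly from the thread buffer, and
// the tail is copied into fill_buffer_ and topped off with another filler.
// A rough, self-contained sketch of one plausible version of that size
// arithmetic (names and the 4 KB unit are assumptions; this fragment does not
// show which offsets the real code aligns against):
#include <cassert>
#include <cstdint>

constexpr uint64_t kWriteUnit = 4096;

struct PieceLayout {
  uint64_t head_pad;    // filler bytes before the first real log byte
  uint64_t head_copy;   // log bytes copied into the first write unit
  uint64_t middle;      // aligned bytes written straight from the thread buffer
  uint64_t tail_copy;   // log bytes copied into the last write unit
  uint64_t tail_pad;    // filler bytes after the last log byte
};

PieceLayout layout_piece(uint64_t from, uint64_t upto) {
  PieceLayout r{};
  const uint64_t total = upto - from;
  r.head_pad = from % kWriteUnit;          // distance back to an aligned boundary
  r.head_copy = kWriteUnit - r.head_pad;   // rest of the first unit
  if (r.head_copy >= total) {              // "whole log in less than one page"
    r.head_copy = total;
    r.tail_pad = kWriteUnit - r.head_pad - r.head_copy;
    return r;
  }
  const uint64_t remaining = total - r.head_copy;
  r.middle = remaining / kWriteUnit * kWriteUnit;  // whole units, no copying
  r.tail_copy = remaining - r.middle;
  if (r.tail_copy > 0) {
    r.tail_pad = kWriteUnit - r.tail_copy;
  }
  return r;
}

int main() {
  const PieceLayout l = layout_piece(100, 10000);
  assert((l.head_pad + l.head_copy) % kWriteUnit == 0);          // head write is one unit
  assert(l.middle % kWriteUnit == 0);                            // middle write is aligned
  assert((l.tail_copy + l.tail_pad) % kWriteUnit == 0);          // tail write is one unit
  assert(l.head_copy + l.middle + l.tail_copy == 10000 - 100);   // all bytes covered once
  return 0;
}
// --- End of sketch ---
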
void Logger::assert_written_logs(Epoch write_epoch, const char* logs, uint64_t bytes) const {
  // ...
  uint32_t previous_ordinal = 0;
  for (cur = 0; cur < bytes;) {
    const LogHeader* header = reinterpret_cast<const LogHeader*>(logs + cur);
    cur += header->log_length_;
    // ...
    DLOG(INFO) << "Found a filler log in assert_written_logs: size=" << header->log_length_;
    // ...
    reinterpret_cast<const RecordLogType*>(header)->assert_valid_generic();
    ASSERT_ND(header->storage_id_ <= largest_storage_id);
    Epoch record_epoch = header->xct_id_.get_epoch();
    // ...
    uint32_t record_ordinal = header->xct_id_.get_ordinal();
    ASSERT_ND(previous_ordinal <= record_ordinal);
    previous_ordinal = record_ordinal;
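
// --- Illustrative sketch (not part of the original file) ---
// The debug check above walks the buffer as a sequence of variable-length
// records: each record's header stores its own byte length, so the cursor
// simply advances by log_length_ until it has consumed the whole region.
// A tiny stand-alone version of that traversal with a hypothetical header:
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

struct Header { uint16_t log_length_; uint16_t type_; };  // hypothetical, simplified

int main() {
  // Build a byte region containing three records of 16, 32, and 16 bytes.
  std::vector<char> region(64, 0);
  const uint16_t lengths[] = {16, 32, 16};
  uint64_t pos = 0;
  for (uint16_t len : lengths) {
    Header h{len, 0};
    std::memcpy(region.data() + pos, &h, sizeof(h));
    pos += len;
  }

  // Walk the records exactly like the for-loop above: advance by each length.
  uint64_t cur = 0;
  uint32_t count = 0;
  while (cur < region.size()) {
    Header h;
    std::memcpy(&h, region.data() + cur, sizeof(h));  // read the header safely
    assert(h.log_length_ > 0);                        // a zero length would loop forever
    cur += h.log_length_;
    ++count;
  }
  assert(cur == region.size());
  assert(count == 3);
  return 0;
}
// --- End of sketch ---
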
void Logger::assert_consistent() {
  // ...
  std::stringstream stream;
  // ...
  // (presumably inside the stream-output operator for Logger)
    << "<id_>" << v.id_ << "</id_>"
    << "<numa_node_>" << static_cast<int>(v.numa_node_) << "</numa_node_>"
    << "<in_node_ordinal_>" << static_cast<int>(v.in_node_ordinal_) << "</in_node_ordinal_>"
    << "<log_folder_>" << v.log_folder_ << "</log_folder_>";
  o << "<assigned_thread_ids_>";
  for (auto thread_id : v.assigned_thread_ids_) {
    o << "<thread_id>" << thread_id << "</thread_id>";
  // ...
  o << "</assigned_thread_ids_>";
  // ...
    << "</oldest_file_offset_begin_>"
  // ...
  o << "<current_file_>";
  if (v.current_file_) {
    o << *v.current_file_;
  // ...
  o << "</current_file_>";
  // ...
  o << "<current_file_path_>" << v.current_file_path_ << "</current_file_path_>";
  // ...
  o << "<current_file_length_>";
  if (v.current_file_) {
  // ...
  o << "</current_file_length_>";
  // ...
  o << "<epoch_history_head>"
  // ...
  o << "<epoch_history_count>"