libfoedus-core
FOEDUS Core Library
foedus::snapshot::LogMapper Class Reference [final]

A log mapper, which reads log files from one logger and sends them to corresponding log reducers.

Detailed Description

A log mapper, which reads log files from one logger and sends them to corresponding log reducers.

Overview

Mappers read logs per epoch. As log files are guaranteed to be strictly ordered by epoch (see Logger code), we can simply read log files sequentially to achieve this.
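Because each logger writes epochs in order, the logs for an epoch window occupy one contiguous stretch of the log stream, so a mapper only needs two boundary offsets and a sequential read. The sketch below illustrates that idea under stated assumptions: `EpochMark`, `ByteRange`, and `epoch_window()` are hypothetical, epochs are plain `uint32_t` values without FOEDUS's wrap-around handling, and the window is taken to be (base_epoch, until_epoch]. It is not the implementation of `LoggerRef::get_log_range()`.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical index entry: byte offset at which the logs of `epoch` begin.
// Assumes plain integer epochs (no wrap-around) and one continuous log stream.
struct EpochMark {
  uint32_t epoch;
  uint64_t offset;
};

struct ByteRange {
  uint64_t begin;
  uint64_t end;  // exclusive
  bool empty() const { return begin >= end; }
};

// Returns the byte range holding epochs in (base_epoch, until_epoch].
// `marks` must be sorted by epoch, which holds because epochs are written in order.
// `stream_end` is the offset just past the last log entry.
ByteRange epoch_window(const std::vector<EpochMark>& marks, uint64_t stream_end,
                       uint32_t base_epoch, uint32_t until_epoch) {
  auto before = [](const EpochMark& m, uint32_t e) { return m.epoch < e; };
  // First mark whose epoch is greater than base_epoch.
  auto first = std::lower_bound(marks.begin(), marks.end(), base_epoch + 1, before);
  // First mark whose epoch is greater than until_epoch.
  auto last = std::lower_bound(marks.begin(), marks.end(), until_epoch + 1, before);
  uint64_t begin = (first == marks.end()) ? stream_end : first->offset;
  uint64_t end = (last == marks.end()) ? stream_end : last->offset;
  return ByteRange{begin, end};
}
```

Since the marks are sorted, two binary searches suffice; everything between the two offsets is then read front to back without any per-entry epoch filtering.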

Mappers send logs to partitions as follows:

  • Engine-wide and storage-wide logs (e.g., DROP STORAGE) are processed centrally at the end of each epoch. Mappers therefore just buffer them and send them all back to LogGleaner, which processes them.
  • Record-wise logs always carry a storage ID. Mappers bucketize logs by storage ID so that the following steps run in batches.
  • For each storage batch, mappers check the partitioning information, creating it if it does not exist yet (see LogGleaner).
  • Mappers send logs to the corresponding reducers, with compact metadata for each storage.
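The routing steps above can be sketched as follows. Everything in this sketch is hypothetical: `LogEntry`, its `is_engine_wide` flag, the modulo-based `partition_of()` (a stand-in for the per-storage partitioning that LogGleaner creates), and the plain vectors used as reducer inboxes. The real mapper works on raw log buffers and attaches compact per-storage metadata, which is omitted here.

```cpp
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical, simplified log record.
struct LogEntry {
  uint32_t storage_id;   // ignored for engine-wide logs in this sketch
  bool is_engine_wide;   // engine-/storage-wide logs such as DROP STORAGE
  uint64_t key;          // record key used for partitioning
};

struct Routed {
  std::vector<LogEntry> to_gleaner;                // processed centrally at epoch end
  std::vector<std::vector<LogEntry>> to_reducers;  // one inbox per reducer
};

// Stand-in partitioner (hypothetical). reducer_count must be > 0.
uint16_t partition_of(const LogEntry& e, uint16_t reducer_count) {
  return static_cast<uint16_t>(e.key % reducer_count);
}

Routed route_logs(const std::vector<LogEntry>& logs, uint16_t reducer_count) {
  Routed out;
  out.to_reducers.resize(reducer_count);
  // Step 1: set aside engine-/storage-wide logs, bucketize the rest by storage ID.
  std::map<uint32_t, std::vector<LogEntry>> buckets;
  for (const LogEntry& e : logs) {
    if (e.is_engine_wide) {
      out.to_gleaner.push_back(e);
    } else {
      buckets[e.storage_id].push_back(e);
    }
  }
  // Step 2: handle one storage at a time; a real mapper would look up (or create)
  // that storage's partitioning once per bucket rather than once per entry.
  for (const auto& bucket : buckets) {
    for (const LogEntry& e : bucket.second) {
      out.to_reducers[partition_of(e, reducer_count)].push_back(e);
    }
  }
  return out;
}
```

Batching by storage is what makes the per-storage work (partitioning lookup, metadata) happen once per bucket instead of once per log entry.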

Possible Optimization

So far, the log gleaner simply reads from log files. We plan to optimize this when a large amount of DRAM is available, by reading directly from the log buffer if its contents have not been overwritten yet. ThreadLogBuffer has an additional "head" marker for this purpose, but it is not used yet, to keep the implementation simple.

Note
This is a private implementation detail of the Snapshot Manager, so its file name ends with _impl. Do not include this header from a client program; no client program ever needs to access this internal class.

Definition at line 73 of file log_mapper_impl.hpp.

#include <log_mapper_impl.hpp>


Public Member Functions

 LogMapper (Engine *engine, uint16_t local_ordinal)
 
ErrorStack initialize_once () override
 
ErrorStack uninitialize_once () override
 
std::string to_string () const override
 Expects "LogReducer-x", "LogMapper-y" etc.
 
- Public Member Functions inherited from foedus::snapshot::MapReduceBase
 MapReduceBase (Engine *engine, uint16_t id)
 
 MapReduceBase ()=delete
 
 MapReduceBase (const MapReduceBase &other)=delete
 
MapReduceBase & operator= (const MapReduceBase &other)=delete
 
LogGleanerRef * get_parent ()
 
uint16_t get_id () const
 
uint16_t get_numa_node () const
 
void launch_thread ()
 Start executing.
 
void join_thread ()
 
- Public Member Functions inherited from foedus::DefaultInitializable
 DefaultInitializable ()
 
virtual ~DefaultInitializable ()
 
 DefaultInitializable (const DefaultInitializable &)=delete
 
DefaultInitializable & operator= (const DefaultInitializable &)=delete
 
ErrorStack initialize () override final
 Typical implementation of Initializable::initialize() that provides initialize-once semantics.
 
ErrorStack uninitialize () override final
 Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics.
 
bool is_initialized () const override final
 Returns whether the object has been already initialized or not.
 
- Public Member Functions inherited from foedus::Initializable
virtual ~Initializable ()
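`DefaultInitializable` gives `initialize()` and `uninitialize()` initialize-once semantics, so a derived class such as LogMapper overrides only `initialize_once()` and `uninitialize_once()`. A minimal sketch of that pattern, assuming `bool` in place of FOEDUS's `ErrorStack`; `InitOnce` and `CountingResource` are hypothetical names, and rejecting a second `initialize()` is a choice of this sketch rather than documented FOEDUS behavior:

```cpp
// Simplified stand-in for DefaultInitializable (hypothetical; returns bool, not ErrorStack).
class InitOnce {
 public:
  virtual ~InitOnce() = default;

  // Runs initialize_once() at most once; this sketch rejects a repeated call.
  bool initialize() {
    if (initialized_) return false;
    if (!initialize_once()) return false;  // stay uninitialized on failure
    initialized_ = true;
    return true;
  }

  // Runs uninitialize_once() only after a successful initialize(); otherwise a no-op.
  bool uninitialize() {
    if (!initialized_) return true;
    bool ok = uninitialize_once();
    initialized_ = false;  // considered released even if cleanup reported an error
    return ok;
  }

  bool is_initialized() const { return initialized_; }

 protected:
  virtual bool initialize_once() = 0;
  virtual bool uninitialize_once() = 0;

 private:
  bool initialized_ = false;
};

// Hypothetical derived class that counts calls to show the once-only behavior.
class CountingResource : public InitOnce {
 public:
  int init_calls = 0;
  int uninit_calls = 0;

 protected:
  bool initialize_once() override { ++init_calls; return true; }
  bool uninitialize_once() override { ++uninit_calls; return true; }
};
```

Keeping the guard in the base class means every derived class gets the same idempotent behavior without repeating the bookkeeping.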
 

Protected Member Functions

ErrorStack handle_process () override
 Implements the specific logic in the derived class.
 
- Protected Member Functions inherited from foedus::snapshot::MapReduceBase
ErrorCode check_cancelled () const
 Derived class's handle_process() should occasionally call this to exit if it's cancelled.
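`handle_process()` runs on the thread started by `launch_thread()` and calls `check_cancelled()` once per read, so cancellation is cooperative: the worker only notices a request between units of work. The sketch below shows that shape with the standard library only; `Worker`, `request_cancel()`, `units_done()`, and the `Code` enum are hypothetical stand-ins, not FOEDUS API, and the sleep merely simulates a read.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

enum class Code { kOk, kCancelled };  // hypothetical stand-in for ErrorCode

class Worker {
 public:
  ~Worker() {
    request_cancel();  // avoid waiting for the full loop on destruction
    join_thread();
  }
  void launch_thread() {
    thread_ = std::thread([this] { result_ = handle_process(); });
  }
  void join_thread() {
    if (thread_.joinable()) thread_.join();
  }
  void request_cancel() { cancel_requested_.store(true); }
  Code result() const { return result_; }  // valid after join_thread()
  int units_done() const { return units_done_.load(); }

 private:
  // Polled between units of work, like check_cancelled() once per read.
  Code check_cancelled() const {
    return cancel_requested_.load() ? Code::kCancelled : Code::kOk;
  }

  Code handle_process() {
    for (int i = 0; i < 100000; ++i) {
      if (check_cancelled() != Code::kOk) return Code::kCancelled;
      std::this_thread::sleep_for(std::chrono::microseconds(10));  // simulated read
      ++units_done_;
    }
    return Code::kOk;
  }

  std::atomic<bool> cancel_requested_{false};
  std::atomic<int> units_done_{0};
  Code result_ = Code::kOk;
  std::thread thread_;
};
```

Polling an atomic flag keeps cancellation cheap and never interrupts a read midway, at the cost of a latency of up to one unit of work.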
 

Friends

std::ostream & operator<< (std::ostream &o, const LogMapper &v)
 

Additional Inherited Members

- Protected Attributes inherited from foedus::snapshot::MapReduceBase
Engine *const engine_
 
LogGleanerRef parent_
 
const uint16_t id_
 Unique ID of this mapper or reducer.
 
const uint16_t numa_node_
 
std::atomic< bool > running_
 Only for sanity checks.
 
std::thread thread_
 

Constructor & Destructor Documentation

foedus::snapshot::LogMapper::LogMapper ( Engine * engine,
uint16_t  local_ordinal 
)

Definition at line 58 of file log_mapper_impl.cpp.

  : MapReduceBase(engine, calculate_logger_id(engine, local_ordinal)),
    processed_log_count_(0) {
  clear_storage_buckets();
}

Member Function Documentation

ErrorStack foedus::snapshot::LogMapper::handle_process ( )
[override, protected, virtual]

Implements the specific logic in the derived class.

Implements foedus::snapshot::MapReduceBase.

Definition at line 140 of file log_mapper_impl.cpp.

References foedus::snapshot::align_io_ceil(), foedus::snapshot::align_io_floor(), ASSERT_ND, foedus::log::LogRange::begin_file_ordinal, foedus::log::LogRange::begin_offset, foedus::snapshot::MapReduceBase::check_cancelled(), CHECK_ERROR, foedus::log::LogOptions::construct_suffixed_log_path(), foedus::debugging::StopWatch::elapsed_sec(), foedus::snapshot::SnapshotOptions::emulation_, foedus::log::LogRange::end_file_ordinal, foedus::log::LogRange::end_offset, foedus::snapshot::MapReduceBase::engine_, foedus::fs::file_size(), foedus::snapshot::LogGleanerRef::get_base_epoch(), foedus::Engine::get_log_manager(), foedus::log::LoggerRef::get_log_range(), foedus::log::LogManager::get_logger(), foedus::Engine::get_options(), foedus::memory::AlignedMemory::get_size(), foedus::snapshot::LogGleanerRef::get_valid_until_epoch(), foedus::snapshot::MapReduceBase::id_, foedus::log::LogRange::is_empty(), foedus::fs::DirectIoFile::kDirectIoSeekSet, foedus::kRetOk, foedus::EngineOptions::log_, foedus::snapshot::MapReduceBase::numa_node_, foedus::snapshot::MapReduceBase::parent_, foedus::EngineOptions::snapshot_, foedus::fs::status(), foedus::debugging::StopWatch::stop(), to_string(), and WRAP_ERROR_CODE.

{
  const Epoch base_epoch = parent_.get_base_epoch();
  const Epoch until_epoch = parent_.get_valid_until_epoch();
  log::LoggerRef logger = engine_->get_log_manager()->get_logger(id_);
  const log::LogRange log_range = logger.get_log_range(base_epoch, until_epoch);
  // uint64_t cur_offset = log_range.begin_offset;
  if (log_range.is_empty()) {
    LOG(INFO) << to_string() << " has no logs to process";
    report_completion(0);
    return kRetOk;
  }

  // open the file and seek to there. be careful on page boundary.
  // as we use direct I/O, all I/O must be 4kb-aligned. when the read range is not
  // a multiple of 4kb, we read a little bit more (at most 4kb per read, so negligible).
  // to clarify, here we use the following suffixes
  //   "infile"/"inbuf" : the offset is an offset in entire file/IO buffer
  //   "aligned" : the offset is 4kb-aligned (careful on floor vs ceil)
  // Lengthy, but otherwise it's so confusing.
  processed_log_count_ = 0;
  IoBufStatus status;
  status.size_inbuf_aligned_ = io_buffer_.get_size();
  status.cur_file_ordinal_ = log_range.begin_file_ordinal;
  status.ended_ = false;
  status.first_read_ = true;
  debugging::StopWatch watch;
  while (!status.ended_) {  // loop for log file switch
    fs::Path path(engine_->get_options().log_.construct_suffixed_log_path(
      numa_node_,
      id_,
      status.cur_file_ordinal_));
    uint64_t file_size = fs::file_size(path);
    if (file_size % kIoAlignment != 0) {
      LOG(WARNING) << to_string() << " Interesting, non-aligned file size, which probably means"
        << " previous writes didn't flush. file path=" << path << ", file size=" << file_size;
      file_size = align_io_floor(file_size);
    }
    ASSERT_ND(file_size % kIoAlignment == 0);
    status.size_infile_aligned_ = file_size;

    // If this is the first file to read, we might be reading from non-zero position.
    // In that case, be careful on alignment.
    if (status.cur_file_ordinal_ == log_range.begin_file_ordinal) {
      status.next_infile_ = log_range.begin_offset;
    } else {
      status.next_infile_ = 0;
    }

    if (status.cur_file_ordinal_ == log_range.end_file_ordinal) {
      ASSERT_ND(log_range.end_offset <= file_size);
      status.end_infile_ = log_range.end_offset;
    } else {
      status.end_infile_ = file_size;
    }

    DVLOG(1) << to_string() << " file path=" << path << ", file size=" << assorted::Hex(file_size)
      << ", read_end=" << assorted::Hex(status.end_infile_);
    fs::DirectIoFile file(path, engine_->get_options().snapshot_.emulation_);
    WRAP_ERROR_CODE(file.open(true, false, false, false));
    DVLOG(1) << to_string() << " opened log file " << file;

    while (true) {
      WRAP_ERROR_CODE(check_cancelled());  // check once per read
      status.buf_infile_aligned_ = align_io_floor(status.next_infile_);
      WRAP_ERROR_CODE(file.seek(status.buf_infile_aligned_, fs::DirectIoFile::kDirectIoSeekSet));
      DVLOG(1) << to_string() << " seeked to: " << assorted::Hex(status.buf_infile_aligned_);
      status.end_inbuf_aligned_ = std::min(
        io_buffer_.get_size(),
        align_io_ceil(status.end_infile_ - status.buf_infile_aligned_));
      ASSERT_ND(status.end_inbuf_aligned_ % kIoAlignment == 0);
      WRAP_ERROR_CODE(file.read(status.end_inbuf_aligned_, &io_buffer_));

      status.cur_inbuf_ = 0;
      if (status.next_infile_ != status.buf_infile_aligned_) {
        ASSERT_ND(status.next_infile_ > status.buf_infile_aligned_);
        status.cur_inbuf_ = status.next_infile_ - status.buf_infile_aligned_;
        DVLOG(1) << to_string() << " skipped " << status.cur_inbuf_ << " bytes for aligned read";
      }

      CHECK_ERROR(handle_process_buffer(file, &status));
      if (status.more_in_the_file_) {
        ASSERT_ND(status.next_infile_ > status.buf_infile_aligned_);
      } else {
        if (log_range.end_file_ordinal == status.cur_file_ordinal_) {
          status.ended_ = true;
          break;
        } else {
          ++status.cur_file_ordinal_;
          status.next_infile_ = 0;
          LOG(INFO) << to_string()
            << " moved on to next log file ordinal " << status.cur_file_ordinal_;
          break;  // leave the inner loop so that the outer loop opens the next file
        }
      }
    }
    file.close();
  }
  watch.stop();
  LOG(INFO) << to_string() << " processed " << processed_log_count_ << " log entries in "
    << watch.elapsed_sec() << "s";
  report_completion(watch.elapsed_sec());
  return kRetOk;
}
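The alignment bookkeeping in the inner loop above reduces to a little arithmetic: floor the next file offset to a 4 KB boundary for the seek, ceil the remaining length (capped at the buffer size) for the read, and skip the leading bytes that precede the requested offset. A standalone sketch, assuming `kIoAlignment` is 4096 as the code comments state; `ReadWindow` and `plan_read()` are hypothetical helpers that mirror the `IoBufStatus` fields, and these `align_io_floor`/`align_io_ceil` are local re-implementations rather than FOEDUS's definitions.

```cpp
#include <algorithm>
#include <cstdint>

constexpr uint64_t kIoAlignment = 4096;  // assumed: "all I/O must be 4kb-aligned"

constexpr uint64_t align_io_floor(uint64_t offset) {
  return offset / kIoAlignment * kIoAlignment;
}
constexpr uint64_t align_io_ceil(uint64_t offset) {
  return align_io_floor(offset + kIoAlignment - 1);
}

// Hypothetical mirror of the IoBufStatus fields used for one read.
struct ReadWindow {
  uint64_t buf_infile_aligned;  // seek position, aligned down
  uint64_t end_inbuf_aligned;   // bytes to read, aligned up and capped by the buffer
  uint64_t cur_inbuf;           // leading bytes of the buffer to skip
};

// next_infile: first byte actually needed; end_infile: one past the last byte needed.
// buffer_size must itself be a multiple of kIoAlignment.
ReadWindow plan_read(uint64_t next_infile, uint64_t end_infile, uint64_t buffer_size) {
  ReadWindow w;
  w.buf_infile_aligned = align_io_floor(next_infile);
  w.end_inbuf_aligned =
      std::min(buffer_size, align_io_ceil(end_infile - w.buf_infile_aligned));
  w.cur_inbuf = next_infile - w.buf_infile_aligned;
  return w;
}
```

For example, needing bytes [5000, 20000) with a 1 MB buffer seeks to 4096, reads 16384 bytes, and starts parsing at buffer offset 904; the over-read is bounded by one alignment unit at each end.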
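The outer loop of `handle_process()` visits log files from `begin_file_ordinal` through `end_file_ordinal`, starting at `begin_offset` in the first file and stopping at `end_offset` in the last; every other file is read from offset 0 to its alignment-floored size. A sketch of that per-file range computation, where `LogRangeLite`, `FileSpan`, `plan_files()`, and the `file_size` callback are hypothetical stand-ins for `log::LogRange` and `fs::file_size()`:

```cpp
#include <cstdint>
#include <functional>
#include <vector>

constexpr uint64_t kIoAlignment = 4096;  // assumed alignment, as in the mapper

// Hypothetical subset of log::LogRange.
struct LogRangeLite {
  uint32_t begin_file_ordinal;
  uint32_t end_file_ordinal;  // inclusive, matching the mapper's loop
  uint64_t begin_offset;      // start offset within the first file
  uint64_t end_offset;        // end offset within the last file
};

struct FileSpan {
  uint32_t ordinal;
  uint64_t begin;  // corresponds to next_infile_ for this file
  uint64_t end;    // corresponds to end_infile_ for this file
};

std::vector<FileSpan> plan_files(const LogRangeLite& range,
                                 const std::function<uint64_t(uint32_t)>& file_size) {
  std::vector<FileSpan> spans;
  for (uint32_t ord = range.begin_file_ordinal; ord <= range.end_file_ordinal; ++ord) {
    // A non-aligned size suggests an unflushed tail; drop the partial block.
    uint64_t size = file_size(ord) / kIoAlignment * kIoAlignment;
    uint64_t begin = (ord == range.begin_file_ordinal) ? range.begin_offset : 0;
    uint64_t end = (ord == range.end_file_ordinal) ? range.end_offset : size;
    spans.push_back(FileSpan{ord, begin, end});
  }
  return spans;
}
```

Only the first and last files need special offsets; the files in between are consumed whole.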

ErrorStack foedus::snapshot::LogMapper::initialize_once ( )
[override, virtual]

Implements foedus::DefaultInitializable.

Definition at line 64 of file log_mapper_impl.cpp.

References foedus::memory::AlignedMemory::alloc(), ASSERT_ND, foedus::snapshot::MapReduceBase::engine_, foedus::Engine::get_options(), foedus::memory::AlignedMemory::is_null(), foedus::memory::kHugepageSize, foedus::memory::AlignedMemory::kNumaAllocOnnode, foedus::kRetOk, foedus::snapshot::SnapshotOptions::log_mapper_bucket_kb_, foedus::snapshot::SnapshotOptions::log_mapper_io_buffer_mb_, foedus::snapshot::MapReduceBase::numa_node_, and foedus::EngineOptions::snapshot_.

{
  const SnapshotOptions& option = engine_->get_options().snapshot_;

  uint64_t io_buffer_size = static_cast<uint64_t>(option.log_mapper_io_buffer_mb_) << 20;
  io_buffer_size = assorted::align<uint64_t, memory::kHugepageSize>(io_buffer_size);
  io_buffer_.alloc(
    io_buffer_size,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);
  ASSERT_ND(!io_buffer_.is_null());

  uint64_t bucket_size = static_cast<uint64_t>(option.log_mapper_bucket_kb_) << 10;
  buckets_memory_.alloc(
    bucket_size,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);
  ASSERT_ND(!buckets_memory_.is_null());

  const uint64_t tmp_memory_size = memory::kHugepageSize;
  tmp_memory_.alloc(
    tmp_memory_size,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);
  ASSERT_ND(!tmp_memory_.is_null());

  // these automatically expand
  presort_buffer_.alloc(
    1U << 21,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);
  presort_ouputs_.alloc(
    1U << 21,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);

  uint64_t tmp_offset = 0;
  tmp_send_buffer_slice_ = memory::AlignedMemorySlice(&tmp_memory_, tmp_offset, kSendBufferSize);
  tmp_offset += kSendBufferSize;
  tmp_position_array_slice_ = memory::AlignedMemorySlice(&tmp_memory_, tmp_offset, kBucketSize);
  tmp_offset += kBucketSize;
  tmp_sort_array_slice_ = memory::AlignedMemorySlice(&tmp_memory_, tmp_offset, kBucketSize << 1);
  tmp_offset += kBucketSize << 1;
  const uint64_t hashlist_bytesize = kBucketHashListMaxCount * sizeof(BucketHashList);
  tmp_hashlist_buffer_slice_ = memory::AlignedMemorySlice(
    &tmp_memory_, tmp_offset, hashlist_bytesize);
  tmp_offset += hashlist_bytesize;
  tmp_partition_array_slice_ = memory::AlignedMemorySlice(&tmp_memory_, tmp_offset, kBucketSize);
  tmp_offset += kBucketSize;
  if (tmp_memory_size < tmp_offset) {
    LOG(FATAL) << "tmp_memory_size is too small. some constant values are messed up";
  }

  processed_log_count_ = 0;
  clear_storage_buckets();

  return kRetOk;
}
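`initialize_once()` sizes the I/O buffer by converting `log_mapper_io_buffer_mb_` to bytes (`<< 20`) and rounding up to the 2 MB hugepage size, then carves one hugepage of temporary memory into back-to-back slices and aborts if they overflow it. The sketch below reproduces that arithmetic with plain offsets; `align_up()`, `io_buffer_bytes()`, `Slice`, and `carve()` are hypothetical, and the slice sizes in the usage are made up because the real values of `kSendBufferSize`, `kBucketSize`, and the other constants are not shown on this page.

```cpp
#include <cstdint>
#include <stdexcept>
#include <vector>

constexpr uint64_t kHugepageSize = 1ULL << 21;  // 2 MB, the only THP size so far

// Rounds `size` up to a multiple of `alignment` (which must be a power of two).
constexpr uint64_t align_up(uint64_t size, uint64_t alignment) {
  return (size + alignment - 1) & ~(alignment - 1);
}

// Converts a megabyte option to bytes and rounds it up to whole hugepages.
constexpr uint64_t io_buffer_bytes(uint32_t mb) {
  return align_up(static_cast<uint64_t>(mb) << 20, kHugepageSize);
}

struct Slice {
  uint64_t offset;
  uint64_t size;
};

// Lays out slices back to back inside one block of `total` bytes.
// Throws if they do not fit (the mapper instead logs FATAL).
std::vector<Slice> carve(uint64_t total, const std::vector<uint64_t>& sizes) {
  std::vector<Slice> slices;
  uint64_t offset = 0;
  for (uint64_t s : sizes) {
    slices.push_back(Slice{offset, s});
    offset += s;
  }
  if (offset > total) {
    throw std::length_error("tmp memory too small for the requested slices");
  }
  return slices;
}
```

For instance, `io_buffer_bytes(3)` yields 4 MB rather than 3 MB. Carving one allocation avoids several small NUMA-local allocations, at the price of the overflow check on the constants.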

std::string foedus::snapshot::LogMapper::to_string ( ) const
[inline, override, virtual]

Expects "LogReducer-x", "LogMapper-y" etc.

Used only for logging/debugging.

Implements foedus::snapshot::MapReduceBase.

Definition at line 80 of file log_mapper_impl.hpp.

References foedus::snapshot::MapReduceBase::id_.

Referenced by handle_process().

{
  return std::string("LogMapper-") + std::to_string(id_);
}

ErrorStack foedus::snapshot::LogMapper::uninitialize_once ( )
[override, virtual]

Implements foedus::DefaultInitializable.

Definition at line 127 of file log_mapper_impl.cpp.

References foedus::memory::AlignedMemory::release_block(), and SUMMARIZE_ERROR_BATCH.

{
  ErrorStackBatch batch;
  io_buffer_.release_block();
  buckets_memory_.release_block();
  tmp_memory_.release_block();
  clear_storage_buckets();
  return SUMMARIZE_ERROR_BATCH(batch);
}
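`uninitialize_once()` follows a release-everything-then-report pattern: every resource is freed regardless of earlier failures, and the collected errors are summarized through `ErrorStackBatch`. Since `release_block()` returns `void`, nothing is pushed into `batch` here, so the summary is always a success; the pattern matters once cleanup steps can fail. A generic sketch, with `ErrorBatch` and `release_all()` as hypothetical, string-based stand-ins for `ErrorStackBatch` and `SUMMARIZE_ERROR_BATCH`:

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for ErrorStackBatch: collects errors instead of stopping at the first.
class ErrorBatch {
 public:
  void push_back(const std::string& error) {
    if (!error.empty()) errors_.push_back(error);  // empty string means "no error"
  }
  bool is_error() const { return !errors_.empty(); }
  // Joins all collected errors; returns "" when every step succeeded.
  std::string summarize() const {
    std::string out;
    for (std::size_t i = 0; i < errors_.size(); ++i) {
      if (i > 0) out += "; ";
      out += errors_[i];
    }
    return out;
  }

 private:
  std::vector<std::string> errors_;
};

// Runs every cleanup step even if some fail, then reports all failures at once.
std::string release_all(const std::vector<std::function<std::string()>>& steps) {
  ErrorBatch batch;
  for (const auto& step : steps) batch.push_back(step());
  return batch.summarize();
}
```

Not returning early guarantees that one failing release does not leak the resources released after it.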

Friends And Related Function Documentation

std::ostream& operator<< ( std::ostream &  o,
const LogMapper & v 
)
[friend]

Definition at line 700 of file log_mapper_impl.cpp.

{
  o << "<LogMapper>"
    << "<id_>" << v.id_ << "</id_>"
    << "<numa_node_>" << static_cast<int>(v.numa_node_) << "</numa_node_>"
    << "<buckets_allocated_count_>" << v.buckets_allocated_count_ << "</buckets_allocated_count_>"
    << "<hashlist_allocated_count>" << v.hashlist_allocated_count_ << "</hashlist_allocated_count>"
    << "<processed_log_count_>" << v.processed_log_count_ << "</processed_log_count_>"
    << "</LogMapper>";
  return o;
}

The documentation for this class was generated from the following files: log_mapper_impl.hpp and log_mapper_impl.cpp.