libfoedus-core
FOEDUS Core Library
foedus::snapshot::LogReducer Class Reference (final)

A log reducer, which receives log entries sent from mappers and applies them to construct new snapshot files.

Detailed Description


Overview

Reducers receive log entries from mappers and apply them to new snapshot files.

Sorting

The log entries are sorted in a few steps so that they can be processed efficiently and simply. Sorting starts when one of the reducer's buffers becomes full; a reducer never starts sorting before that, to maximize the benefit of batch processing (this design might be revisited later, though). Reducers maintain two buffers so that mappers can keep sending data while the reducer is sorting one buffer and dumping it to a temporary file.
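The two-buffer scheme can be sketched as follows. This is a minimal, single-threaded model under stated assumptions, not FOEDUS code: `DoubleBuffer` and its methods are hypothetical, capacity is counted in entries rather than bytes, and there is no concurrency control. It only mirrors the idea that mappers append to `buffers_[current_buffer_ % 2]` while the reducer works on the other buffer.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, single-threaded model of the two-buffer scheme (not FOEDUS code).
// Mappers append to buffers_[current_ % 2]; the reducer switches buffers when the
// current one is full and takes the full one away to sort and dump it.
class DoubleBuffer {
 public:
  explicit DoubleBuffer(std::size_t capacity) : capacity_(capacity) {}

  // Mapper side: returns false when the current buffer is full, in which case
  // the mapper has to wait until the reducer switches buffers.
  bool append(uint64_t entry) {
    std::vector<uint64_t>& buf = buffers_[current_ % 2];
    if (buf.size() >= capacity_) {
      return false;
    }
    buf.push_back(entry);
    return true;
  }

  bool current_full() const { return buffers_[current_ % 2].size() >= capacity_; }

  // Reducer side: the other buffer must already be clear (both buffers can never
  // be in use at once). Switch, then hand back the full buffer for sorting/dumping.
  std::vector<uint64_t> switch_and_take_full() {
    assert(buffers_[(current_ + 1) % 2].empty());
    const uint32_t full = current_ % 2;
    ++current_;  // mappers now append to the other buffer
    std::vector<uint64_t> taken;
    taken.swap(buffers_[full]);  // the old buffer becomes empty again
    return taken;
  }

  uint32_t current() const { return current_; }

 private:
  std::size_t capacity_;
  uint32_t current_ = 0;
  std::array<std::vector<uint64_t>, 2> buffers_;
};
```

In this model the taken buffer is cleared immediately; a real reducer would presumably keep it occupied until its dump completes, which is why it must never find both buffers in use.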

Storage Sorting

The first step is to sort log entries by storage, which is done in mappers. All log entries of one storage are processed together, which keeps the code simple and causes fewer D-cache misses. This is not a true sort, because we don't care about the order between storages; mappers simply bucket the entries by storage ID with a hashmap-like structure.
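A minimal sketch of the storage bucketing, assuming a hypothetical `LogEntry` with only a storage ID, key, and ordinal (FOEDUS's real log format differs). `std::unordered_map` stands in for the mappers' hashmap-like structure: entries of the same storage end up together, while the order between storages is left unspecified.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical log record: only the fields needed for this illustration.
struct LogEntry {
  uint32_t storage_id;
  uint64_t key;
  uint64_t ordinal;
};

// Groups log entries by storage ID without ordering the storages themselves.
// The relative order of entries within one storage is preserved.
std::unordered_map<uint32_t, std::vector<LogEntry>> bucket_by_storage(
    const std::vector<LogEntry>& logs) {
  std::unordered_map<uint32_t, std::vector<LogEntry>> buckets;
  for (const LogEntry& log : logs) {
    buckets[log.storage_id].push_back(log);
  }
  return buckets;
}
```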

Upon receiving a chunk of data from mappers, the reducer has to collect all blocks of each storage before the next step (otherwise the sorting is incomplete). It does this by reading only the block headers and using each header's block_length_ property to jump to the next block. Assuming each block is sufficiently large, the cost of these jumps in DRAM should be negligible; if blocks are not sufficiently large, there are other performance issues anyway.
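The header walk can be sketched as below. The `BlockHeader` layout is an assumption made for illustration (a storage ID plus a `block_length` covering header and payload); only the jump-by-length idea comes from the text above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <vector>

// Hypothetical block header; the real FOEDUS layout may differ.
// ASSUMPTION: block_length is the size of the whole block (header + payload) in bytes.
struct BlockHeader {
  uint32_t storage_id;
  uint32_t block_length;
};

// Reads only the headers, jumping block_length bytes each time, and records where
// each storage's blocks start. Payloads are never read.
std::map<uint32_t, std::vector<std::size_t>> index_blocks(const char* data, std::size_t size) {
  std::map<uint32_t, std::vector<std::size_t>> offsets;
  std::size_t pos = 0;
  while (pos + sizeof(BlockHeader) <= size) {
    BlockHeader header;
    std::memcpy(&header, data + pos, sizeof(header));  // safe even if unaligned
    if (header.block_length < sizeof(BlockHeader)) {
      break;  // malformed length; stop instead of looping forever
    }
    offsets[header.storage_id].push_back(pos);
    pos += header.block_length;
  }
  return offsets;
}
```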

Key and Ordinal Sorting

Then, within each storage, we sort logs by key and then by ordinal (*). The algorithm for this sorting depends on the storage type (e.g., Array, Masstree) because some storage types have a VERY efficient way to do it. We exploit the fact that this sorting occurs only per storage by passing all log entries of the storage to the storage-specific logic defined in foedus::storage::Partitioner. This is another reason to sort by storage first.

(*) We do need to sort by ordinal; otherwise the result is not guaranteed to be correct. For example, imagine the following case:

  • UPDATE rec-1 to A. Log-ordinal 1.
  • UPDATE rec-1 to B. Log-ordinal 2. Ordinal 1 must be applied before ordinal 2; otherwise rec-1 would end up as A instead of B.

For more details, see foedus::storage::Partitioner.
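A generic sketch of the key-then-ordinal order and why it matters, using the rec-1 example above. `LogEntry`, `sort_by_key_then_ordinal()`, and `apply()` are hypothetical; the real per-storage sorting lives in foedus::storage::Partitioner and may use a faster, storage-specific method.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <tuple>
#include <vector>

// Hypothetical log record for one storage (not FOEDUS's log format).
struct LogEntry {
  uint64_t key;
  uint64_t ordinal;
  char value;  // new value written by this UPDATE
};

// Generic comparison sort by (key, ordinal). FOEDUS instead delegates this step
// to storage-specific logic in foedus::storage::Partitioner.
void sort_by_key_then_ordinal(std::vector<LogEntry>* logs) {
  std::sort(logs->begin(), logs->end(), [](const LogEntry& a, const LogEntry& b) {
    return std::tie(a.key, a.ordinal) < std::tie(b.key, b.ordinal);
  });
}

// Applies sorted logs in order; for each key the highest ordinal wins.
std::map<uint64_t, char> apply(const std::vector<LogEntry>& sorted) {
  std::map<uint64_t, char> records;
  for (const LogEntry& log : sorted) {
    records[log.key] = log.value;
  }
  return records;
}
```

Sorting {(1, 2, 'B'), (1, 1, 'A')} puts the ordinal-1 log first, so rec-1 ends up as B.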

Dumping Logs and Merging

After sorting, the reducer dumps the buffer to a temporary file, producing one sorted run. When all logs have been received, the reducer merge-sorts these sorted run files.
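Merging the sorted runs is a k-way merge. The sketch below assumes each run is already sorted and fits in memory as a `std::vector` of hypothetical (key, ordinal) pairs; a real reducer would stream the run files instead. A min-heap holds the head of each run, so each output element costs O(log k) for k runs.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <tuple>
#include <utility>
#include <vector>

// Each run is an in-memory stand-in for one sorted run file; elements are
// (key, ordinal) pairs already sorted within the run.
using Entry = std::pair<uint64_t, uint64_t>;

std::vector<Entry> merge_runs(const std::vector<std::vector<Entry>>& runs) {
  // Heap item: (entry, run index, position in run); std::greater makes a min-heap.
  using Item = std::tuple<Entry, std::size_t, std::size_t>;
  std::priority_queue<Item, std::vector<Item>, std::greater<Item>> heap;
  for (std::size_t r = 0; r < runs.size(); ++r) {
    if (!runs[r].empty()) {
      heap.emplace(runs[r][0], r, 0);
    }
  }
  std::vector<Entry> out;
  while (!heap.empty()) {
    const Item top = heap.top();
    heap.pop();
    const std::size_t r = std::get<1>(top);
    const std::size_t i = std::get<2>(top);
    out.push_back(std::get<0>(top));
    if (i + 1 < runs[r].size()) {
      heap.emplace(runs[r][i + 1], r, i + 1);  // refill from the same run
    }
  }
  return out;
}
```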

Compacting Logs

In some cases, we can drop log entries for the same key. For example, when we have two logs for the same key as in the example above, we can safely omit the first log (ordinal 1) as long as both logs appear in the same reducer buffer and both update the same byte positions in the record. Another example is a series of updates followed by a deletion of the same record, where the updates can be dropped.

This compaction is especially useful for records that are repeatedly updated, inserted, or deleted, such as TPC-C's WAREHOUSE and DISTRICT records, where several thousand overwrite logs in each reducer buffer are compacted into just one log.
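The compaction rule can be sketched as a single pass over logs that are already sorted by (key, ordinal) within one reducer buffer. The `Log` record and the exact rules here are simplifying assumptions: an earlier overwrite is dropped when the next log of the same key overwrites exactly the same byte range, and all directly preceding overwrites of a key are dropped when the key is deleted.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical log record (not FOEDUS's format).
struct Log {
  uint64_t key;
  uint64_t ordinal;
  enum class Type { kOverwrite, kDelete } type;
  uint16_t offset;  // first updated byte (overwrite only)
  uint16_t length;  // number of updated bytes (overwrite only)
};

// Input must be sorted by (key, ordinal) and come from one reducer buffer.
std::vector<Log> compact(const std::vector<Log>& sorted) {
  std::vector<Log> out;
  for (const Log& log : sorted) {
    // Pop earlier overwrites of the same key that `log` fully supersedes.
    while (!out.empty() && out.back().key == log.key &&
           out.back().type == Log::Type::kOverwrite &&
           (log.type == Log::Type::kDelete ||
            (out.back().offset == log.offset && out.back().length == log.length))) {
      out.pop_back();
    }
    out.push_back(log);
  }
  return out;
}
```

The pass is conservative: if overwrites of different byte ranges interleave, some redundant logs may survive, which costs space but never correctness, because the surviving logs are still applied in ordinal order.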

See foedus::storage::Partitioner::sort_batch() for more details.

Data Pages

One tricky aspect of the reducer is how it manages the data pages it uses to read previous snapshot pages and apply the new logs. So far, we assume each reducer allocates enough DRAM to hold all pages it reads or writes during one snapshot. If this does not hold, we might allocate pages directly on NVRAM and read/write there.

Note
This is a private implementation detail of the Snapshot Manager, hence the file name ends with _impl. Do not include this header from a client program; a client program never needs to access this internal class.
Todo:
This class has become somewhat bloated and is hard to whitebox-test because of its dependencies on other modules. The dump part and the merge part should be separated into their own classes so that test cases can test them independently. Maybe the reducer should be its own package?

Definition at line 293 of file log_reducer_impl.hpp.

#include <log_reducer_impl.hpp>


Public Member Functions

 LogReducer (Engine *engine)
 
ErrorStack initialize_once () override
 
ErrorStack uninitialize_once () override
 
std::string to_string () const override
 Expects "LogReducer-x", "LogMapper-y" etc.
 
- Public Member Functions inherited from foedus::snapshot::MapReduceBase
 MapReduceBase (Engine *engine, uint16_t id)
 
 MapReduceBase ()=delete
 
 MapReduceBase (const MapReduceBase &other)=delete
 
MapReduceBase & operator= (const MapReduceBase &other)=delete
 
LogGleanerRef & get_parent ()
 
uint16_t get_id () const
 
uint16_t get_numa_node () const
 
void launch_thread ()
 Start executing.
 
void join_thread ()
 
- Public Member Functions inherited from foedus::DefaultInitializable
 DefaultInitializable ()
 
virtual ~DefaultInitializable ()
 
 DefaultInitializable (const DefaultInitializable &)=delete
 
DefaultInitializable & operator= (const DefaultInitializable &)=delete
 
ErrorStack initialize () override final
 Typical implementation of Initializable::initialize() that provides initialize-once semantics.
 
ErrorStack uninitialize () override final
 Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics.
 
bool is_initialized () const override final
 Returns whether the object has been already initialized or not.
 
- Public Member Functions inherited from foedus::Initializable
virtual ~Initializable ()
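The initialize-once semantics provided by DefaultInitializable can be sketched as below. This is a hypothetical simplification that returns `bool` instead of ErrorStack; in particular, the policy chosen here (a repeated `initialize()` fails, a repeated `uninitialize()` is a no-op) is an assumption and may differ from FOEDUS.

```cpp
#include <cassert>

// Hypothetical sketch: public initialize()/uninitialize() guard a flag and forward
// to *_once() hooks that a subclass implements (FOEDUS returns ErrorStack instead).
class InitializeOnce {
 public:
  virtual ~InitializeOnce() = default;

  bool initialize() {
    if (initialized_) {
      return false;  // this sketch treats a second initialize() as an error
    }
    if (!initialize_once()) {
      return false;  // stay uninitialized on failure
    }
    initialized_ = true;
    return true;
  }

  bool uninitialize() {
    if (!initialized_) {
      return true;  // nothing to release: no-op
    }
    initialized_ = false;
    return uninitialize_once();
  }

  bool is_initialized() const { return initialized_; }

 protected:
  virtual bool initialize_once() = 0;
  virtual bool uninitialize_once() = 0;

 private:
  bool initialized_ = false;
};

// Example subclass counting how often the hooks actually run.
class Counter : public InitializeOnce {
 public:
  int inits = 0;
  int uninits = 0;

 protected:
  bool initialize_once() override { ++inits; return true; }
  bool uninitialize_once() override { ++uninits; return true; }
};
```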
 

Protected Member Functions

ErrorStack handle_process () override
 Implements the logic specific to the derived class.
 
- Protected Member Functions inherited from foedus::snapshot::MapReduceBase
ErrorCode check_cancelled () const
 Derived class's handle_process() should occasionally call this to exit if it's cancelled.
 

Friends

std::ostream & operator<< (std::ostream &o, const LogReducer &v)
 

Additional Inherited Members

- Protected Attributes inherited from foedus::snapshot::MapReduceBase
Engine *const engine_
 
LogGleanerRef parent_
 
const uint16_t id_
 Unique ID of this mapper or reducer.
 
const uint16_t numa_node_
 
std::atomic< bool > running_
 only for sanity check
 
std::thread thread_
 

Constructor & Destructor Documentation

foedus::snapshot::LogReducer::LogReducer (Engine *engine)
explicit

Definition at line 52 of file log_reducer_impl.cpp.

References ASSERT_ND, foedus::snapshot::MapReduceBase::engine_, foedus::Engine::get_options(), foedus::soc::SocManager::get_shared_memory_repo(), foedus::Engine::get_soc_manager(), foedus::snapshot::SnapshotOptions::log_reducer_buffer_mb_, foedus::soc::NodeMemoryAnchors::log_reducer_buffers_, foedus::soc::NodeMemoryAnchors::log_reducer_memory_, foedus::soc::NodeMemoryAnchors::log_reducer_root_info_pages_, foedus::snapshot::MapReduceBase::numa_node_, and foedus::EngineOptions::snapshot_.

LogReducer::LogReducer(Engine* engine)
  : MapReduceBase(engine, engine->get_soc_id()),
    previous_snapshot_files_(engine_),
    sorted_runs_(0) {
  soc::NodeMemoryAnchors* anchors = engine->get_soc_manager()->get_shared_memory_repo()->
    get_node_memory_anchors(numa_node_);
  control_block_ = anchors->log_reducer_memory_;
  buffers_[0] = anchors->log_reducer_buffers_[0];
  buffers_[1] = anchors->log_reducer_buffers_[1];
  const SnapshotOptions& option = engine_->get_options().snapshot_;
  // The whole buffer is (MB << 20) bytes; each of the two halves is (MB << 19) bytes.
  buffer_half_size_bytes_ = static_cast<uint64_t>(option.log_reducer_buffer_mb_) << 19;
  ASSERT_ND(reinterpret_cast<char*>(buffers_[0]) + buffer_half_size_bytes_
    == reinterpret_cast<char*>(buffers_[1]));
  root_info_pages_ = anchors->log_reducer_root_info_pages_;
}
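The buffer-size arithmetic in the constructor can be checked in isolation. The sketch assumes the option is a 32-bit count of megabytes (MB = 2^20 bytes here); `half_buffer_bytes()` and `halves_are_contiguous()` are hypothetical helpers, not FOEDUS functions.

```cpp
#include <cassert>
#include <cstdint>

// The whole reducer buffer is log_reducer_buffer_mb MB, i.e. (mb << 20) bytes.
// Halving it is one fewer shift, so each half is (mb << 19) bytes.
// Casting to 64 bits first avoids overflow for large settings: a 32-bit value
// shifted left by 19 already overflows at 8192 MB.
constexpr uint64_t half_buffer_bytes(uint32_t log_reducer_buffer_mb) {
  return static_cast<uint64_t>(log_reducer_buffer_mb) << 19;
}

// The constructor asserts that the second half starts right after the first one.
inline bool halves_are_contiguous(const char* first, const char* second, uint64_t half) {
  return first + half == second;
}

static_assert(half_buffer_bytes(1) == 512u * 1024u, "1 MB total -> 512 KB per half");
static_assert(2 * half_buffer_bytes(8192) == (8192ULL << 20), "halves add up to the whole");
```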


Member Function Documentation

ErrorStack foedus::snapshot::LogReducer::handle_process ( )
override protected virtual

Implements the logic specific to the derived class.

Implements foedus::snapshot::MapReduceBase.

Definition at line 134 of file log_reducer_impl.cpp.

References ASSERT_ND, foedus::snapshot::MapReduceBase::check_cancelled(), CHECK_ERROR, foedus::snapshot::LogReducerControlBlock::current_buffer_, foedus::snapshot::LogReducerControlBlock::get_current_buffer_status(), foedus::snapshot::LogReducerControlBlock::get_non_current_buffer_status(), foedus::snapshot::LogGleanerRef::is_all_mappers_completed(), foedus::snapshot::ReducerBufferStatus::is_clear(), foedus::snapshot::ReducerBufferStatus::is_no_more_writers(), foedus::kRetOk, foedus::snapshot::MapReduceBase::parent_, to_string(), and WRAP_ERROR_CODE.

ErrorStack LogReducer::handle_process() {
  while (true) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    WRAP_ERROR_CODE(check_cancelled());
    if (parent_.is_all_mappers_completed()) {
      break;
    }
    // should I switch the current buffer?
    // this is while, not if, in case the new current buffer becomes full while this reducer is
    // dumping the old current buffer.
    while (control_block_->get_current_buffer_status().is_no_more_writers()) {
      // okay, let's switch now. As this thread dumps the buffer as soon as this happens,
      // only one of the buffers can be full.
      if (!control_block_->get_non_current_buffer_status().is_clear()) {
        LOG(FATAL) << to_string() << " wtf. both buffers are in use, can't happen";
      }
      LOG(INFO) << to_string() << " switching buffer. current_buffer_="
        << control_block_->current_buffer_;
      control_block_->current_buffer_.fetch_add(1U);
      ASSERT_ND(sorted_runs_ + 1U == control_block_->current_buffer_);
      // Then, immediately start dumping the full buffer.
      CHECK_ERROR(dump_buffer());
    }
  }

  LOG(INFO) << to_string() << " all mappers are done, this reducer starts the merge-sort phase.";
  CHECK_ERROR(merge_sort());

  LOG(INFO) << to_string() << " all done.";
  return kRetOk;
}
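The bookkeeping in the polling loop can be modeled without threads. In this hypothetical sketch, each switch increments the buffer counter and each dump adds one sorted run, which is presumably what keeps `sorted_runs_ + 1U == current_buffer_` true right after a switch; `ReducerModel` and `count_sorted_runs()` are illustrative names, not FOEDUS APIs.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical single-threaded model of handle_process()'s bookkeeping (not FOEDUS code).
struct ReducerModel {
  uint32_t current_buffer = 0;
  uint32_t sorted_runs = 0;

  void switch_and_dump() {
    ++current_buffer;                            // like current_buffer_.fetch_add(1U)
    assert(sorted_runs + 1U == current_buffer);  // mirrors the ASSERT_ND
    ++sorted_runs;                               // like dump_buffer() writing one run
  }
};

// fulls_per_poll[i] = how many times the current buffer filled up before poll i is
// fully handled (more than one if it fills again while the old one is being dumped).
// The inner loop is a while, not an if, so none of them is missed.
uint32_t count_sorted_runs(const std::vector<uint32_t>& fulls_per_poll) {
  ReducerModel model;
  for (uint32_t fulls : fulls_per_poll) {
    uint32_t pending = fulls;
    while (pending > 0) {
      model.switch_and_dump();
      --pending;
    }
  }
  return model.sorted_runs;  // number of runs dumped during the polling phase
}
```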


ErrorStack foedus::snapshot::LogReducer::initialize_once ( )
override virtual

Implements foedus::DefaultInitializable.

Definition at line 68 of file log_reducer_impl.cpp.

References foedus::memory::AlignedMemory::alloc(), ASSERT_ND, CHECK_ERROR, foedus::snapshot::MapReduceBase::engine_, foedus::snapshot::MapReduceBase::get_numa_node(), foedus::Engine::get_options(), foedus::memory::AlignedMemory::get_size(), foedus::Engine::get_soc_id(), foedus::snapshot::LogReducerControlBlock::id_, foedus::snapshot::LogReducerControlBlock::initialize(), foedus::DefaultInitializable::initialize(), foedus::memory::AlignedMemory::is_null(), foedus::memory::kHugepageSize, foedus::memory::AlignedMemory::kNumaAllocOnnode, foedus::kRetOk, foedus::snapshot::SnapshotOptions::log_reducer_dump_io_buffer_mb_, foedus::snapshot::MapReduceBase::numa_node_, foedus::EngineOptions::snapshot_, foedus::snapshot::SnapshotOptions::snapshot_writer_intermediate_pool_size_mb_, and foedus::snapshot::SnapshotOptions::snapshot_writer_page_pool_size_mb_.

ErrorStack LogReducer::initialize_once() {
  control_block_->initialize();
  control_block_->id_ = engine_->get_soc_id();

  const SnapshotOptions& option = engine_->get_options().snapshot_;

  uint64_t dump_buffer_size = static_cast<uint64_t>(option.log_reducer_dump_io_buffer_mb_) << 20;
  dump_io_buffer_.alloc(
    dump_buffer_size,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    get_numa_node());
  ASSERT_ND(!dump_io_buffer_.is_null());

  // start from 1/32 of one buffer half (buffer_half_size_bytes_ >> 5). Should be big enough.
  sort_buffer_.alloc(
    buffer_half_size_bytes_ >> 5,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    get_numa_node());

  // start from 1/32 of one buffer half (buffer_half_size_bytes_ >> 5). Should be big enough.
  positions_buffers_.alloc(
    buffer_half_size_bytes_ >> 5,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    get_numa_node());
  // The first half holds input positions, the second half output positions.
  input_positions_slice_ = memory::AlignedMemorySlice(
    &positions_buffers_,
    0,
    positions_buffers_.get_size() >> 1);
  output_positions_slice_ = memory::AlignedMemorySlice(
    &positions_buffers_,
    positions_buffers_.get_size() >> 1,
    positions_buffers_.get_size() >> 1);

  writer_pool_memory_.alloc(
    static_cast<uint64_t>(option.snapshot_writer_page_pool_size_mb_) << 20,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);

  writer_intermediate_memory_.alloc(
    static_cast<uint64_t>(option.snapshot_writer_intermediate_pool_size_mb_) << 20,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);

  sorted_runs_ = 0;

  CHECK_ERROR(previous_snapshot_files_.initialize());
  return kRetOk;
}
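The way `positions_buffers_` is carved into input and output slices can be sketched with a plain `std::vector<char>`. `Slice` is a hypothetical stand-in for memory::AlignedMemorySlice; only the offset/size arithmetic mirrors the code above.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for memory::AlignedMemorySlice: a non-owning (offset, size)
// view into one owning allocation.
struct Slice {
  char* base;
  std::size_t offset;
  std::size_t size;
  char* begin() const { return base + offset; }
  char* end() const { return base + offset + size; }
};

// Splits one allocation into two equal, non-overlapping halves, the way
// input_positions_slice_ and output_positions_slice_ are derived from
// positions_buffers_. With an odd size, the last byte is left unused.
std::pair<Slice, Slice> split_in_half(std::vector<char>* memory) {
  const std::size_t half = memory->size() >> 1;
  return std::make_pair(Slice{memory->data(), 0, half}, Slice{memory->data(), half, half});
}
```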


std::string foedus::snapshot::LogReducer::to_string ( ) const
inline override virtual

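Returns a short name identifying this mapper or reducer. The base class's documentation gives "LogReducer-x" and "LogMapper-y" as examples; this override returns "Reducer-" followed by the reducer ID.

Used only for logging and debugging.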

Implements foedus::snapshot::MapReduceBase.

Definition at line 300 of file log_reducer_impl.hpp.

References foedus::snapshot::MapReduceBase::id_.

Referenced by handle_process().

std::string to_string() const override { return std::string("Reducer-") + std::to_string(id_); }


ErrorStack foedus::snapshot::LogReducer::uninitialize_once ( )
override virtual

Implements foedus::DefaultInitializable.

Definition at line 122 of file log_reducer_impl.cpp.

References foedus::ErrorStackBatch::emprace_back(), foedus::memory::AlignedMemory::release_block(), SUMMARIZE_ERROR_BATCH, foedus::snapshot::LogReducerControlBlock::uninitialize(), and foedus::DefaultInitializable::uninitialize().

ErrorStack LogReducer::uninitialize_once() {
  ErrorStackBatch batch;
  batch.emprace_back(previous_snapshot_files_.uninitialize());
  writer_intermediate_memory_.release_block();
  writer_pool_memory_.release_block();
  dump_io_buffer_.release_block();
  sort_buffer_.release_block();
  positions_buffers_.release_block();
  control_block_->uninitialize();
  return SUMMARIZE_ERROR_BATCH(batch);
}
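uninitialize_once() keeps releasing resources even if an earlier step fails and reports all errors at the end. The sketch below models that pattern with a hypothetical `ErrorBatch` whose errors are plain strings; FOEDUS's ErrorStackBatch and SUMMARIZE_ERROR_BATCH carry richer error information.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for ErrorStackBatch: collects errors instead of stopping.
class ErrorBatch {
 public:
  void add(const std::string& error) {  // an empty string means "no error"
    if (!error.empty()) {
      errors_.push_back(error);
    }
  }
  bool ok() const { return errors_.empty(); }
  std::string summarize() const {  // joins all errors with "; "
    std::string out;
    for (const std::string& e : errors_) {
      if (!out.empty()) {
        out += "; ";
      }
      out += e;
    }
    return out;
  }

 private:
  std::vector<std::string> errors_;
};

// Runs every teardown step regardless of earlier failures, then reports all errors.
std::string run_all(const std::vector<std::function<std::string()>>& steps) {
  ErrorBatch batch;
  for (const auto& step : steps) {
    batch.add(step());
  }
  return batch.summarize();
}
```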


Friends And Related Function Documentation

std::ostream& operator<< (std::ostream &o, const LogReducer &v)
friend

Definition at line 912 of file log_reducer_impl.cpp.

std::ostream& operator<<(std::ostream& o, const LogReducer& v) {
  o << "<LogReducer>"
    << "<id_>" << v.get_id() << "</id_>"
    << "<numa_node>" << v.get_numa_node() << "</numa_node>"
    << "<total_storage_count>" << v.control_block_->total_storage_count_ << "</total_storage_count>"
    << "<sort_buffer_>" << v.sort_buffer_ << "</sort_buffer_>"
    << "<positions_buffers_>" << v.positions_buffers_ << "</positions_buffers_>"
    << "<current_buffer_>" << v.control_block_->current_buffer_ << "</current_buffer_>"
    << "<sorted_runs_>" << v.sorted_runs_ << "</sorted_runs_>"
    << "</LogReducer>";
  return o;
}

The documentation for this class was generated from the following files: