libfoedus-core
FOEDUS Core Library
A log reducer, which receives log entries sent from mappers and applies them to construct new snapshot files.
Reducers receive log entries from mappers and apply them to new snapshot files.
The log entries are sorted in a few steps so that they can be processed efficiently and simply. Sorting starts when one of the reducer's buffers becomes full. The reducer never starts sorting before that point, to maximize the benefits of batch processing (this design might be revisited later, though). Reducers maintain two buffers so that mappers can keep sending data while the reducer is sorting one buffer and dumping it to a temporary file.
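The double-buffering idea above can be sketched as follows. This is a minimal, single-threaded illustration under stated assumptions: DoubleBuffer, append() and swap() are hypothetical names rather than FOEDUS's API, log entries are reduced to uint64_t values, and the synchronization a real reducer needs between mapper and reducer threads is omitted.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical two-buffer scheme: mappers append to the "current" buffer
// while the reducer sorts and dumps the other one. Not FOEDUS's real API.
class DoubleBuffer {
 public:
  explicit DoubleBuffer(std::size_t capacity) : capacity_(capacity) {}

  // Appends one log entry to the current buffer.
  // Returns false if the buffer is full; the caller should then swap().
  bool append(uint64_t log) {
    std::vector<uint64_t>& buf = buffers_[current_];
    if (buf.size() >= capacity_) return false;
    buf.push_back(log);
    return true;
  }

  // Makes the other (drained) buffer current and hands the full one to the
  // reducer, which can sort and dump it while mappers keep appending.
  std::vector<uint64_t> swap() {
    std::size_t full = current_;
    current_ = 1 - current_;
    assert(buffers_[current_].empty());  // the other buffer must be drained
    std::vector<uint64_t> out;
    out.swap(buffers_[full]);  // leaves the full buffer empty for reuse
    return out;
  }

 private:
  std::size_t capacity_;
  std::size_t current_ = 0;
  std::array<std::vector<uint64_t>, 2> buffers_;
};
```

In this sketch the full buffer is drained immediately by swap(); in a concurrent reducer the drain happens while mappers write into the other buffer, which is the point of keeping two.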
The first step, sorting log entries by storage, is done in mappers. We process all log entries of one storage together, which keeps the code simple and causes fewer D-cache misses. We don't actually sort in this step because we don't care about the order between storages. Instead, mappers use a hashmap-like structure to group log entries by storage ID.
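A minimal sketch of this storage-level grouping, assuming a hypothetical LogEntry with only storage_id, key, and ordinal fields. std::unordered_map stands in for the hashmap-like structure; the actual mapper code may use a different structure.

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical log entry: only the fields needed to illustrate bucketing.
struct LogEntry {
  uint32_t storage_id;
  uint64_t key;
  uint32_t ordinal;
};

// Groups log entries by storage ID without ordering storages relative to
// each other. Within a bucket, entries keep their arrival order.
std::unordered_map<uint32_t, std::vector<LogEntry>> bucket_by_storage(
    const std::vector<LogEntry>& logs) {
  std::unordered_map<uint32_t, std::vector<LogEntry>> buckets;
  for (const LogEntry& log : logs) {
    buckets[log.storage_id].push_back(log);
  }
  return buckets;
}
```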
Upon receiving chunks of data from mappers, the reducer has to collect all log entries of each storage before the next step (otherwise the sorting would be incomplete). This is done by simply reading all block headers, using the block_length_ property to jump from one block to the next. Assuming each block is sufficiently large, the cost of these jumps in DRAM should be negligible. If blocks are not sufficiently large, there are other performance issues anyway.
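The header walk can be sketched as below. The BlockHeader layout is hypothetical: here block_length_ is assumed to be the total block size in bytes including the header, which may not match FOEDUS's actual definition. Only the headers are read; the log bodies are skipped.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <vector>

// Hypothetical block header. block_length_ is assumed to be the total
// size of the block in bytes, header included.
struct BlockHeader {
  uint32_t storage_id_;
  uint32_t block_length_;
};

// Walks a reducer buffer by jumping from header to header with
// block_length_ and records the offset of every block of each storage.
std::map<uint32_t, std::vector<std::size_t>> collect_blocks(
    const std::vector<uint8_t>& buffer) {
  std::map<uint32_t, std::vector<std::size_t>> blocks;
  std::size_t pos = 0;
  while (pos + sizeof(BlockHeader) <= buffer.size()) {
    BlockHeader header;
    std::memcpy(&header, buffer.data() + pos, sizeof(header));
    // A zero or too-short length would make the walk loop forever.
    assert(header.block_length_ >= sizeof(BlockHeader));
    blocks[header.storage_id_].push_back(pos);
    pos += header.block_length_;
  }
  return blocks;
}
```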
Then, within each storage, we sort logs by key and then by ordinal (*). The sorting algorithm depends on the storage type (e.g., Array, Masstree) because some storage types have a VERY efficient way to do this. We exploit the fact that this sorting occurs only within a storage by passing all log entries for the storage to the storage-specific logic defined in foedus::storage::Partitioner. This is another reason to sort by storage first.
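(*) We do need to sort by ordinal; otherwise, a correct result is not guaranteed. For example, imagine two updates to the same key, the first with ordinal 1 and the second with ordinal 2. If they are applied in the wrong order, the snapshot ends up with the older value.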
For more details, see foedus::storage::Partitioner.
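To make the ordering requirement concrete, here is a sketch that sorts one storage's logs by (key, ordinal) with std::sort and then applies them. The LogEntry fields and the apply() helper are hypothetical; real storages delegate this step to their Partitioner.

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <tuple>
#include <vector>

// Hypothetical log entry of one storage (storage bucketing already done).
struct LogEntry {
  uint64_t key;
  uint32_t ordinal;  // orders logs that touch the same key
  char value;        // value written by an UPDATE, for illustration only
};

// Sorts by key first and by ordinal second.
void sort_by_key_then_ordinal(std::vector<LogEntry>& logs) {
  std::sort(logs.begin(), logs.end(),
            [](const LogEntry& a, const LogEntry& b) {
              return std::tie(a.key, a.ordinal) < std::tie(b.key, b.ordinal);
            });
}

// Applies sorted UPDATE logs in order: for each key, the last log wins.
std::map<uint64_t, char> apply(const std::vector<LogEntry>& sorted_logs) {
  std::map<uint64_t, char> records;
  for (const LogEntry& log : sorted_logs) records[log.key] = log.value;
  return records;
}
```

Without the ordinal tie-breaker, the two logs for the same key would compare equal and std::sort could leave them in either order, so the record might end up with the older value.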
After sorting, the reducer dumps the buffer to a file as a sorted run. When all logs have been received, the reducer merge-sorts the sorted run files.
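The final merge over sorted runs is a standard k-way merge. The sketch below uses a std::priority_queue min-heap over in-memory runs of hypothetical (key, ordinal) pairs; the actual reducer reads its runs from temporary files, and its merge implementation may differ.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <tuple>
#include <utility>
#include <vector>

// Hypothetical sort key of a log: (key, ordinal). Each run is already
// sorted by this pair, as produced by sorting one reducer buffer.
using SortKey = std::pair<uint64_t, uint32_t>;
using Run = std::vector<SortKey>;

// k-way merge of sorted runs using a min-heap holding one head per run.
std::vector<SortKey> merge_runs(const std::vector<Run>& runs) {
  // Heap item: (head of a run, run index, position within that run).
  using Item = std::tuple<SortKey, std::size_t, std::size_t>;
  std::priority_queue<Item, std::vector<Item>, std::greater<Item>> heap;
  for (std::size_t r = 0; r < runs.size(); ++r) {
    if (!runs[r].empty()) heap.emplace(runs[r][0], r, std::size_t{0});
  }
  std::vector<SortKey> merged;
  while (!heap.empty()) {
    const Item top = heap.top();
    heap.pop();
    const std::size_t r = std::get<1>(top);
    const std::size_t i = std::get<2>(top);
    merged.push_back(std::get<0>(top));
    // Refill the heap from the run we just consumed, if it has more.
    if (i + 1 < runs[r].size()) heap.emplace(runs[r][i + 1], r, i + 1);
  }
  return merged;
}
```

Each step pops the smallest head among all runs, so the output is globally sorted by (key, ordinal) in O(n log k) for n logs across k runs.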
In some cases, we can delete log entries for the same key. For example, when we have two logs for the same key as in the example above, we can safely omit the first log (ordinal 1) AS LONG AS both logs appear in the same reducer buffer and update the same byte positions in the record. Another example is updates followed by a deletion.
This compaction is especially useful for records that are repeatedly updated, inserted, or deleted, such as TPC-C's WAREHOUSE and DISTRICT records, where several thousand overwrite logs in each reducer buffer are compacted into just one log.
See foedus::storage::Partitioner::sort_batch() for more details.
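The compaction rule can be sketched on a buffer already sorted by (key, ordinal). This is an illustration only: LogEntry, LogType, supersedes(), and compact() are hypothetical, and the rule is deliberately conservative. It drops an overwrite only when the very next log for the same key either overwrites the same byte range or deletes the record.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

enum class LogType { kOverwrite, kDelete };

// Hypothetical log: an overwrite of [offset, offset + length) in the
// record payload, or a deletion of the whole record.
struct LogEntry {
  uint64_t key;
  uint32_t ordinal;
  LogType type;
  uint16_t offset;  // only meaningful for kOverwrite
  uint16_t length;  // only meaningful for kOverwrite
};

// True if `later` (the next log for the same key) makes `earlier` redundant.
bool supersedes(const LogEntry& later, const LogEntry& earlier) {
  if (earlier.type != LogType::kOverwrite) return false;
  if (later.type == LogType::kDelete) return true;  // update, then deletion
  // Overwrite followed by an overwrite of exactly the same bytes.
  return later.offset == earlier.offset && later.length == earlier.length;
}

// Compacts one reducer buffer that is already sorted by (key, ordinal).
std::vector<LogEntry> compact(const std::vector<LogEntry>& sorted) {
  std::vector<LogEntry> out;
  for (std::size_t i = 0; i < sorted.size(); ++i) {
    const bool next_same_key =
        i + 1 < sorted.size() && sorted[i + 1].key == sorted[i].key;
    if (next_same_key && supersedes(sorted[i + 1], sorted[i])) continue;
    out.push_back(sorted[i]);
  }
  return out;
}
```

Because each log is compared only with its immediate successor, a chain of same-range overwrites collapses to its last element, while an overwrite followed by a different-range overwrite is kept.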
One tricky aspect of the reducer is how it manages the data pages it uses to read previous snapshot pages and apply the new logs. So far, we assume each reducer allocates enough DRAM to hold all pages it reads or writes during one snapshot. If this assumption doesn't hold, we might allocate pages directly on NVRAM and read/write them there.
Definition at line 293 of file log_reducer_impl.hpp.
#include <log_reducer_impl.hpp>
Public Member Functions
LogReducer (Engine *engine)
ErrorStack initialize_once () override
ErrorStack uninitialize_once () override
std::string to_string () const override
Expects "LogReducer-x", "LogMapper-y" etc.
Public Member Functions inherited from foedus::snapshot::MapReduceBase
MapReduceBase (Engine *engine, uint16_t id)
MapReduceBase ()=delete
MapReduceBase (const MapReduceBase &other)=delete
MapReduceBase & operator= (const MapReduceBase &other)=delete
LogGleanerRef * get_parent ()
uint16_t get_id () const
uint16_t get_numa_node () const
void launch_thread ()
Start executing.
void join_thread ()
Public Member Functions inherited from foedus::DefaultInitializable
DefaultInitializable ()
virtual ~DefaultInitializable ()
DefaultInitializable (const DefaultInitializable &)=delete
DefaultInitializable & operator= (const DefaultInitializable &)=delete
ErrorStack initialize () override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics.
ErrorStack uninitialize () override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics.
bool is_initialized () const override final
Returns whether the object has been already initialized or not.
Public Member Functions inherited from foedus::Initializable
virtual ~Initializable ()
Protected Member Functions
ErrorStack handle_process () override
Implements the specific logic in the derived class.
Protected Member Functions inherited from foedus::snapshot::MapReduceBase
ErrorCode check_cancelled () const
The derived class's handle_process() should occasionally call this to exit if it's cancelled.
Friends
std::ostream & operator<< (std::ostream &o, const LogReducer &v)
Additional Inherited Members
Attributes inherited from foedus::snapshot::MapReduceBase
Engine *const engine_
LogGleanerRef parent_
const uint16_t id_
Unique ID of this mapper or reducer.
const uint16_t numa_node_
std::atomic< bool > running_
Only for sanity checks.
std::thread thread_
foedus::snapshot::LogReducer::LogReducer (Engine *engine) [explicit]
Definition at line 52 of file log_reducer_impl.cpp.
References ASSERT_ND, foedus::snapshot::MapReduceBase::engine_, foedus::Engine::get_options(), foedus::soc::SocManager::get_shared_memory_repo(), foedus::Engine::get_soc_manager(), foedus::snapshot::SnapshotOptions::log_reducer_buffer_mb_, foedus::soc::NodeMemoryAnchors::log_reducer_buffers_, foedus::soc::NodeMemoryAnchors::log_reducer_memory_, foedus::soc::NodeMemoryAnchors::log_reducer_root_info_pages_, foedus::snapshot::MapReduceBase::numa_node_, and foedus::EngineOptions::snapshot_.
ErrorStack foedus::snapshot::LogReducer::handle_process () [override, protected, virtual]
Implements the specific logic in the derived class.
Implements foedus::snapshot::MapReduceBase.
Definition at line 134 of file log_reducer_impl.cpp.
References ASSERT_ND, foedus::snapshot::MapReduceBase::check_cancelled(), CHECK_ERROR, foedus::snapshot::LogReducerControlBlock::current_buffer_, foedus::snapshot::LogReducerControlBlock::get_current_buffer_status(), foedus::snapshot::LogReducerControlBlock::get_non_current_buffer_status(), foedus::snapshot::LogGleanerRef::is_all_mappers_completed(), foedus::snapshot::ReducerBufferStatus::is_clear(), foedus::snapshot::ReducerBufferStatus::is_no_more_writers(), foedus::kRetOk, foedus::snapshot::MapReduceBase::parent_, to_string(), and WRAP_ERROR_CODE.
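The overall shape of such a processing loop can be sketched as below. Everything here is hypothetical: the ReducerHooks callbacks stand in for the buffer-status and gleaner checks referenced above, and this page does not show the exact control flow of the real handle_process(). The sketch only illustrates polling for cancellation on every iteration, in the spirit of check_cancelled(), and exiting the loop once all mappers have completed.

```cpp
#include <atomic>
#include <functional>

enum class Status { kOk, kCancelled };

// Hypothetical hooks standing in for the reducer's real buffer and
// gleaner checks; only the loop shape is illustrated.
struct ReducerHooks {
  std::function<bool()> all_mappers_completed;
  std::function<bool()> current_buffer_full;
  std::function<void()> sort_and_dump_current;  // swap, then sort/dump the full one
  std::function<void()> dump_remaining;         // last, partially filled buffer
  std::function<void()> merge_runs;             // merge-sort the sorted runs
};

Status reducer_loop(const std::atomic<bool>& cancelled, const ReducerHooks& h) {
  while (true) {
    if (cancelled.load()) return Status::kCancelled;  // like check_cancelled()
    if (h.current_buffer_full()) {
      h.sort_and_dump_current();
    } else if (h.all_mappers_completed()) {
      break;  // no more writers: finish with whatever is left
    }
    // A real implementation would sleep or wait here instead of spinning.
  }
  h.dump_remaining();
  h.merge_runs();
  return Status::kOk;
}
```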
ErrorStack foedus::snapshot::LogReducer::initialize_once () [override, virtual]
Implements foedus::DefaultInitializable.
Definition at line 68 of file log_reducer_impl.cpp.
References foedus::memory::AlignedMemory::alloc(), ASSERT_ND, CHECK_ERROR, foedus::snapshot::MapReduceBase::engine_, foedus::snapshot::MapReduceBase::get_numa_node(), foedus::Engine::get_options(), foedus::memory::AlignedMemory::get_size(), foedus::Engine::get_soc_id(), foedus::snapshot::LogReducerControlBlock::id_, foedus::snapshot::LogReducerControlBlock::initialize(), foedus::DefaultInitializable::initialize(), foedus::memory::AlignedMemory::is_null(), foedus::memory::kHugepageSize, foedus::memory::AlignedMemory::kNumaAllocOnnode, foedus::kRetOk, foedus::snapshot::SnapshotOptions::log_reducer_dump_io_buffer_mb_, foedus::snapshot::MapReduceBase::numa_node_, foedus::EngineOptions::snapshot_, foedus::snapshot::SnapshotOptions::snapshot_writer_intermediate_pool_size_mb_, and foedus::snapshot::SnapshotOptions::snapshot_writer_page_pool_size_mb_.
std::string foedus::snapshot::LogReducer::to_string () const [inline, override, virtual]
Expects "LogReducer-x", "LogMapper-y" etc.
Used only for logging/debugging.
Implements foedus::snapshot::MapReduceBase.
Definition at line 300 of file log_reducer_impl.hpp.
References foedus::snapshot::MapReduceBase::id_.
Referenced by handle_process().
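A minimal sketch of this naming convention, assuming the x in "LogReducer-x" is the id_ inherited from MapReduceBase. ReducerName is a hypothetical stand-in, not the actual class.

```cpp
#include <cstdint>
#include <string>

// Hypothetical stand-in holding only the id_ inherited from MapReduceBase.
struct ReducerName {
  uint16_t id_;
  // Produces the "LogReducer-x" form, assuming x is id_.
  std::string to_string() const { return "LogReducer-" + std::to_string(id_); }
};
```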
ErrorStack foedus::snapshot::LogReducer::uninitialize_once () [override, virtual]
Implements foedus::DefaultInitializable.
Definition at line 122 of file log_reducer_impl.cpp.
References foedus::ErrorStackBatch::emprace_back(), foedus::memory::AlignedMemory::release_block(), SUMMARIZE_ERROR_BATCH, foedus::snapshot::LogReducerControlBlock::uninitialize(), and foedus::DefaultInitializable::uninitialize().
std::ostream & operator<< (std::ostream &o, const LogReducer &v) [friend]
Definition at line 912 of file log_reducer_impl.cpp.