18 #ifndef FOEDUS_SNAPSHOT_LOG_REDUCER_IMPL_HPP_
19 #define FOEDUS_SNAPSHOT_LOG_REDUCER_IMPL_HPP_
137 ASSERT_ND(shortest_key_length_ <= longest_key_length_);
300 std::string
to_string()
const override {
return std::string(
"Reducer-") + std::to_string(
id_); }
310 struct MergeContext {
311 explicit MergeContext(uint32_t dumped_files_count_);
319 const uint32_t dumped_files_count_;
321 std::vector< memory::AlignedMemorySlice > io_buffers_;
331 std::vector< std::unique_ptr<SortedBuffer> > sorted_buffers_;
337 std::vector< std::unique_ptr<fs::DirectIoFile> > sorted_files_auto_ptrs_;
340 uint32_t tmp_sorted_buffer_count_;
350 LogReducerControlBlock* control_block_;
362 uint64_t buffer_half_size_bytes_;
415 uint32_t sorted_runs_;
417 void expand_if_needed(
418 uint64_t required_size,
420 const std::string& name);
422 void expand_positions_buffers_if_needed(uint64_t required_size_per_buffer);
424 fs::Path get_sorted_run_file_path(uint32_t sorted_run)
const;
444 ErrorStack dump_buffer_wait_for_writers(uint32_t buffer_index)
const;
452 void dump_buffer_scan_block_headers(
463 const LogBuffer &buffer,
465 const std::vector<BufferPosition>& log_positions,
466 uint32_t* out_shortest_key_length,
467 uint32_t* out_longest_key_length,
468 uint32_t* written_count);
475 const LogBuffer &buffer,
478 uint32_t shortest_key_length,
479 uint32_t longest_key_length,
486 uint64_t dump_block_header(
487 const LogBuffer &buffer,
490 uint32_t shortest_key_length,
491 uint32_t longest_key_length,
493 void* destination)
const;
507 void merge_sort_check_buffer_status()
const;
518 void merge_sort_allocate_io_buffers(MergeContext* context)
const;
522 ErrorStack merge_sort_open_sorted_runs(MergeContext* context)
const;
526 ErrorStack merge_sort_initialize_sort_buffers(MergeContext* context)
const;
533 ErrorCode merge_sort_advance_sort_buffers(
534 SortedBuffer* buffer,
537 uint32_t get_max_storage_count()
const;
543 "LogReducerControlBlock is too large.");
547 #endif // FOEDUS_SNAPSHOT_LOG_REDUCER_IMPL_HPP_
uint64_t get_tail_bytes() const
std::atomic< uint64_t > * get_buffer_status_address(uint32_t index)
bool is_no_more_writers() const
Forward declarations of classes in filesystem package.
ErrorStack initialize_once() override
Definitions of IDs in this package and a few related constant values.
Forward declarations of classes in log manager package.
Typedefs of ID types used in snapshot package.
Compactly represents important status information of a reducer buffer.
uint32_t StorageId
Unique ID for storage.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
ReducerBufferStatus get_current_buffer_status() const
uint32_t BufferPosition
Represents a position in some buffer.
Forward declarations of classes in root package.
std::atomic< uint32_t > current_buffer_
buffers_[current_buffer_ % 2] is the buffer mappers should append to.
Brings error stacktrace information as return value of functions.
uint16_t get_active_writers() const
Forward declarations of classes in snapshot manager package.
LogReducerControlBlock()=delete
struct foedus::snapshot::ReducerBufferStatus::Components components
Represents one input stream of sorted log entries.
A bit-wise flag in ReducerBufferStatus's flags_.
Holds a set of read-only file objects for snapshot files.
Typedefs of ID types used in log package.
ErrorStack handle_process() override
Implements the specific logics in derived class.
ReducerBufferStatus get_non_current_buffer_status() const
std::atomic< uint64_t > buffer_status_[2]
Status of the two reducer buffers.
ReducerBufferStatus get_buffer_status_atomic(uint32_t index) const
ErrorStack uninitialize_once() override
uint64_t from_buffer_position(BufferPosition buffer_position)
Analogue of boost::filesystem::path.
BufferPosition get_tail_position() const
Forward declarations of classes in storage package.
LogReducer(Engine *engine)
std::string to_string() const override
Expects "LogReducer-x", "LogMapper-y" etc.
Constants and methods related to CPU cacheline and its prefetching.
Database engine object that holds all resources and provides APIs.
Shared data for LogReducer.
Just a marker to denote that the memory region represents a data page.
A slice of foedus::memory::AlignedMemory.
const uint16_t id_
Unique ID of this mapper or reducer.
std::atomic< uint32_t > total_storage_count_
Set at the end of merge_sort().
BufferPosition tail_position_
Represents an I/O stream on one file without filesystem caching.
Represents one memory block aligned to actual OS/hardware pages.
Base class for LogMapper and LogReducer to share common code.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Forward declarations of classes in thread package.
Raw atomic operations that work for both C++11 and non-C++11 code.
A log reducer, which receives log entries sent from mappers and applies them to construct new snapsho...
ErrorCode
Enum of error codes defined in error_code.xmacro.
friend std::ostream & operator<<(std::ostream &o, const LogReducer &v)
uint16_t id_
ID of this reducer (or numa node ID).
~LogReducerControlBlock()=delete