20 #include <glog/logging.h>
54 previous_snapshot_files_(engine_),
63 ASSERT_ND(reinterpret_cast<char*>(buffers_[0]) + buffer_half_size_bytes_
64 == reinterpret_cast<char*>(buffers_[1]));
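// The assert above relies on the two reducer buffers being carved out of one
// contiguous allocation. A minimal sketch of the double-buffering layout,
// with a hypothetical `block` pointing at the full allocation:
//   char* half0 = reinterpret_cast<char*>(block);   // becomes buffers_[0]
//   char* half1 = half0 + buffer_half_size_bytes_;  // becomes buffers_[1]
// Mappers append to buffers_[current_buffer_ % 2] while the reducer sorts
// and dumps the other half.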
75 dump_io_buffer_.alloc(
84 buffer_half_size_bytes_ >> 5,
90 positions_buffers_.alloc(
91 buffer_half_size_bytes_ >> 5,
102 positions_buffers_.get_size() >> 1);
104 writer_pool_memory_.alloc(
110 writer_intermediate_memory_.alloc(
136 std::this_thread::sleep_for(std::chrono::milliseconds(10));
149 LOG(FATAL) << to_string() << " wtf. both buffers are in use, can't happen";
151 LOG(INFO) << to_string() << " switching buffer. current_buffer_="
160 LOG(INFO) << to_string() << " all mappers are done, this reducer starts the merge-sort phase.";
165 LOG(INFO) << to_string() << " all done.";
170 LOG(INFO) << "Sorting and dumping " << to_string() << "'s buffer to a file."
171   << " current sorted_runs_=" << sorted_runs_;
172 uint32_t buffer_index = sorted_runs_ % 2;
174 LOG(FATAL) << "wtf. this buffer is still open for writers";
177 dump_buffer_wait_for_writers(buffer_index);
181 ASSERT_ND(final_status.get_tail_bytes() <= buffer_half_size_bytes_);
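// get_tail_bytes() converts the tail BufferPosition into bytes. BufferPosition
// addresses the buffer in 8-byte units (which is why later logging multiplies
// positions by 8). A sketch of the conversion helpers under that assumption:
//   BufferPosition to_buffer_position(uint64_t byte_position) { return byte_position >> 3; }
//   uint64_t from_buffer_position(BufferPosition pos) { return static_cast<uint64_t>(pos) << 3; }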
183 LOG(INFO) << to_string() << " Started sort/dump " <<
184   final_status.get_tail_bytes() << " bytes of logs";
186 char* const base = reinterpret_cast<char*>(buffers_[buffer_index]);
187 std::map<storage::StorageId, std::vector<BufferPosition> > blocks;
188 dump_buffer_scan_block_headers(base, final_status.components.tail_position_, &blocks);
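// `blocks` groups the buffer positions of every block by StorageId. Mappers
// append blocks of different storages interleaved; this map lets the dump
// phase sort and write each storage's logs contiguously.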
191 fs::Path path = get_sorted_run_file_path(sorted_runs_);
194 LOG(INFO) << to_string() << " Created a sorted run file " << path;
197 for (auto& kv : blocks) {
199 LogBuffer log_buffer(base);
201 uint32_t written_count;
202 uint32_t shortest_key_length;
203 uint32_t longest_key_length;
208 &shortest_key_length,
229 LOG(INFO) << to_string() << " Done sort/dump " <<
231   << stop_watch.elapsed_ms() << "ms"
242 ErrorStack LogReducer::dump_buffer_wait_for_writers(uint32_t buffer_index) const {
243 debugging::StopWatch wait_watch;
248 LOG(INFO) << to_string() << " Okay, now active_writers==0. waited/looped for "
249   << wait_watch.elapsed_us() << "us";
254 void LogReducer::dump_buffer_scan_block_headers(
258 debugging::StopWatch header_watch;
261 uint32_t total_blocks = 0;
263 FullBlockHeader* header = reinterpret_cast<FullBlockHeader*>(buffer_base + cur);
264 if (!header->is_full_block()) {
265 LOG(FATAL) << to_string() << " wtf. magic word doesn't match. cur=" << cur << *header;
267 header->assert_key_length();
268 auto it = blocks->find(header->storage_id_);
269 if (it != blocks->end()) {
272 std::vector<BufferPosition> vec;
273 vec.reserve(1 << 10);
276 header->storage_id_, vec));
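// The scan hops from header to header: each FullBlockHeader records its own
// block_length_, so the cursor simply advances by that length until it
// reaches the tail. Roughly (hypothetical local names):
//   while (cur < tail) { header = header_at(cur); ++total_blocks; cur += header->block_length_; }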
283 LOG(INFO) << to_string() << " Scanned all blocks. There were " << total_blocks << " blocks"
284   << ", " << blocks->size() << " distinct storages."
285   << " scan elapsed time=" << header_watch.elapsed_us() << "us";
288 ErrorStack LogReducer::dump_buffer_sort_storage(
289 const LogBuffer &buffer,
291 const std::vector<BufferPosition>& log_positions,
292 uint32_t* out_shortest_key_length,
293 uint32_t* out_longest_key_length,
294 uint32_t* written_count) {
297 uint64_t records = 0;
299 FullBlockHeader* header = reinterpret_cast<FullBlockHeader*>(buffer.resolve(position));
300 if (!header->is_full_block()) {
301 LOG(FATAL) << to_string() << " wtf. magic word doesn't match. position=" << position
302   << ", storage_id=" << storage_id << *header;
304 header->assert_key_length();
305 records += header->log_count_;
309 uint64_t positions_buffer_size = records * sizeof(BufferPosition);
310 expand_positions_buffers_if_needed(positions_buffer_size);
312 uint64_t cur_rec_total = 0;
315 uint32_t shortest_key_length = 0xFFFF;
316 uint32_t longest_key_length = 0;
318 FullBlockHeader* header = reinterpret_cast<FullBlockHeader*>(buffer.resolve(position));
319 if (!header->is_full_block()) {
320 LOG(FATAL) << to_string() << " wtf. magic word doesn't match. position=" << position
321   << ", storage_id=" << storage_id << *header;
323 header->assert_key_length();
324 shortest_key_length = std::min<uint32_t>(shortest_key_length, header->shortest_key_length_);
325 longest_key_length = std::max<uint32_t>(longest_key_length, header->longest_key_length_);
327 for (uint32_t i = 0; i < header->log_count_; ++i) {
328 log::RecordLogType* record = buffer.resolve(record_pos);
329 ASSERT_ND(record->header_.storage_id_ == storage_id);
330 ASSERT_ND(record->header_.log_length_ > 0);
331 inputs[cur_rec_total] = record_pos;
335 ASSERT_ND(record_pos == position + header->block_length_);
340 storage::Partitioner partitioner(engine_, storage_id);
343 storage::Partitioner::SortBatchArguments args = {
346 static_cast<uint32_t>(records),
353 partitioner.sort_batch(args);
355 *out_shortest_key_length = shortest_key_length;
356 *out_longest_key_length = longest_key_length;
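// dump_buffer_sort_storage makes two passes over the storage's blocks: the
// first only counts records to size the positions buffer, the second gathers
// every record's position into the input array. The actual key ordering is
// delegated to the storage-type-specific Partitioner::sort_batch(), since
// comparison rules differ per storage type.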
360 uint64_t LogReducer::dump_block_header(
361 const LogBuffer &buffer,
364 uint32_t shortest_key_length,
365 uint32_t longest_key_length,
367 void* destination) const {
371 debugging::StopWatch length_watch;
372 uint64_t total_bytes = sizeof(FullBlockHeader);
373 for (uint32_t i = 0; i < log_count; ++i) {
374 total_bytes += buffer.resolve(sorted_logs[i])->header_.log_length_;
377 LOG(INFO) << to_string() << " iterated over " << log_count
378   << " log records to figure out block length in " << length_watch.elapsed_us() << "us";
380 FullBlockHeader* header = reinterpret_cast<FullBlockHeader*>(destination);
381 header->storage_id_ = storage_id;
382 header->log_count_ = log_count;
385 header->shortest_key_length_ = shortest_key_length;
386 header->longest_key_length_ = longest_key_length;
387 header->assert_key_length();
391 ErrorStack LogReducer::dump_buffer_sort_storage_write(
392 const LogBuffer &buffer,
395 uint32_t shortest_key_length,
396 uint32_t longest_key_length,
398 fs::DirectIoFile *dump_file) {
399 debugging::StopWatch write_watch;
400 char* io_buffer = reinterpret_cast<char*>(dump_io_buffer_.get_block());
404 const uint64_t flush_threshold = dump_io_buffer_.get_size() - (1 << 16);
405 uint64_t total_bytes = dump_block_header(
413 uint64_t total_written = 0;
414 uint64_t current_pos = sizeof(FullBlockHeader);
415 for (uint32_t i = 0; i < log_count; ++i) {
416 const log::RecordLogType* record = buffer.resolve(sorted_logs[i]);
418 ASSERT_ND(record->header_.storage_id_ == storage_id);
419 ASSERT_ND(record->header_.log_length_ > 0);
420 ASSERT_ND(record->header_.log_length_ % 8 == 0);
421 std::memcpy(io_buffer + current_pos, record, record->header_.log_length_);
422 current_pos += record->header_.log_length_;
423 if (current_pos >= flush_threshold) {
427 if (current_pos > flush_threshold) {
428 std::memcpy(io_buffer, io_buffer + flush_threshold, current_pos - flush_threshold);
430 current_pos -= flush_threshold;
431 total_written += flush_threshold;
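// If a record copy overruns flush_threshold, exactly flush_threshold bytes
// are written out and the overflowing tail is slid back to the front of
// io_buffer, so no record is ever split across two partial copies.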
435 ASSERT_ND(total_bytes == total_written + current_pos);
437 if (current_pos > 0) {
438 ASSERT_ND(current_pos < flush_threshold);
441 uint64_t upto = assorted::align<uint64_t, log::FillerLogType::kLogWriteUnitSize>(current_pos);
445 FillerBlockHeader* filler = reinterpret_cast<FillerBlockHeader*>(io_buffer + current_pos);
449 if (upto - current_pos > sizeof(FillerBlockHeader)) {
452   io_buffer + current_pos + sizeof(FillerBlockHeader),
454   upto - current_pos - sizeof(FillerBlockHeader));
461 total_written += current_pos;
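// Direct I/O writes must be multiples of log::FillerLogType::kLogWriteUnitSize,
// so the tail is padded up to `upto` with a FillerBlockHeader claiming the gap
// (zero-filling any bytes past the header). A later scan can then recognize
// and skip the filler block, as merge_sort_advance_sort_buffers does below.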
466 LOG(INFO) << to_string() << " Wrote out storage-" << storage_id << " which had " << log_count
467   << " log records (" << total_written << " bytes) in " << write_watch.elapsed_ms() << "ms";
472 void LogReducer::expand_if_needed(
473 uint64_t required_size,
474 memory::AlignedMemory *memory,
475 const std::string& name) {
476 if (memory->is_null() || memory->get_size() < required_size) {
477 if (memory->is_null()) {
478 LOG(INFO) << to_string() << " initially allocating " << name << "."
479   << assorted::Hex(required_size) << " bytes.";
481 LOG(WARNING) << to_string() << " automatically expanding " << name << " from "
482   << assorted::Hex(memory->get_size()) << " bytes to "
483   << assorted::Hex(required_size) << " bytes. if this happens often,"
484   << " our sizing is wrong.";
493 void LogReducer::expand_positions_buffers_if_needed(uint64_t required_size_per_buffer) {
495 if (input_positions_slice_.get_size() < required_size_per_buffer) {
496 uint64_t new_size = required_size_per_buffer * 2;
497 LOG(WARNING) << to_string() << " automatically expanding positions_buffers from "
498   << positions_buffers_.get_size() << " to " << new_size << ". if this happens often,"
499   << " our sizing is wrong.";
500 positions_buffers_.alloc(
505 input_positions_slice_ = memory::AlignedMemorySlice(
508 positions_buffers_.get_size() >> 1);
509 output_positions_slice_ = memory::AlignedMemorySlice(
512 positions_buffers_.get_size() >> 1);
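// input_positions_slice_ and output_positions_slice_ are simply the two
// halves of positions_buffers_ (hence get_size() >> 1): one half holds the
// unsorted record positions handed to the partitioner, the other receives
// the sorted result.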
516 fs::Path LogReducer::get_sorted_run_file_path(uint32_t sorted_run) const {
518 std::stringstream file_name;
519 file_name << "/sorted_run_"
522   << static_cast<int>(sorted_run) << ".tmp";
524 path /= file_name.str();
528 LogReducer::MergeContext::MergeContext(uint32_t dumped_files_count)
529 : dumped_files_count_(dumped_files_count),
530 tmp_sorted_buffer_array_(new SortedBuffer*[dumped_files_count + 1]),
531 tmp_sorted_buffer_count_(0) {
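// Note the +1: the array must hold one SortedBuffer per dumped run file plus
// one more for the in-memory last buffer that merge_sort_open_sorted_runs()
// registers as an InMemorySortedBuffer.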
534 LogReducer::MergeContext::~MergeContext() {
535 sorted_buffers_.clear();
537 for (auto& file : sorted_files_auto_ptrs_) {
540 sorted_files_auto_ptrs_.clear();
542 io_memory_.release_block();
543 delete[] tmp_sorted_buffer_array_;
544 tmp_sorted_buffer_array_ = nullptr;
550 for (uint32_t i = 0 ; i < sorted_buffers_.size(); ++i) {
552 if (the_storage_id == 0) {
556 storage_id = the_storage_id;
559 storage_id = std::min(storage_id, the_storage_id);
565 void LogReducer::MergeContext::set_tmp_sorted_buffer_array(storage::StorageId storage_id) {
566 tmp_sorted_buffer_count_ = 0;
567 for (uint32_t i = 0 ; i < sorted_buffers_.size(); ++i) {
568 if (sorted_buffers_[i]->get_cur_block_storage_id() == storage_id) {
569 tmp_sorted_buffer_array_[tmp_sorted_buffer_count_] = sorted_buffers_[i].get();
570 ++tmp_sorted_buffer_count_;
576 ErrorStack LogReducer::merge_sort() {
577 merge_sort_check_buffer_status();
582 SnapshotWriter snapshot_writer(
586 &writer_pool_memory_,
587 &writer_intermediate_memory_);
594 MergeContext context(sorted_runs_);
595 LOG(INFO) << to_string() << " merge sorting " << sorted_runs_ << " sorted runs and the current"
596   << " buffer which has "
599 debugging::StopWatch merge_watch;
602 merge_sort_allocate_io_buffers(&context);
603 CHECK_ERROR(merge_sort_open_sorted_runs(&context));
604 CHECK_ERROR(merge_sort_initialize_sort_buffers(&context));
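// The merge phase proceeds in stages: allocate one read buffer per sorted
// run, open the run files plus the in-memory last buffer as SortedBuffer
// streams, and prime each stream with its first block. The main loop then
// repeatedly picks the smallest StorageId still present across streams
// (checked to be strictly increasing by the FATAL below), gathers the
// streams positioned on that storage via set_tmp_sorted_buffer_array(), and
// hands them to a storage::Composer to produce the snapshot pages.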
607 memory::AlignedMemory composer_work_memory;
608 composer_work_memory.alloc(
620 if (storage_id <= prev_storage_id) {
621 LOG(FATAL) << to_string() << " wtf. not storage sorted? " << *this;
623 prev_storage_id = storage_id;
626 VLOG(0) << to_string() << " merging storage-" << storage_id << ", num="
628 context.set_tmp_sorted_buffer_array(storage_id);
631 storage::Composer composer(engine_, storage_id);
635 storage::Composer::ComposeArguments args = {
637 &previous_snapshot_files_,
638 context.tmp_sorted_buffer_array_,
639 context.tmp_sorted_buffer_count_,
640 &composer_work_memory,
646 for (auto& ptr : context.sorted_buffers_) {
647 SortedBuffer *buffer = ptr.get();
653 snapshot_writer.close();
655 LOG(INFO) << to_string() << " completed merging in " << merge_watch.elapsed_sec() << " seconds"
661 void LogReducer::merge_sort_check_buffer_status() const {
664 <= buffer_half_size_bytes_);
666 <= buffer_half_size_bytes_);
669 LOG(FATAL) << to_string() << " non-current buffer still has some data. this must not happen"
670   << " at merge_sort step.";
673 if (cur_status.components.active_writers_ > 0) {
674 LOG(FATAL) << to_string() << " last buffer is still being written. this must not happen"
675   << " at merge_sort step.";
677 ASSERT_ND(!cur_status.is_no_more_writers());
680 void LogReducer::merge_sort_allocate_io_buffers(LogReducer::MergeContext* context) const {
681 if (context->dumped_files_count_ == 0) {
682 LOG(INFO) << to_string() << " great, no sorted run files. everything in-memory";
685 debugging::StopWatch alloc_watch;
686 uint64_t size_per_run =
688 uint64_t size_total = size_per_run * context->dumped_files_count_;
689 context->io_memory_.alloc(
694 for (uint32_t i = 0; i < context->dumped_files_count_; ++i) {
695 context->io_buffers_.emplace_back(memory::AlignedMemorySlice(
696 &context->io_memory_,
701 LOG(INFO) << to_string() << " allocated IO buffers (" << size_total << " bytes in total) "
702   << " in " << alloc_watch.elapsed_us() << "us";
704 ErrorStack LogReducer::merge_sort_dump_last_buffer() {
707 LOG(INFO) << to_string() << " sorting the last buffer in memory (" << last_pos * 8 << "B)...";
708 debugging::StopWatch watch;
711 char* const base = reinterpret_cast<char*>(buffers_[last]);
712 char* const other = reinterpret_cast<char*>(buffers_[(last + 1) % 2]);
713 std::map<storage::StorageId, std::vector<BufferPosition> > blocks;
714 dump_buffer_scan_block_headers(base, last_pos, &blocks);
715 uint64_t other_bytes = 0;
716 uint64_t total_log_count = 0;
717 for (auto& kv : blocks) {
718 LogBuffer buffer(base);
720 uint32_t shortest_key_length;
721 uint32_t longest_key_length;
727 &shortest_key_length,
730 total_log_count += count;
734 uint64_t total_bytes = dump_block_header(
741 other + other_bytes);
742 uint64_t current_pos = sizeof(FullBlockHeader) + other_bytes;
743 for (uint32_t i = 0; i < count; ++i) {
744 const log::RecordLogType* record = buffer.resolve(pos[i]);
746 ASSERT_ND(record->header_.storage_id_ == storage_id);
747 ASSERT_ND(record->header_.log_length_ > 0);
748 ASSERT_ND(record->header_.log_length_ % 8 == 0);
749 std::memcpy(other + current_pos, record, record->header_.log_length_);
750 current_pos += record->header_.log_length_;
752 ASSERT_ND(total_bytes + other_bytes == current_pos);
753 other_bytes += total_bytes;
758 ReducerBufferStatus other_status;
759 other_status.components.active_writers_ = 0;
764 LOG(INFO) << to_string() << " sorted the last buffer in memory (" << last_pos * 8 << "B -> "
765   << other_bytes << "B) in " << watch.elapsed_ms() << "ms, total_log_count=" << total_log_count;
769 ErrorStack LogReducer::merge_sort_open_sorted_runs(LogReducer::MergeContext* context) const {
772 void* last_buffer = buffers_[last_buffer_index % 2];
774 context->sorted_buffers_.emplace_back(new InMemorySortedBuffer(
775   reinterpret_cast<char*>(last_buffer),
779 ASSERT_ND(context->io_buffers_.size() == sorted_runs_);
780 for (uint32_t sorted_run = 0 ; sorted_run < context->dumped_files_count_; ++sorted_run) {
781 fs::Path path = get_sorted_run_file_path(sorted_run);
783 LOG(FATAL) << to_string() << " wtf. this sorted run file doesn't exist " << path;
786 if (file_size == 0) {
787 LOG(FATAL) << to_string() << " wtf. this sorted run file is empty " << path;
790 std::unique_ptr<fs::DirectIoFile> file_ptr(new fs::DirectIoFile(
795 context->sorted_buffers_.emplace_back(new DumpFileSortedBuffer(
797   context->io_buffers_[sorted_run]));
798 context->sorted_files_auto_ptrs_.emplace_back(std::move(file_ptr));
801 ASSERT_ND(context->sorted_files_auto_ptrs_.size() == context->sorted_buffers_.size() - 1U);
802 ASSERT_ND(context->dumped_files_count_ == context->sorted_buffers_.size() - 1U);
806 ErrorStack LogReducer::merge_sort_initialize_sort_buffers(LogReducer::MergeContext* context) const {
807 for (uint32_t index = 0 ; index < context->sorted_buffers_.size(); ++index) {
808 SortedBuffer* buffer = context->sorted_buffers_[index].get();
809 if (buffer->get_total_size() == 0) {
810 buffer->invalidate_current_block();
811 LOG(INFO) << to_string() << " buffer-" << index << " is empty";
815 DumpFileSortedBuffer* casted = dynamic_cast<DumpFileSortedBuffer*>(buffer);
818 uint64_t desired_reads = std::min(casted->get_buffer_size(), casted->get_total_size());
819 WRAP_ERROR_CODE(casted->get_file()->read(desired_reads, casted->get_io_buffer()));
821 ASSERT_ND(dynamic_cast<InMemorySortedBuffer*>(buffer));
827 const FullBlockHeader* header = reinterpret_cast<const FullBlockHeader*>(
828   buffer->get_buffer());
829 if (!header->is_full_block()) {
830 LOG(FATAL) << to_string() << " wtf. first block in the file is not a real storage block."
831   << *buffer << *header;
833 buffer->set_current_block(
836 sizeof(FullBlockHeader),
838 header->shortest_key_length_,
839 header->longest_key_length_);
846 ErrorCode LogReducer::merge_sort_advance_sort_buffers(
847 SortedBuffer* buffer,
849 if (buffer->get_cur_block_storage_id() != processed_storage_id) {
852 uint64_t next_block_header_pos = buffer->get_cur_block_abosulte_end();
853 uint64_t in_buffer_pos = buffer->to_relative_pos(next_block_header_pos);
854 const BlockHeaderBase* next_header
855   = reinterpret_cast<const BlockHeaderBase*>(buffer->get_buffer() + in_buffer_pos);
858 if (next_block_header_pos < buffer->get_total_size() && next_header->is_filler()) {
862 next_block_header_pos += skip_bytes;
863 VLOG(1) << to_string() << " skipped a filler block. " << skip_bytes << " bytes";
864 if (next_block_header_pos + sizeof(FullBlockHeader)
865   > buffer->get_offset() + buffer->get_buffer_size()) {
868 LOG(INFO) << to_string() << " wow, we unluckily hit buffer boundary while skipping"
869   << " a filler block. it's rare!";
871 ASSERT_ND(next_block_header_pos >= buffer->get_offset());
872 ASSERT_ND(next_block_header_pos + sizeof(FullBlockHeader)
873   <= buffer->get_offset() + buffer->get_buffer_size());
876 in_buffer_pos = buffer->to_relative_pos(next_block_header_pos);
878 reinterpret_cast<const BlockHeaderBase*>(buffer->get_buffer() + in_buffer_pos);
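// If the next block header would fall outside the current read window, the
// stream's window is shifted first (the rare case logged above), and the
// header pointer must be re-resolved against the new in-buffer offset, as
// done here.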
882 if (next_block_header_pos >= buffer->get_total_size()) {
883 ASSERT_ND(next_block_header_pos == buffer->get_total_size());
885 buffer->invalidate_current_block();
886 LOG(INFO) << to_string() << " fully merged a stream: " << *buffer;
888 if (!next_header->is_full_block()) {
889 LOG(FATAL) << to_string() << " wtf. block magic word doesn't match. pos="
890   << next_block_header_pos << ", " << *next_header;
893 const FullBlockHeader* next_header_casted
894   = reinterpret_cast<const FullBlockHeader*>(next_header);
895 if (next_header_casted->storage_id_ == 0 ||
896 next_header_casted->log_count_ == 0 ||
897 next_header_casted->block_length_ == 0) {
898 LOG(FATAL) << to_string() << " wtf. invalid block header. pos="
899   << next_block_header_pos << *next_header_casted;
901 buffer->set_current_block(
902 next_header_casted->storage_id_,
903 next_header_casted->log_count_,
904 next_block_header_pos + sizeof(FullBlockHeader),
906 next_header_casted->shortest_key_length_,
907 next_header_casted->longest_key_length_);
914 << "<id_>" << v.get_id() << "</id_>"
916 << "<total_storage_count>" << v.control_block_->total_storage_count_ << "</total_storage_count>"
917 << "<sort_buffer_>" << v.sort_buffer_ << "</sort_buffer_>"
918 << "<positions_buffers_>" << v.positions_buffers_ << "</positions_buffers_>"
919 << "<current_buffer_>" << v.control_block_->current_buffer_ << "</current_buffer_>"
920 << "<sorted_runs_>" << v.sorted_runs_ << "</sorted_runs_>"
928 << "<block_length_>" << v.block_length_ << "</block_length_>"
933 uint32_t LogReducer::get_max_storage_count() const {