libfoedus-core
FOEDUS Core Library
log_reducer_impl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/snapshot/log_reducer_impl.hpp"

#include <glog/logging.h>

#include <algorithm>
#include <chrono>
#include <cstring>
#include <map>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "foedus/assert_nd.hpp"
#include "foedus/engine.hpp"
#include "foedus/engine_options.hpp"
#include "foedus/epoch.hpp"
#include "foedus/error_stack_batch.hpp"
#include "foedus/assorted/assorted_func.hpp"
#include "foedus/debugging/stop_watch.hpp"
#include "foedus/fs/direct_io_file.hpp"
#include "foedus/fs/filesystem.hpp"
#include "foedus/fs/path.hpp"
#include "foedus/log/common_log_types.hpp"
#include "foedus/memory/aligned_memory.hpp"
#include "foedus/memory/memory_id.hpp"
#include "foedus/snapshot/log_buffer.hpp"
#include "foedus/snapshot/snapshot_writer_impl.hpp"
#include "foedus/soc/shared_memory_repo.hpp"
#include "foedus/soc/soc_manager.hpp"
#include "foedus/storage/composer.hpp"
#include "foedus/storage/partitioner.hpp"

namespace foedus {
namespace snapshot {

LogReducer::LogReducer(Engine* engine)
  : MapReduceBase(engine, engine->get_soc_id()),
  previous_snapshot_files_(engine_),
  sorted_runs_(0) {
  soc::NodeMemoryAnchors* anchors
    = engine_->get_soc_manager()->get_shared_memory_repo()->get_node_memory_anchors(numa_node_);
  control_block_ = anchors->log_reducer_memory_;
  buffers_[0] = anchors->log_reducer_buffers_[0];
  buffers_[1] = anchors->log_reducer_buffers_[1];
  const SnapshotOptions& option = engine_->get_options().snapshot_;
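  // log_reducer_buffer_mb_ is the size of the whole reducer buffer, which we split
  // into two halves for double-buffering; hence "<< 19" rather than "<< 20" below.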
  buffer_half_size_bytes_ = static_cast<uint64_t>(option.log_reducer_buffer_mb_) << 19;
  ASSERT_ND(reinterpret_cast<char*>(buffers_[0]) + buffer_half_size_bytes_
    == reinterpret_cast<char*>(buffers_[1]));
  root_info_pages_ = anchors->log_reducer_root_info_pages_;
}

ErrorStack LogReducer::initialize_once() {
  control_block_->initialize();
  control_block_->id_ = engine_->get_soc_id();

  const SnapshotOptions& option = engine_->get_options().snapshot_;

  uint64_t dump_buffer_size = static_cast<uint64_t>(option.log_reducer_dump_io_buffer_mb_) << 20;
  dump_io_buffer_.alloc(
    dump_buffer_size,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    get_numa_node());
  ASSERT_ND(!dump_io_buffer_.is_null());

  // start small; expanded on demand. Should be big enough.
  sort_buffer_.alloc(
    buffer_half_size_bytes_ >> 5,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    get_numa_node());

  // start small; expanded on demand. Should be big enough.
  positions_buffers_.alloc(
    buffer_half_size_bytes_ >> 5,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    get_numa_node());
  input_positions_slice_ = memory::AlignedMemorySlice(
    &positions_buffers_,
    0,
    positions_buffers_.get_size() >> 1);
  output_positions_slice_ = memory::AlignedMemorySlice(
    &positions_buffers_,
    positions_buffers_.get_size() >> 1,
    positions_buffers_.get_size() >> 1);

  writer_pool_memory_.alloc(
    static_cast<uint64_t>(option.snapshot_writer_page_pool_size_mb_) << 20,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);

  writer_intermediate_memory_.alloc(
    static_cast<uint64_t>(option.snapshot_writer_intermediate_pool_size_mb_) << 20,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);

  sorted_runs_ = 0;

  CHECK_ERROR(previous_snapshot_files_.initialize());
  return kRetOk;
}

ErrorStack LogReducer::uninitialize_once() {
  ErrorStackBatch batch;
  batch.emprace_back(previous_snapshot_files_.uninitialize());
  writer_intermediate_memory_.release_block();
  writer_pool_memory_.release_block();
  dump_io_buffer_.release_block();
  sort_buffer_.release_block();
  positions_buffers_.release_block();
  control_block_->uninitialize();
  return SUMMARIZE_ERROR_BATCH(batch);
}

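// The reducer's main loop. While mappers are running, this thread mostly sleeps;
// whenever mappers fill the current buffer half, it switches to the other half and
// dumps the full one as a sorted-run file. Once all mappers are done, it
// merge-sorts the dumped runs together with the last in-memory buffer.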
ErrorStack LogReducer::handle_process() {
  while (true) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    WRAP_ERROR_CODE(check_cancelled());
    if (parent_.is_all_mappers_completed()) {
      break;
    }
    // should I switch the current buffer?
    // this is while, not if, in case the new current buffer becomes full while this reducer is
    // dumping the old current buffer.
    while (control_block_->get_current_buffer_status().is_no_more_writers()) {
      WRAP_ERROR_CODE(check_cancelled());
      // okay, let's switch now. As this thread dumps the buffer as soon as this happens,
      // only one of the buffers can be full.
      if (!control_block_->get_non_current_buffer_status().is_clear()) {
        LOG(FATAL) << to_string() << " wtf. both buffers are in use, can't happen";
      }
      LOG(INFO) << to_string() << " switching buffer. current_buffer_="
        << control_block_->current_buffer_;
      control_block_->current_buffer_.fetch_add(1U);
      ASSERT_ND(sorted_runs_ + 1U == control_block_->current_buffer_);
      // Then, immediately start dumping the full buffer.
      CHECK_ERROR(dump_buffer());
    }
  }

  LOG(INFO) << to_string() << " all mappers are done, this reducer starts the merge-sort phase.";
  CHECK_ERROR(merge_sort());

  LOG(INFO) << to_string() << " all done.";
  return kRetOk;
}

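// Sorts all logs in the full buffer half per storage and writes them out as one
// sorted-run file. buffer_index alternates between 0 and 1 because sorted_runs_
// counts the dumps completed so far.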
ErrorStack LogReducer::dump_buffer() {
  LOG(INFO) << "Sorting and dumping " << to_string() << "'s buffer to a file."
    << " current sorted_runs_=" << sorted_runs_;
  uint32_t buffer_index = sorted_runs_ % 2;
  if (!control_block_->get_buffer_status_atomic(buffer_index).is_no_more_writers()) {
    LOG(FATAL) << "wtf. this buffer is still open for writers";
  }

  dump_buffer_wait_for_writers(buffer_index);

  ReducerBufferStatus final_status = control_block_->get_buffer_status_atomic(buffer_index);
  ASSERT_ND(final_status.get_tail_bytes() <= buffer_half_size_bytes_);
  debugging::StopWatch stop_watch;
  LOG(INFO) << to_string() << " Started sort/dump "
    << final_status.get_tail_bytes() << " bytes of logs";

  char* const base = reinterpret_cast<char*>(buffers_[buffer_index]);
  std::map<storage::StorageId, std::vector<BufferPosition> > blocks;
  dump_buffer_scan_block_headers(base, final_status.components.tail_position_, &blocks);

  // open a file
  fs::Path path = get_sorted_run_file_path(sorted_runs_);
  fs::DirectIoFile file(path);
  WRAP_ERROR_CODE(file.open(false, true, true, true));
  LOG(INFO) << to_string() << " Created a sorted run file " << path;

  // for each storage (ordered by storage ID), sort and dump them into the file.
  for (auto& kv : blocks) {
    LogBuffer log_buffer(base);
    storage::StorageId storage_id = kv.first;
    uint32_t written_count;
    uint32_t shortest_key_length;
    uint32_t longest_key_length;
    CHECK_ERROR(dump_buffer_sort_storage(
      log_buffer,
      storage_id,
      kv.second,
      &shortest_key_length,
      &longest_key_length,
      &written_count));
    // write them out to the file
    BufferPosition* outputs
      = reinterpret_cast<BufferPosition*>(output_positions_slice_.get_block());
    CHECK_ERROR(dump_buffer_sort_storage_write(
      log_buffer,
      storage_id,
      outputs,
      shortest_key_length,
      longest_key_length,
      written_count,
      &file));
  }

  // we don't need fsync here. if there is a failure during snapshotting,
  // we just start over. logs are already durable.
  file.close();

  stop_watch.stop();
  LOG(INFO) << to_string() << " Done sort/dump "
    << from_buffer_position(final_status.components.tail_position_) << " bytes in "
    << stop_watch.elapsed_ms() << "ms"
    << " dumped file length=" << fs::file_size(path);

  // clear the status so that mappers can start using this buffer.
  // note that this reducer has to do the buffer switch before mappers can really start using it.
  // this reducer immediately checks if it should do so right after this function.
  ++sorted_runs_;
  control_block_->buffer_status_[buffer_index] = 0;
  return kRetOk;
}

ErrorStack LogReducer::dump_buffer_wait_for_writers(uint32_t buffer_index) const {
  debugging::StopWatch wait_watch;
  SPINLOCK_WHILE(control_block_->get_buffer_status_atomic(buffer_index).get_active_writers() > 0) {
  }
  wait_watch.stop();
  LOG(INFO) << to_string() << " Okay, now active_writers==0. waited/looped for "
    << wait_watch.elapsed_us() << "us";
  // I'd be very surprised if we were waiting for more than 1000us.
  return kRetOk;
}

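// Walks the buffer from head to tail, block by block, and groups block positions
// by storage ID. std::map keeps the storages sorted by ID, which is the order in
// which dump_buffer() writes them out.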
void LogReducer::dump_buffer_scan_block_headers(
  char* buffer_base,
  BufferPosition tail_position,
  std::map<storage::StorageId, std::vector<BufferPosition> > *blocks) const {
  debugging::StopWatch header_watch;
  const uint64_t end = from_buffer_position(tail_position);
  uint64_t cur = 0;
  uint32_t total_blocks = 0;
  while (cur < end) {
    FullBlockHeader* header = reinterpret_cast<FullBlockHeader*>(buffer_base + cur);
    if (!header->is_full_block()) {
      LOG(FATAL) << to_string() << " wtf. magic word doesn't match. cur=" << cur << *header;
    }
    header->assert_key_length();
    auto it = blocks->find(header->storage_id_);
    if (it != blocks->end()) {
      it->second.push_back(to_buffer_position(cur));
    } else {
      std::vector<BufferPosition> vec;
      vec.reserve(1 << 10);
      vec.push_back(to_buffer_position(cur));
      blocks->insert(std::pair< storage::StorageId, std::vector<BufferPosition> >(
        header->storage_id_, vec));
    }
    cur += from_buffer_position(header->block_length_);
    ++total_blocks;
  }
  ASSERT_ND(cur == end);
  header_watch.stop();
  LOG(INFO) << to_string() << " Scanned all blocks. There were " << total_blocks << " blocks"
    << ", " << blocks->size() << " distinct storages."
    << " scan elapsed time=" << header_watch.elapsed_us() << "us";
}

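// Collects every log record position for one storage (two passes: count the
// records from the block headers, then gather their positions), and delegates
// the actual sorting to the storage-type-specific Partitioner.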
ErrorStack LogReducer::dump_buffer_sort_storage(
  const LogBuffer &buffer,
  storage::StorageId storage_id,
  const std::vector<BufferPosition>& log_positions,
  uint32_t* out_shortest_key_length,
  uint32_t* out_longest_key_length,
  uint32_t* written_count) {
  // first, count how many log entries there are. this is quick as we have statistics
  // in the header.
  uint64_t records = 0;
  for (BufferPosition position : log_positions) {
    FullBlockHeader* header = reinterpret_cast<FullBlockHeader*>(buffer.resolve(position));
    if (!header->is_full_block()) {
      LOG(FATAL) << to_string() << " wtf. magic word doesn't match. position=" << position
        << ", storage_id=" << storage_id << *header;
    }
    header->assert_key_length();
    records += header->log_count_;
  }

  // now we need memory for this long array. expand the memory if not sufficient.
  uint64_t positions_buffer_size = records * sizeof(BufferPosition);
  expand_positions_buffers_if_needed(positions_buffer_size);
  BufferPosition* inputs = reinterpret_cast<BufferPosition*>(input_positions_slice_.get_block());
  uint64_t cur_rec_total = 0;

  // put all log positions into the array
  uint32_t shortest_key_length = 0xFFFF;
  uint32_t longest_key_length = 0;
  for (BufferPosition position : log_positions) {
    FullBlockHeader* header = reinterpret_cast<FullBlockHeader*>(buffer.resolve(position));
    if (!header->is_full_block()) {
      LOG(FATAL) << to_string() << " wtf. magic word doesn't match. position=" << position
        << ", storage_id=" << storage_id << *header;
    }
    header->assert_key_length();
    shortest_key_length = std::min<uint32_t>(shortest_key_length, header->shortest_key_length_);
    longest_key_length = std::max<uint32_t>(longest_key_length, header->longest_key_length_);
    BufferPosition record_pos = position + to_buffer_position(sizeof(FullBlockHeader));
    for (uint32_t i = 0; i < header->log_count_; ++i) {
      log::RecordLogType* record = buffer.resolve(record_pos);
      ASSERT_ND(record->header_.storage_id_ == storage_id);
      ASSERT_ND(record->header_.log_length_ > 0);
      inputs[cur_rec_total] = record_pos;
      ++cur_rec_total;
      record_pos += to_buffer_position(record->header_.log_length_);
    }
    ASSERT_ND(record_pos == position + header->block_length_);
  }
  ASSERT_ND(cur_rec_total == records);

  // Now, sort these log records by key and then by ordinal. we use the partitioner object for this.
  storage::Partitioner partitioner(engine_, storage_id);
  BufferPosition* pos = reinterpret_cast<BufferPosition*>(output_positions_slice_.get_block());
  *written_count = 0;
  storage::Partitioner::SortBatchArguments args = {
    buffer,
    inputs,
    static_cast<uint32_t>(records),
    shortest_key_length,
    longest_key_length,
    &sort_buffer_,
    parent_.get_base_epoch(),
    pos,
    written_count};
  partitioner.sort_batch(args);

  *out_shortest_key_length = shortest_key_length;
  *out_longest_key_length = longest_key_length;
  return kRetOk;
}

uint64_t LogReducer::dump_block_header(
  const LogBuffer &buffer,
  storage::StorageId storage_id,
  const BufferPosition* sorted_logs,
  uint32_t shortest_key_length,
  uint32_t longest_key_length,
  uint32_t log_count,
  void* destination) const {
  // figuring out the block length is a bit expensive. we have to go through all log entries.
  // but, snapshotting happens only once every few minutes, and all of these are in-memory
  // operations. I hope this isn't a big cost. (let's keep an eye on it, though)
  debugging::StopWatch length_watch;
  uint64_t total_bytes = sizeof(FullBlockHeader);
  for (uint32_t i = 0; i < log_count; ++i) {
    total_bytes += buffer.resolve(sorted_logs[i])->header_.log_length_;
  }
  length_watch.stop();
  LOG(INFO) << to_string() << " iterated over " << log_count
    << " log records to figure out block length in " << length_watch.elapsed_us() << "us";

  FullBlockHeader* header = reinterpret_cast<FullBlockHeader*>(destination);
  header->storage_id_ = storage_id;
  header->log_count_ = log_count;
  header->magic_word_ = BlockHeaderBase::kFullBlockHeaderMagicWord;
  header->block_length_ = to_buffer_position(total_bytes);
  header->shortest_key_length_ = shortest_key_length;
  header->longest_key_length_ = longest_key_length;
  header->assert_key_length();
  return total_bytes;
}

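// Writes one storage's sorted block to the sorted-run file through dump_io_buffer_.
// The buffer is flushed in aligned chunks of flush_threshold bytes; any fragment
// beyond the threshold is copied back to the buffer head. The final partial write
// is padded with a filler block up to kLogWriteUnitSize for aligned direct I/O.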
ErrorStack LogReducer::dump_buffer_sort_storage_write(
  const LogBuffer &buffer,
  storage::StorageId storage_id,
  const BufferPosition* sorted_logs,
  uint32_t shortest_key_length,
  uint32_t longest_key_length,
  uint32_t log_count,
  fs::DirectIoFile *dump_file) {
  debugging::StopWatch write_watch;
  char* io_buffer = reinterpret_cast<char*>(dump_io_buffer_.get_block());
  // we flush the IO buffer whenever we have written out this number of bytes.
  // to keep it aligned, the bytes after this threshold have to be retained and copied over to
  // the beginning of the buffer.
  const uint64_t flush_threshold = dump_io_buffer_.get_size() - (1 << 16);
  uint64_t total_bytes = dump_block_header(
    buffer,
    storage_id,
    sorted_logs,
    shortest_key_length,
    longest_key_length,
    log_count,
    io_buffer);
  uint64_t total_written = 0;
  uint64_t current_pos = sizeof(FullBlockHeader);
  for (uint32_t i = 0; i < log_count; ++i) {
    const log::RecordLogType* record = buffer.resolve(sorted_logs[i]);
    ASSERT_ND(current_pos % 8 == 0);
    ASSERT_ND(record->header_.storage_id_ == storage_id);
    ASSERT_ND(record->header_.log_length_ > 0);
    ASSERT_ND(record->header_.log_length_ % 8 == 0);
    std::memcpy(io_buffer + current_pos, record, record->header_.log_length_);
    current_pos += record->header_.log_length_;
    if (current_pos >= flush_threshold) {
      WRAP_ERROR_CODE(dump_file->write(flush_threshold, dump_io_buffer_));

      // move the fragment to the beginning
      if (current_pos > flush_threshold) {
        std::memcpy(io_buffer, io_buffer + flush_threshold, current_pos - flush_threshold);
      }
      current_pos -= flush_threshold;
      total_written += flush_threshold;
    }
  }

  ASSERT_ND(total_bytes == total_written + current_pos);  // now we went over all logs again

  if (current_pos > 0) {
    ASSERT_ND(current_pos < flush_threshold);
    // for aligned write, add a dummy storage block at the end.
    if (current_pos % (log::FillerLogType::kLogWriteUnitSize) != 0) {
      uint64_t upto = assorted::align<uint64_t, log::FillerLogType::kLogWriteUnitSize>(current_pos);
      ASSERT_ND(upto > current_pos);
      ASSERT_ND(upto < current_pos + log::FillerLogType::kLogWriteUnitSize);
      FillerBlockHeader* filler = reinterpret_cast<FillerBlockHeader*>(io_buffer + current_pos);
      filler->block_length_ = to_buffer_position(upto - current_pos);
      filler->magic_word_ = BlockHeaderBase::kFillerBlockHeaderMagicWord;
      if (upto - current_pos > sizeof(FillerBlockHeader)) {
        // fill it with zeros. not mandatory, but wouldn't hurt. it's just 4kb.
        std::memset(
          io_buffer + current_pos + sizeof(FillerBlockHeader),
          0,
          upto - current_pos - sizeof(FillerBlockHeader));
      }
      current_pos = upto;
    }

    WRAP_ERROR_CODE(dump_file->write(current_pos, dump_io_buffer_));
    total_written += current_pos;
  }

  ASSERT_ND(total_written % log::FillerLogType::kLogWriteUnitSize == 0);
  write_watch.stop();
  LOG(INFO) << to_string() << " Wrote out storage-" << storage_id << " which had " << log_count
    << " log records (" << total_written << " bytes) in " << write_watch.elapsed_ms() << "ms";
  return kRetOk;
}


void LogReducer::expand_if_needed(
  uint64_t required_size,
  memory::AlignedMemory *memory,
  const std::string& name) {
  if (memory->is_null() || memory->get_size() < required_size) {
    if (memory->is_null()) {
      LOG(INFO) << to_string() << " initially allocating " << name << "."
        << assorted::Hex(required_size) << " bytes.";
    } else {
      LOG(WARNING) << to_string() << " automatically expanding " << name << " from "
        << assorted::Hex(memory->get_size()) << " bytes to "
        << assorted::Hex(required_size) << " bytes. if this happens often,"
        << " our sizing is wrong.";
    }
    memory->alloc(
      required_size,
      memory::kHugepageSize,
      memory::AlignedMemory::kNumaAllocOnnode,
      get_numa_node());
  }
}
void LogReducer::expand_positions_buffers_if_needed(uint64_t required_size_per_buffer) {
  ASSERT_ND(input_positions_slice_.get_size() == output_positions_slice_.get_size());
  if (input_positions_slice_.get_size() < required_size_per_buffer) {
    uint64_t new_size = required_size_per_buffer * 2;
    LOG(WARNING) << to_string() << " automatically expanding positions_buffers from "
      << positions_buffers_.get_size() << " to " << new_size << ". if this happens often,"
      << " our sizing is wrong.";
    positions_buffers_.alloc(
      new_size,
      memory::kHugepageSize,
      memory::AlignedMemory::kNumaAllocOnnode,
      get_numa_node());
    input_positions_slice_ = memory::AlignedMemorySlice(
      &positions_buffers_,
      0,
      positions_buffers_.get_size() >> 1);
    output_positions_slice_ = memory::AlignedMemorySlice(
      &positions_buffers_,
      positions_buffers_.get_size() >> 1,
      positions_buffers_.get_size() >> 1);
  }
}

fs::Path LogReducer::get_sorted_run_file_path(uint32_t sorted_run) const {
  // sorted_run_<snapshot id>_<node id>_<sorted run>.tmp is the file name
  std::stringstream file_name;
  file_name << "/sorted_run_"
    << parent_.get_snapshot_id() << "_"
    << static_cast<int>(get_numa_node()) << "_"
    << static_cast<int>(sorted_run) << ".tmp";
  fs::Path path(engine_->get_options().snapshot_.convert_folder_path_pattern(get_numa_node()));
  path /= file_name.str();
  return path;
}

LogReducer::MergeContext::MergeContext(uint32_t dumped_files_count)
  : dumped_files_count_(dumped_files_count),
  tmp_sorted_buffer_array_(new SortedBuffer*[dumped_files_count + 1]),
  tmp_sorted_buffer_count_(0) {
}

LogReducer::MergeContext::~MergeContext() {
  sorted_buffers_.clear();
  // destructor calls close(), but to make sure
  for (auto& file : sorted_files_auto_ptrs_) {
    file->close();
  }
  sorted_files_auto_ptrs_.clear();
  io_buffers_.clear();
  io_memory_.release_block();
  delete[] tmp_sorted_buffer_array_;
  tmp_sorted_buffer_array_ = nullptr;
}

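// Returns the smallest storage ID that still has an unprocessed block in any of
// the sorted streams, or 0 when every stream is exhausted.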
storage::StorageId LogReducer::MergeContext::get_min_storage_id() const {
  bool first = true;
  storage::StorageId storage_id = 0;
  for (uint32_t i = 0; i < sorted_buffers_.size(); ++i) {
    storage::StorageId the_storage_id = sorted_buffers_[i]->get_cur_block_storage_id();
    if (the_storage_id == 0) {
      continue;
    }
    if (first) {
      storage_id = the_storage_id;
      first = false;
    } else {
      storage_id = std::min(storage_id, the_storage_id);
    }
  }
  return storage_id;
}

void LogReducer::MergeContext::set_tmp_sorted_buffer_array(storage::StorageId storage_id) {
  tmp_sorted_buffer_count_ = 0;
  for (uint32_t i = 0; i < sorted_buffers_.size(); ++i) {
    if (sorted_buffers_[i]->get_cur_block_storage_id() == storage_id) {
      tmp_sorted_buffer_array_[tmp_sorted_buffer_count_] = sorted_buffers_[i].get();
      ++tmp_sorted_buffer_count_;
    }
  }
  ASSERT_ND(tmp_sorted_buffer_count_ > 0);
}


ErrorStack LogReducer::merge_sort() {
  merge_sort_check_buffer_status();
  CHECK_ERROR(merge_sort_dump_last_buffer());

  // The writer that writes out composed snapshot pages to a new snapshot file.
  // we use it only during merge_sort().
  SnapshotWriter snapshot_writer(
    engine_,
    numa_node_,
    parent_.get_snapshot_id(),
    &writer_pool_memory_,
    &writer_intermediate_memory_);
  CHECK_ERROR(snapshot_writer.open());

  // because we are now at the last merging phase, we will no longer dump sorted runs.
  // thus, we release the reducer's dump IO buffer to reduce memory pressure.
  dump_io_buffer_.release_block();

  MergeContext context(sorted_runs_);
  LOG(INFO) << to_string() << " merge sorting " << sorted_runs_ << " sorted runs and the current"
    << " buffer which has "
    << 8ULL * control_block_->get_current_buffer_status().get_tail_position()
    << " bytes";
  debugging::StopWatch merge_watch;

  // prepare the input streams for composers
  merge_sort_allocate_io_buffers(&context);
  CHECK_ERROR(merge_sort_open_sorted_runs(&context));
  CHECK_ERROR(merge_sort_initialize_sort_buffers(&context));

  // this work memory automatically expands if needed
  memory::AlignedMemory composer_work_memory;
  composer_work_memory.alloc(
    1U << 21,
    1U << 12,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);

  // merge-sort each storage
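  // Storages are processed in ascending storage-ID order. get_min_storage_id()
  // returns 0 once every stream is exhausted, which ends this loop.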
  storage::StorageId prev_storage_id = 0;
  control_block_->total_storage_count_ = 0;
  for (storage::StorageId storage_id = context.get_min_storage_id();
      storage_id > 0;
      storage_id = context.get_min_storage_id(), ++control_block_->total_storage_count_) {
    if (storage_id <= prev_storage_id) {
      LOG(FATAL) << to_string() << " wtf. not storage sorted? " << *this;
    }
    prev_storage_id = storage_id;

    // collect streams for this storage
    VLOG(0) << to_string() << " merging storage-" << storage_id << ", num="
      << control_block_->total_storage_count_;
    context.set_tmp_sorted_buffer_array(storage_id);

    // run composer
    storage::Composer composer(engine_, storage_id);
    // snapshot_reader_.get_or_open_file();
    ASSERT_ND(control_block_->total_storage_count_ <= get_max_storage_count());
    storage::Page* root_info_page = root_info_pages_ + control_block_->total_storage_count_;
    storage::Composer::ComposeArguments args = {
      &snapshot_writer,
      &previous_snapshot_files_,
      context.tmp_sorted_buffer_array_,
      context.tmp_sorted_buffer_count_,
      &composer_work_memory,
      parent_.get_base_epoch(),
      root_info_page};
    CHECK_ERROR(composer.compose(args));

    // move on to next blocks
    for (auto& ptr : context.sorted_buffers_) {
      SortedBuffer *buffer = ptr.get();
      WRAP_ERROR_CODE(merge_sort_advance_sort_buffers(buffer, storage_id));
    }
  }
  ASSERT_ND(control_block_->total_storage_count_ <= get_max_storage_count());

  snapshot_writer.close();
  merge_watch.stop();
  LOG(INFO) << to_string() << " completed merging in " << merge_watch.elapsed_sec() << " seconds"
    << ". total_storage_count_=" << control_block_->total_storage_count_;
  return kRetOk;
}


void LogReducer::merge_sort_check_buffer_status() const {
  ASSERT_ND(sorted_runs_ == control_block_->current_buffer_);
  ASSERT_ND(control_block_->get_current_buffer_status().get_tail_bytes()
    <= buffer_half_size_bytes_);
  ASSERT_ND(control_block_->get_non_current_buffer_status().get_tail_bytes()
    <= buffer_half_size_bytes_);
  if (control_block_->get_non_current_buffer_status().components.tail_position_ > 0 ||
    control_block_->get_non_current_buffer_status().components.active_writers_ > 0) {
    LOG(FATAL) << to_string() << " non-current buffer still has some data. this must not happen"
      << " at merge_sort step.";
  }
  ReducerBufferStatus cur_status = control_block_->get_current_buffer_status();
  if (cur_status.components.active_writers_ > 0) {
    LOG(FATAL) << to_string() << " last buffer is still being written. this must not happen"
      << " at merge_sort step.";
  }
  ASSERT_ND(!cur_status.is_no_more_writers());  // it should still be active
}

void LogReducer::merge_sort_allocate_io_buffers(LogReducer::MergeContext* context) const {
  if (context->dumped_files_count_ == 0) {
    LOG(INFO) << to_string() << " great, no sorted run files. everything is in-memory";
    return;
  }
  debugging::StopWatch alloc_watch;
  uint64_t size_per_run =
    static_cast<uint64_t>(engine_->get_options().snapshot_.log_reducer_read_io_buffer_kb_) << 10;
  uint64_t size_total = size_per_run * context->dumped_files_count_;
  context->io_memory_.alloc(
    size_total,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    get_numa_node());
  for (uint32_t i = 0; i < context->dumped_files_count_; ++i) {
    context->io_buffers_.emplace_back(memory::AlignedMemorySlice(
      &context->io_memory_,
      i * size_per_run,
      size_per_run));
  }
  alloc_watch.stop();
  LOG(INFO) << to_string() << " allocated IO buffers (" << size_total << " bytes in total)"
    << " in " << alloc_watch.elapsed_us() << "us";
}
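
// Sorts the last (current) in-memory buffer in place of dumping it: the sorted
// result goes to the other buffer half rather than to a file, saving one file
// write and read before the merge.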
ErrorStack LogReducer::merge_sort_dump_last_buffer() {
  uint16_t last = control_block_->current_buffer_ % 2;
  BufferPosition last_pos = control_block_->get_buffer_status_atomic(last).get_tail_position();
  LOG(INFO) << to_string() << " sorting the last buffer in memory (" << last_pos * 8 << "B)...";
  debugging::StopWatch watch;

  // Reuse most of the dump_buffer_xxx methods
  char* const base = reinterpret_cast<char*>(buffers_[last]);
  char* const other = reinterpret_cast<char*>(buffers_[(last + 1) % 2]);
  std::map<storage::StorageId, std::vector<BufferPosition> > blocks;
  dump_buffer_scan_block_headers(base, last_pos, &blocks);
  uint64_t other_bytes = 0;
  uint64_t total_log_count = 0;
  for (auto& kv : blocks) {
    LogBuffer buffer(base);
    storage::StorageId storage_id = kv.first;
    uint32_t shortest_key_length;
    uint32_t longest_key_length;
    uint32_t count;
    CHECK_ERROR(dump_buffer_sort_storage(
      buffer,
      storage_id,
      kv.second,
      &shortest_key_length,
      &longest_key_length,
      &count));
    total_log_count += count;
    BufferPosition* pos = reinterpret_cast<BufferPosition*>(output_positions_slice_.get_block());

    // The only difference is here. Output the sorted result to the other buffer, not to a file.
    uint64_t total_bytes = dump_block_header(
      buffer,
      storage_id,
      pos,
      shortest_key_length,
      longest_key_length,
      count,
      other + other_bytes);
    uint64_t current_pos = sizeof(FullBlockHeader) + other_bytes;
    for (uint32_t i = 0; i < count; ++i) {
      const log::RecordLogType* record = buffer.resolve(pos[i]);
      ASSERT_ND(current_pos % 8 == 0);
      ASSERT_ND(record->header_.storage_id_ == storage_id);
      ASSERT_ND(record->header_.log_length_ > 0);
      ASSERT_ND(record->header_.log_length_ % 8 == 0);
      std::memcpy(other + current_pos, record, record->header_.log_length_);
      current_pos += record->header_.log_length_;
    }
    ASSERT_ND(total_bytes + other_bytes == current_pos);  // now we went over all logs again
    other_bytes += total_bytes;
  }

  // We wrote out to the other buffer, so switch the current.
  ++control_block_->current_buffer_;
  ReducerBufferStatus other_status;
  other_status.components.active_writers_ = 0;
  other_status.components.flags_ = kFlagNoMoreWriters;
  other_status.components.tail_position_ = to_buffer_position(other_bytes);
  control_block_->buffer_status_[control_block_->current_buffer_ % 2] = other_status.word;
  watch.stop();
  LOG(INFO) << to_string() << " sorted the last buffer in memory (" << last_pos * 8 << "B -> "
    << other_bytes << "B) in " << watch.elapsed_ms() << "ms, total_log_count=" << total_log_count;
  return kRetOk;
}

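// Stream 0 is always the in-memory buffer holding the last sorted result;
// streams 1..dumped_files_count_ are the sorted-run files, each read through its
// own slice of the IO memory.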
ErrorStack LogReducer::merge_sort_open_sorted_runs(LogReducer::MergeContext* context) const {
  uint32_t last_buffer_index = control_block_->current_buffer_;
  ReducerBufferStatus buffer_status = control_block_->get_buffer_status_atomic(last_buffer_index);
  void* last_buffer = buffers_[last_buffer_index % 2];
  // always the last buffer (no cost)
  context->sorted_buffers_.emplace_back(new InMemorySortedBuffer(
    reinterpret_cast<char*>(last_buffer),
    from_buffer_position(buffer_status.components.tail_position_)));

  // sorted run files
  ASSERT_ND(context->io_buffers_.size() == sorted_runs_);
  for (uint32_t sorted_run = 0; sorted_run < context->dumped_files_count_; ++sorted_run) {
    fs::Path path = get_sorted_run_file_path(sorted_run);
    if (!fs::exists(path)) {
      LOG(FATAL) << to_string() << " wtf. this sorted run file doesn't exist " << path;
    }
    uint64_t file_size = fs::file_size(path);
    if (file_size == 0) {
      LOG(FATAL) << to_string() << " wtf. this sorted run file is empty " << path;
    }

    std::unique_ptr<fs::DirectIoFile> file_ptr(new fs::DirectIoFile(
      path,
      engine_->get_options().snapshot_.emulation_));
    WRAP_ERROR_CODE(file_ptr->open(true, false, false, false));

    context->sorted_buffers_.emplace_back(new DumpFileSortedBuffer(
      file_ptr.get(),
      context->io_buffers_[sorted_run]));
    context->sorted_files_auto_ptrs_.emplace_back(std::move(file_ptr));
  }

  ASSERT_ND(context->sorted_files_auto_ptrs_.size() == context->sorted_buffers_.size() - 1U);
  ASSERT_ND(context->dumped_files_count_ == context->sorted_buffers_.size() - 1U);
  return kRetOk;
}

ErrorStack LogReducer::merge_sort_initialize_sort_buffers(LogReducer::MergeContext* context) const {
  for (uint32_t index = 0; index < context->sorted_buffers_.size(); ++index) {
    SortedBuffer* buffer = context->sorted_buffers_[index].get();
    if (buffer->get_total_size() == 0) {
      buffer->invalidate_current_block();
      LOG(INFO) << to_string() << " buffer-" << index << " is empty";
      continue;
    }
    if (index > 0) {
      DumpFileSortedBuffer* casted = dynamic_cast<DumpFileSortedBuffer*>(buffer);
      ASSERT_ND(casted);
      // the buffer hasn't loaded any data yet, so let's make the first read.
      uint64_t desired_reads = std::min(casted->get_buffer_size(), casted->get_total_size());
      WRAP_ERROR_CODE(casted->get_file()->read(desired_reads, casted->get_io_buffer()));
    } else {
      ASSERT_ND(dynamic_cast<InMemorySortedBuffer*>(buffer));
      // the in-memory one has already loaded everything
    }

    // See the first block header. As a filler block only ever follows a real block, this must be
    // a real block.
    const FullBlockHeader* header = reinterpret_cast<const FullBlockHeader*>(
      buffer->get_buffer());
    if (!header->is_full_block()) {
      LOG(FATAL) << to_string() << " wtf. first block in the file is not a real storage block."
        << *buffer << *header;
    }
    buffer->set_current_block(
      header->storage_id_,
      header->log_count_,
      sizeof(FullBlockHeader),
      from_buffer_position(header->block_length_),
      header->shortest_key_length_,
      header->longest_key_length_);
  }

  return kRetOk;
}

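// After a composer consumes the current block of processed_storage_id from this
// stream, advance the stream to its next block header, skipping a filler block
// if one follows, or invalidate the stream at end of data.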
ErrorCode LogReducer::merge_sort_advance_sort_buffers(
  SortedBuffer* buffer,
  storage::StorageId processed_storage_id) const {
  if (buffer->get_cur_block_storage_id() != processed_storage_id) {
    return kErrorCodeOk;
  }
  uint64_t next_block_header_pos = buffer->get_cur_block_abosulte_end();
  uint64_t in_buffer_pos = buffer->to_relative_pos(next_block_header_pos);
  const BlockHeaderBase* next_header
    = reinterpret_cast<const BlockHeaderBase*>(buffer->get_buffer() + in_buffer_pos);

  // skip a dummy block
  if (next_block_header_pos < buffer->get_total_size() && next_header->is_filler()) {
    // next block is a dummy block. we have to skip over it.
    // no two filler blocks come in a row, so we just skip this one.
    uint64_t skip_bytes = from_buffer_position(next_header->block_length_);
    next_block_header_pos += skip_bytes;
    VLOG(1) << to_string() << " skipped a filler block. " << skip_bytes << " bytes";
    if (next_block_header_pos + sizeof(FullBlockHeader)
        > buffer->get_offset() + buffer->get_buffer_size()) {
      // we have to at least read the header of the next block. if we unluckily hit
      // the boundary here, wind it.
      LOG(INFO) << to_string() << " wow, we unluckily hit the buffer boundary while skipping"
        << " a filler block. it's rare!";
      CHECK_ERROR_CODE(buffer->wind(next_block_header_pos));
      ASSERT_ND(next_block_header_pos >= buffer->get_offset());
      ASSERT_ND(next_block_header_pos + sizeof(FullBlockHeader)
        <= buffer->get_offset() + buffer->get_buffer_size());
    }

    in_buffer_pos = buffer->to_relative_pos(next_block_header_pos);
    next_header =
      reinterpret_cast<const BlockHeaderBase*>(buffer->get_buffer() + in_buffer_pos);
  }

  // the next block must be a real block. but, did we reach the end of file?
  if (next_block_header_pos >= buffer->get_total_size()) {
    ASSERT_ND(next_block_header_pos == buffer->get_total_size());
    // this stream is done
    buffer->invalidate_current_block();
    LOG(INFO) << to_string() << " fully merged a stream: " << *buffer;
  } else {
    if (!next_header->is_full_block()) {
      LOG(FATAL) << to_string() << " wtf. block magic word doesn't match. pos="
        << next_block_header_pos << ", " << *next_header;
    }

    const FullBlockHeader* next_header_casted
      = reinterpret_cast<const FullBlockHeader*>(next_header);
    if (next_header_casted->storage_id_ == 0 ||
      next_header_casted->log_count_ == 0 ||
      next_header_casted->block_length_ == 0) {
      LOG(FATAL) << to_string() << " wtf. invalid block header. pos="
        << next_block_header_pos << *next_header_casted;
    }
    buffer->set_current_block(
      next_header_casted->storage_id_,
      next_header_casted->log_count_,
      next_block_header_pos + sizeof(FullBlockHeader),
      next_block_header_pos + from_buffer_position(next_header_casted->block_length_),
      next_header_casted->shortest_key_length_,
      next_header_casted->longest_key_length_);
  }
  return kErrorCodeOk;
}

std::ostream& operator<<(std::ostream& o, const LogReducer& v) {
  o << "<LogReducer>"
    << "<id_>" << v.get_id() << "</id_>"
    << "<numa_node>" << v.get_numa_node() << "</numa_node>"
    << "<total_storage_count>" << v.control_block_->total_storage_count_ << "</total_storage_count>"
    << "<sort_buffer_>" << v.sort_buffer_ << "</sort_buffer_>"
    << "<positions_buffers_>" << v.positions_buffers_ << "</positions_buffers_>"
    << "<current_buffer_>" << v.control_block_->current_buffer_ << "</current_buffer_>"
    << "<sorted_runs_>" << v.sorted_runs_ << "</sorted_runs_>"
    << "</LogReducer>";
  return o;
}

std::ostream& operator<<(std::ostream& o, const BlockHeaderBase& v) {
  o << "<BlockHeader>"
    << "<magic_word_>" << assorted::Hex(v.magic_word_) << "</magic_word_>"
    << "<block_length_>" << v.block_length_ << "</block_length_>"
    << "</BlockHeader>";
  return o;
}

uint32_t LogReducer::get_max_storage_count() const {
  return engine_->get_options().storage_.max_storages_;
}


}  // namespace snapshot
}  // namespace foedus