libfoedus-core
FOEDUS Core Library
log_mapper_impl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/snapshot/log_mapper_impl.hpp"

#include <glog/logging.h>

#include <algorithm>
#include <cstring>
#include <ostream>
#include <string>

#include "foedus/assert_nd.hpp"
#include "foedus/engine.hpp"
#include "foedus/epoch.hpp"
#include "foedus/fs/filesystem.hpp"
#include "foedus/log/log_type.hpp"

namespace foedus {
namespace snapshot {

uint16_t calculate_logger_id(Engine* engine, uint16_t local_ordinal) {
  return engine->get_options().log_.loggers_per_node_ * engine->get_soc_id() + local_ordinal;
}
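// Example for calculate_logger_id() above (hypothetical values): with loggers_per_node_ = 2,
// the mapper with local_ordinal 1 on SOC (NUMA node) 1 gets the globally unique logger id
// 2 * 1 + 1 = 3, so logger ids are laid out contiguously per node.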

LogMapper::LogMapper(Engine* engine, uint16_t local_ordinal)
  : MapReduceBase(engine, calculate_logger_id(engine, local_ordinal)),
    processed_log_count_(0) {
  clear_storage_buckets();
}

ErrorStack LogMapper::initialize_once() {
  const SnapshotOptions& option = engine_->get_options().snapshot_;

  uint64_t io_buffer_size = static_cast<uint64_t>(option.log_mapper_io_buffer_mb_) << 20;
  io_buffer_size = assorted::align<uint64_t, memory::kHugepageSize>(io_buffer_size);
  io_buffer_.alloc(
    io_buffer_size,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);
  ASSERT_ND(!io_buffer_.is_null());

  uint64_t bucket_size = static_cast<uint64_t>(option.log_mapper_bucket_kb_) << 10;
  buckets_memory_.alloc(
    bucket_size,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);
  ASSERT_ND(!buckets_memory_.is_null());

  const uint64_t tmp_memory_size = memory::kHugepageSize;
  tmp_memory_.alloc(
    tmp_memory_size,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);
  ASSERT_ND(!tmp_memory_.is_null());

  // these buffers automatically expand when needed
  presort_buffer_.alloc(
    1U << 21,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);
  presort_ouputs_.alloc(
    1U << 21,
    memory::kHugepageSize,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node_);

  uint64_t tmp_offset = 0;
  tmp_send_buffer_slice_ = memory::AlignedMemorySlice(&tmp_memory_, tmp_offset, kSendBufferSize);
  tmp_offset += kSendBufferSize;
  tmp_position_array_slice_ = memory::AlignedMemorySlice(&tmp_memory_, tmp_offset, kBucketSize);
  tmp_offset += kBucketSize;
  tmp_sort_array_slice_ = memory::AlignedMemorySlice(&tmp_memory_, tmp_offset, kBucketSize << 1);
  tmp_offset += kBucketSize << 1;
  const uint64_t hashlist_bytesize = kBucketHashListMaxCount * sizeof(BucketHashList);
  tmp_hashlist_buffer_slice_ = memory::AlignedMemorySlice(
    &tmp_memory_, tmp_offset, hashlist_bytesize);
  tmp_offset += hashlist_bytesize;
  tmp_partition_array_slice_ = memory::AlignedMemorySlice(&tmp_memory_, tmp_offset, kBucketSize);
  tmp_offset += kBucketSize;
  if (tmp_memory_size < tmp_offset) {
    LOG(FATAL) << "tmp_memory_size is too small. some constant values are messed up";
  }

  processed_log_count_ = 0;
  clear_storage_buckets();

  return kRetOk;
}

ErrorStack LogMapper::uninitialize_once() {
  ErrorStackBatch batch;
  io_buffer_.release_block();
  buckets_memory_.release_block();
  tmp_memory_.release_block();
  clear_storage_buckets();
  return SUMMARIZE_ERROR_BATCH(batch);
}

const uint64_t kIoAlignment = 0x1000;
uint64_t align_io_floor(uint64_t offset) { return (offset / kIoAlignment) * kIoAlignment; }
uint64_t align_io_ceil(uint64_t offset) { return align_io_floor(offset + kIoAlignment - 1U); }
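// Example for the alignment helpers above (hypothetical offsets): with kIoAlignment = 0x1000
// (4kb), align_io_floor(0x1234) == 0x1000 and align_io_ceil(0x1234) == 0x2000, while
// already-aligned offsets are unchanged: align_io_ceil(0x2000) == 0x2000.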

ErrorStack LogMapper::handle_process() {
  const Epoch base_epoch = parent_.get_base_epoch();
  const Epoch until_epoch = parent_.get_valid_until_epoch();
  log::LoggerRef logger = engine_->get_log_manager()->get_logger(id_);
  const log::LogRange log_range = logger.get_log_range(base_epoch, until_epoch);
  // uint64_t cur_offset = log_range.begin_offset;
  if (log_range.is_empty()) {
    LOG(INFO) << to_string() << " has no logs to process";
    report_completion(0);
    return kRetOk;
  }

  // Open the file and seek to the read position. Be careful on page boundaries:
  // as we use direct I/O, all I/O must be 4kb-aligned. When the read range is not
  // a multiple of 4kb, we read a little bit more (at most 4kb per read, so negligible).
  // To clarify, we use the following suffixes:
  //   "infile"/"inbuf" : the offset is an offset in the entire file / in the IO buffer
  //   "aligned" : the offset is 4kb-aligned (careful on floor vs ceil)
  // Lengthy, but otherwise it's so confusing.
  processed_log_count_ = 0;
  IoBufStatus status;
  status.size_inbuf_aligned_ = io_buffer_.get_size();
  status.cur_file_ordinal_ = log_range.begin_file_ordinal;
  status.ended_ = false;
  status.first_read_ = true;
  debugging::StopWatch watch;
  while (!status.ended_) {  // loop for log file switch
    fs::Path path(engine_->get_options().log_.construct_suffixed_log_path(
      numa_node_,
      id_,
      status.cur_file_ordinal_));
    uint64_t file_size = fs::file_size(path);
    if (file_size % kIoAlignment != 0) {
      LOG(WARNING) << to_string() << " Interesting, non-aligned file size, which probably means"
        << " previous writes didn't flush. file path=" << path << ", file size=" << file_size;
      file_size = align_io_floor(file_size);
    }
    ASSERT_ND(file_size % kIoAlignment == 0);
    status.size_infile_aligned_ = file_size;

    // If this is the first file to read, we might be reading from a non-zero position.
    // In that case, be careful on alignment.
    if (status.cur_file_ordinal_ == log_range.begin_file_ordinal) {
      status.next_infile_ = log_range.begin_offset;
    } else {
      status.next_infile_ = 0;
    }

    if (status.cur_file_ordinal_ == log_range.end_file_ordinal) {
      ASSERT_ND(log_range.end_offset <= file_size);
      status.end_infile_ = log_range.end_offset;
    } else {
      status.end_infile_ = file_size;
    }

    DVLOG(1) << to_string() << " file path=" << path << ", file size=" << assorted::Hex(file_size)
      << ", read_end=" << assorted::Hex(status.end_infile_);
    fs::DirectIoFile file(path, engine_->get_options().snapshot_.emulation_);
    WRAP_ERROR_CODE(file.open(true, false, false, false));
    DVLOG(1) << to_string() << " opened log file " << file;

    while (true) {
      WRAP_ERROR_CODE(check_cancelled());  // check per each read
      status.buf_infile_aligned_ = align_io_floor(status.next_infile_);
      WRAP_ERROR_CODE(file.seek(status.buf_infile_aligned_, fs::DirectIoFile::kDirectIoSeekSet));
      DVLOG(1) << to_string() << " seeked to: " << assorted::Hex(status.buf_infile_aligned_);
      status.end_inbuf_aligned_ = std::min(
        io_buffer_.get_size(),
        align_io_ceil(status.end_infile_ - status.buf_infile_aligned_));
      ASSERT_ND(status.end_inbuf_aligned_ % kIoAlignment == 0);
      WRAP_ERROR_CODE(file.read(status.end_inbuf_aligned_, &io_buffer_));

      status.cur_inbuf_ = 0;
      if (status.next_infile_ != status.buf_infile_aligned_) {
        ASSERT_ND(status.next_infile_ > status.buf_infile_aligned_);
        status.cur_inbuf_ = status.next_infile_ - status.buf_infile_aligned_;
        DVLOG(1) << to_string() << " skipped " << status.cur_inbuf_ << " bytes for aligned read";
      }
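      // Example (hypothetical offsets): if next_infile_ is 0x1234, the aligned read starts
      // at buf_infile_aligned_ = 0x1000 and the first 0x234 bytes of the IO buffer are
      // skipped, so log parsing begins at cur_inbuf_ = 0x234.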

      CHECK_ERROR(handle_process_buffer(file, &status));
      if (status.more_in_the_file_) {
        ASSERT_ND(status.next_infile_ > status.buf_infile_aligned_);
      } else {
        if (log_range.end_file_ordinal == status.cur_file_ordinal_) {
          status.ended_ = true;
          break;
        } else {
          ++status.cur_file_ordinal_;
          status.next_infile_ = 0;
          LOG(INFO) << to_string()
            << " moved on to next log file ordinal " << status.cur_file_ordinal_;
        }
      }
    }
    file.close();
  }
  watch.stop();
  LOG(INFO) << to_string() << " processed " << processed_log_count_ << " log entries in "
    << watch.elapsed_sec() << "s";
  report_completion(watch.elapsed_sec());
  return kRetOk;
}
void LogMapper::report_completion(double elapsed_sec) {
  uint16_t value_after = parent_.increment_completed_mapper_count();
  if (value_after == parent_.get_mappers_count()) {
    LOG(INFO) << "All mappers done. " << to_string() << " was the last mapper. took "
      << elapsed_sec << "s";
  }
}

ErrorStack LogMapper::handle_process_buffer(const fs::DirectIoFile &file, IoBufStatus* status) {
  const Epoch base_epoch = parent_.get_base_epoch();  // only for assertions
  const Epoch until_epoch = parent_.get_valid_until_epoch();  // only for assertions

  // Several temporary memory regions are used only within this method and completely
  // cleared out for every call.
  clear_storage_buckets();

  char* buffer = reinterpret_cast<char*>(io_buffer_.get_block());
  status->more_in_the_file_ = false;
  for (; status->cur_inbuf_ < status->end_inbuf_aligned_; ++processed_log_count_) {
    // Note: The loop here must be a VERY tight loop, iterated over every single log entry!
    // In most cases, we should be just calling bucket_log().
    const log::LogHeader* header
      = reinterpret_cast<const log::LogHeader*>(buffer + status->cur_inbuf_);
    ASSERT_ND(header->log_length_ > 0);
    ASSERT_ND(status->buf_infile_aligned_ != 0 || status->cur_inbuf_ != 0
      || header->get_type() == log::kLogCodeEpochMarker);  // file starts with marker
    // we must be starting from an epoch marker.
    ASSERT_ND(!status->first_read_ || header->get_type() == log::kLogCodeEpochMarker);
    ASSERT_ND(header->get_kind() == log::kRecordLogs
      || header->get_type() == log::kLogCodeEpochMarker
      || header->get_type() == log::kLogCodeFiller);

    if (UNLIKELY(header->log_length_ + status->cur_inbuf_ > status->end_inbuf_aligned_)) {
      // If a log goes beyond this read, stop processing here and read from that offset again.
      // This is simpler than gluing the fragment. It happens just once per 64MB read,
      // so it is not a big waste.
      if (status->to_infile(status->cur_inbuf_ + header->log_length_)
          > status->size_infile_aligned_) {
        // but it never spans two files. something is wrong.
        LOG(ERROR) << "inconsistent end of log entry. offset="
          << status->to_infile(status->cur_inbuf_)
          << ", file=" << file << ", log header=" << *header;
        return ERROR_STACK_MSG(kErrorCodeSnapshotInvalidLogEnd, file.get_path().c_str());
      }
      status->next_infile_ = status->to_infile(status->cur_inbuf_);
      status->more_in_the_file_ = true;
      break;
    } else if (UNLIKELY(header->get_type() == log::kLogCodeEpochMarker)) {
      // skip epoch marker
      const log::EpochMarkerLogType *marker =
        reinterpret_cast<const log::EpochMarkerLogType*>(header);
      ASSERT_ND(header->log_length_ == sizeof(log::EpochMarkerLogType));
      ASSERT_ND(marker->log_file_ordinal_ == status->cur_file_ordinal_);
      ASSERT_ND(marker->log_file_offset_ == status->to_infile(status->cur_inbuf_));
      ASSERT_ND(marker->new_epoch_ >= marker->old_epoch_);
      ASSERT_ND(!base_epoch.is_valid() || marker->new_epoch_ >= base_epoch);
      ASSERT_ND(marker->new_epoch_ <= until_epoch);
      if (status->first_read_) {
        ASSERT_ND(!base_epoch.is_valid()
          || marker->old_epoch_ <= base_epoch  // otherwise we skipped some logs
          || marker->old_epoch_ == marker->new_epoch_);  // the first marker (old==new) is ok
        status->first_read_ = false;
      } else {
        ASSERT_ND(!base_epoch.is_valid() || marker->old_epoch_ >= base_epoch);
      }
    } else if (UNLIKELY(header->get_type() == log::kLogCodeFiller)) {
      // skip filler log
    } else {
      bool bucketed = bucket_log(header->storage_id_, status->cur_inbuf_);
      if (UNLIKELY(!bucketed)) {
        // need to add a new bucket
        bool added = add_new_bucket(header->storage_id_);
        if (added) {
          bucketed = bucket_log(header->storage_id_, status->cur_inbuf_);
          ASSERT_ND(bucketed);
        } else {
          // ran out of buckets_memory_. we have to flush now.
          flush_all_buckets();
          added = add_new_bucket(header->storage_id_);
          ASSERT_ND(added);
          bucketed = bucket_log(header->storage_id_, status->cur_inbuf_);
          ASSERT_ND(bucketed);
        }
      }
    }

    status->cur_inbuf_ += header->log_length_;
  }

  // This fixes Bug #100. When a full mapper buffer ends exactly with a complete log record,
  // we must keep reading rather than assume the file is done.
  if (status->cur_inbuf_ == status->end_inbuf_aligned_
      && status->end_infile_ > status->to_infile(status->cur_inbuf_)) {
    LOG(INFO) << "Hooray, a full mapper buffer exactly ends with a complete log record. rare!";
    status->next_infile_ = status->to_infile(status->cur_inbuf_);
    status->more_in_the_file_ = true;
  }

  // We have bucketized all logs. Now let's send them out to the reducers.
  flush_all_buckets();
  return kRetOk;
}

inline bool LogMapper::bucket_log(storage::StorageId storage_id, uint64_t pos) {
  BucketHashList* hashlist = find_storage_hashlist(storage_id);
  if (UNLIKELY(hashlist == nullptr)) {
    return false;
  }

  if (LIKELY(!hashlist->tail_->is_full())) {
    // 99.99% of cases hit here straight out of the tight loop.
    // hope the hints guide the compiler well.
    BufferPosition log_position = to_buffer_position(pos);
    hashlist->tail_->log_positions_[hashlist->tail_->counts_] = log_position;
    ++hashlist->tail_->counts_;
    return true;
  } else {
    // found it, but we need to add a new bucket for this storage.
    return false;
  }
}
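// Both "false" return paths above funnel into the same recovery in the caller:
// handle_process_buffer() calls add_new_bucket(), which either chains a fresh bucket to this
// storage's hash list or creates the hash list itself, and then retries bucket_log().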

bool LogMapper::add_new_bucket(storage::StorageId storage_id) {
  if (buckets_allocated_count_ >= buckets_memory_.get_size() / sizeof(Bucket)) {
    // We allocated all of buckets_memory_! We have to flush the buckets now.
    // This shouldn't happen often.
    LOG(WARNING) << to_string() << " ran out of buckets_memory_, so it has to flush buckets before"
      " processing one IO buffer. This shouldn't happen often. Check your log_mapper_bucket_kb_"
      " setting. this=" << *this;
    return false;
  }

  Bucket* base_address = reinterpret_cast<Bucket*>(buckets_memory_.get_block());
  Bucket* new_bucket = base_address + buckets_allocated_count_;
  ++buckets_allocated_count_;
  new_bucket->storage_id_ = storage_id;
  new_bucket->counts_ = 0;
  new_bucket->next_bucket_ = nullptr;

  BucketHashList* hashlist = find_storage_hashlist(storage_id);
  if (hashlist) {
    // just add this as a new tail
    ASSERT_ND(hashlist->storage_id_ == storage_id);
    ASSERT_ND(hashlist->tail_->is_full());
    hashlist->tail_->next_bucket_ = new_bucket;
    hashlist->tail_ = new_bucket;
    ++hashlist->bucket_counts_;
  } else {
    // we don't even have a linked list for this storage.
    // If this happens often, maybe we should have 65536 hash buckets...
    if (hashlist_allocated_count_ >= kBucketHashListMaxCount) {
      LOG(WARNING) << to_string() << " ran out of hashlist memory, so it has to flush buckets now."
        " This shouldn't happen often. We must consider increasing kBucketHashListMaxCount."
        " this=" << *this;
      return false;
    }

    // allocate from the pool
    BucketHashList* new_hashlist =
      reinterpret_cast<BucketHashList*>(tmp_hashlist_buffer_slice_.get_block())
      + hashlist_allocated_count_;
    ++hashlist_allocated_count_;

    new_hashlist->storage_id_ = storage_id;
    new_hashlist->head_ = new_bucket;
    new_hashlist->tail_ = new_bucket;
    new_hashlist->bucket_counts_ = 1;

    add_storage_hashlist(new_hashlist);
  }
  return true;
}
void LogMapper::add_storage_hashlist(BucketHashList* new_hashlist) {
  ASSERT_ND(new_hashlist);
  uint8_t index = static_cast<uint8_t>(new_hashlist->storage_id_);
  if (storage_hashlists_[index] == nullptr) {
    storage_hashlists_[index] = new_hashlist;
    new_hashlist->hashlist_next_ = nullptr;
  } else {
    new_hashlist->hashlist_next_ = storage_hashlists_[index];
    storage_hashlists_[index] = new_hashlist;
  }
}
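// Hashing note for add_storage_hashlist() above: the hash is simply the low 8 bits of the
// storage id, so there are 256 slots and collisions are chained via hashlist_next_.
// Example (hypothetical ids): storage ids 3 and 259 both map to slot 3; the later one is
// pushed to the front of the chain, and find_storage_hashlist() (declared in the header)
// presumably walks the chain comparing storage_id_ to disambiguate.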

void LogMapper::clear_storage_buckets() {
  std::memset(storage_hashlists_, 0, sizeof(storage_hashlists_));
  buckets_allocated_count_ = 0;
  hashlist_allocated_count_ = 0;
}

void LogMapper::flush_all_buckets() {
  for (uint16_t i = 0; i < 256; ++i) {
    for (BucketHashList* hashlist = storage_hashlists_[i];
          hashlist != nullptr && hashlist->storage_id_ != 0;
          hashlist = hashlist->hashlist_next_) {
      flush_bucket(*hashlist);
    }
  }
  clear_storage_buckets();
}

void LogMapper::flush_bucket(const BucketHashList& hashlist) {
  ASSERT_ND(hashlist.head_);
  ASSERT_ND(hashlist.tail_);
  // temporary variables to store partitioning results
  BufferPosition* position_array = reinterpret_cast<BufferPosition*>(
    tmp_position_array_slice_.get_block());
  PartitionSortEntry* sort_array = reinterpret_cast<PartitionSortEntry*>(
    tmp_sort_array_slice_.get_block());
  storage::PartitionId* partition_array = reinterpret_cast<storage::PartitionId*>(
    tmp_partition_array_slice_.get_block());
  LogBuffer log_buffer(reinterpret_cast<char*>(io_buffer_.get_block()));
  const bool multi_partitions = engine_->get_options().thread_.group_count_ > 1U;

  if (!engine_->get_storage_manager()->get_storage(hashlist.storage_id_)->exists()) {
    // We ignore such logs in the snapshot. As DROP STORAGE immediately becomes durable,
    // there is no point in collecting logs for the storage.
    LOG(INFO) << "These logs are sent to a dropped storage. Ignore them.";
    return;
  }

  uint64_t log_count = 0;  // just for reporting
  debugging::StopWatch stop_watch;
  for (Bucket* bucket = hashlist.head_; bucket != nullptr; bucket = bucket->next_bucket_) {
    ASSERT_ND(bucket->counts_ > 0);
    ASSERT_ND(bucket->counts_ <= kBucketMaxCount);
    ASSERT_ND(bucket->storage_id_ == hashlist.storage_id_);
    log_count += bucket->counts_;

    // if there are multiple partitions, we first partition the log entries.
    if (multi_partitions) {
      storage::Partitioner partitioner(engine_, bucket->storage_id_);
      ASSERT_ND(partitioner.is_valid());
      if (partitioner.is_partitionable()) {
        // calculate partitions
        for (uint32_t i = 0; i < bucket->counts_; ++i) {
          position_array[i] = bucket->log_positions_[i];
          ASSERT_ND(log_buffer.resolve(position_array[i])->header_.storage_id_
            == bucket->storage_id_);
          ASSERT_ND(log_buffer.resolve(position_array[i])->header_.storage_id_
            == hashlist.storage_id_);
        }
        storage::Partitioner::PartitionBatchArguments args = {
          static_cast< storage::PartitionId >(numa_node_),
          log_buffer,
          position_array,
          bucket->counts_,
          partition_array};
        partitioner.partition_batch(args);

        // sort the log positions by the calculated partitions
        std::memset(sort_array, 0, sizeof(PartitionSortEntry) * bucket->counts_);
        for (uint32_t i = 0; i < bucket->counts_; ++i) {
          sort_array[i].set(partition_array[i], bucket->log_positions_[i]);
        }
        std::sort(sort_array, sort_array + bucket->counts_);
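        // After the sort, positions belonging to the same partition are contiguous.
        // Example (hypothetical values): partitions [2, 0, 2, 1] become runs [0], [1], [2, 2],
        // and the loop below issues one send_bucket_partition() call per run.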

        // Let's reuse the current bucket as temporary memory to hold the sorted entries.
        // Buckets are discarded after the flushing, so this doesn't cause any issue.
        const uint32_t original_count = bucket->counts_;
        storage::PartitionId current_partition = sort_array[0].partition_;
        bucket->log_positions_[0] = sort_array[0].position_;
        bucket->counts_ = 1;
        for (uint32_t i = 1; i < original_count; ++i) {
          if (current_partition == sort_array[i].partition_) {
            bucket->log_positions_[bucket->counts_] = sort_array[i].position_;
            ++bucket->counts_;
            ASSERT_ND(bucket->counts_ <= original_count);
          } else {
            // the current partition has ended.
            // let's send out these log entries to this partition
            send_bucket_partition(bucket, current_partition);
            // this is the beginning of the next partition
            current_partition = sort_array[i].partition_;
            bucket->log_positions_[0] = sort_array[i].position_;
            bucket->counts_ = 1;
          }
        }

        ASSERT_ND(bucket->counts_ > 0);
        // send out the last partition
        send_bucket_partition(bucket, current_partition);
      } else {
        // in this case, it's the same as a single partition regarding this storage.
        send_bucket_partition(bucket, 0);
      }
    } else {
      // if it's not multi-partition, we blindly send everything to partition-0 (NUMA node 0)
      send_bucket_partition(bucket, 0);
    }
  }

  stop_watch.stop();
  LOG(INFO) << to_string() << " sent out " << log_count << " log entries for storage-"
    << hashlist.storage_id_ << " in " << stop_watch.elapsed_ms() << " milliseconds";
}

void LogMapper::update_key_lengthes(
  const log::LogHeader* header,
  storage::StorageType storage_type,
  uint32_t* shortest_key_length,
  uint32_t* longest_key_length) {
  if (storage_type == storage::kArrayStorage) {
    *shortest_key_length = sizeof(storage::array::ArrayOffset);
    *longest_key_length = sizeof(storage::array::ArrayOffset);
  } else if (storage_type == storage::kMasstreeStorage) {
    const storage::masstree::MasstreeCommonLogType* the_log =
      reinterpret_cast<const storage::masstree::MasstreeCommonLogType*>(header);
    uint16_t key_length = the_log->key_length_;
    ASSERT_ND(key_length > 0);
    *shortest_key_length = std::min<uint32_t>(*shortest_key_length, key_length);
    *longest_key_length = std::max<uint32_t>(*longest_key_length, key_length);
  } else if (storage_type == storage::kHashStorage) {
    const storage::hash::HashCommonLogType* the_log =
      reinterpret_cast<const storage::hash::HashCommonLogType*>(header);
    uint16_t key_length = the_log->key_length_;
    ASSERT_ND(key_length > 0);
    *shortest_key_length = std::min<uint32_t>(*shortest_key_length, key_length);
    *longest_key_length = std::max<uint32_t>(*longest_key_length, key_length);
  } else if (storage_type == storage::kSequentialStorage) {
    // this has no meaning for sequential storage. just put some number.
    *shortest_key_length = 8U;
    *longest_key_length = 8U;
  }
}
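// Example for update_key_lengthes() above (hypothetical key lengths): for a masstree chunk
// whose logs carry keys of 5, 12, and 9 bytes, the caller's running (shortest, longest) pair,
// initialized to (0xFFFF, 0), ends up as (5, 12). Array storages always report the fixed
// sizeof(ArrayOffset), i.e. 8 bytes.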


void LogMapper::send_bucket_partition(
  Bucket* bucket, storage::PartitionId partition) {
  VLOG(0) << to_string() << " sending " << bucket->counts_ << " log entries for storage-"
    << bucket->storage_id_ << " to partition-" << static_cast<int>(partition);
  storage::StorageType storage_type
    = engine_->get_storage_manager()->get_storage(bucket->storage_id_)->meta_.type_;

  // let's do a "pre-sort" to shift work from the reducer to the mapper
  if (engine_->get_options().snapshot_.log_mapper_sort_before_send_
    && storage_type != storage::kSequentialStorage) {  // if sequential, presorting is useless
    send_bucket_partition_presort(bucket, storage_type, partition);
  } else {
    send_bucket_partition_general(bucket, storage_type, partition, bucket->log_positions_);
  }
}

void LogMapper::send_bucket_partition_general(
  const Bucket* bucket,
  storage::StorageType storage_type,
  storage::PartitionId partition,
  const BufferPosition* positions) {
  uint64_t written = 0;
  uint32_t log_count = 0;
  uint32_t shortest_key_length = 0xFFFF;
  uint32_t longest_key_length = 0;
  // stitch the log entries into the send buffer
  char* send_buffer = reinterpret_cast<char*>(tmp_send_buffer_slice_.get_block());
  const char* io_base = reinterpret_cast<const char*>(io_buffer_.get_block());
  ASSERT_ND(tmp_send_buffer_slice_.get_size() == kSendBufferSize);

  for (uint32_t i = 0; i < bucket->counts_; ++i) {
    uint64_t pos = from_buffer_position(positions[i]);
    const log::LogHeader* header = reinterpret_cast<const log::LogHeader*>(io_base + pos);
    ASSERT_ND(header->storage_id_ == bucket->storage_id_);
    uint16_t log_length = header->log_length_;
    ASSERT_ND(log_length > 0);
    ASSERT_ND(log_length % 8 == 0);
    if (written + log_length > kSendBufferSize) {
      // buffer full. send it out.
      send_bucket_partition_buffer(
        bucket,
        partition,
        send_buffer,
        log_count,
        written,
        shortest_key_length,
        longest_key_length);
      log_count = 0;
      written = 0;
      shortest_key_length = 0xFFFF;
      longest_key_length = 0;
    }
    std::memcpy(send_buffer + written, header, header->log_length_);
    written += header->log_length_;
    ++log_count;
    update_key_lengthes(header, storage_type, &shortest_key_length, &longest_key_length);
  }
  send_bucket_partition_buffer(
    bucket,
    partition,
    send_buffer,
    log_count,
    written,
    shortest_key_length,
    longest_key_length);
}

void LogMapper::send_bucket_partition_presort(
  Bucket* bucket,
  storage::StorageType storage_type,
  storage::PartitionId partition) {
  storage::Partitioner partitioner(engine_, bucket->storage_id_);

  char* io_base = reinterpret_cast<char*>(io_buffer_.get_block());
  presort_ouputs_.assure_capacity(sizeof(BufferPosition) * bucket->counts_);
  BufferPosition* outputs = reinterpret_cast<BufferPosition*>(presort_ouputs_.get_block());

  uint32_t shortest_key_length = 0xFFFF;
  uint32_t longest_key_length = 0;
  if (storage_type == storage::kMasstreeStorage) {
    for (uint32_t i = 0; i < bucket->counts_; ++i) {
      uint64_t pos = from_buffer_position(bucket->log_positions_[i]);
      const log::LogHeader* header = reinterpret_cast<const log::LogHeader*>(io_base + pos);
      update_key_lengthes(header, storage_type, &shortest_key_length, &longest_key_length);
    }
  }

  LogBuffer buffer(io_base);
  uint32_t count = 0;
  storage::Partitioner::SortBatchArguments args = {
    buffer,
    bucket->log_positions_,
    bucket->counts_,
    shortest_key_length,
    longest_key_length,
    &presort_buffer_,
    parent_.get_base_epoch(),
    outputs,
    &count};
  partitioner.sort_batch(args);
  ASSERT_ND(count <= bucket->counts_);
  bucket->counts_ = count;  // it might be compacted

  // then the same as send_bucket_partition_general(), except we use the sorted outputs
  send_bucket_partition_general(bucket, storage_type, partition, outputs);
}

void LogMapper::send_bucket_partition_buffer(
  const Bucket* bucket,
  storage::PartitionId partition,
  const char* send_buffer,
  uint32_t log_count,
  uint64_t written,
  uint32_t shortest_key_length,
  uint32_t longest_key_length) {
  if (written == 0) {
    return;
  }

  LogReducerRef reducer(engine_, partition);
  reducer.append_log_chunk(
    bucket->storage_id_,
    send_buffer,
    log_count,
    written,
    shortest_key_length,
    longest_key_length);
}


std::ostream& operator<<(std::ostream& o, const LogMapper& v) {
  o << "<LogMapper>"
    << "<id_>" << v.id_ << "</id_>"
    << "<numa_node_>" << static_cast<int>(v.numa_node_) << "</numa_node_>"
    << "<buckets_allocated_count_>" << v.buckets_allocated_count_ << "</buckets_allocated_count_>"
    << "<hashlist_allocated_count>" << v.hashlist_allocated_count_ << "</hashlist_allocated_count>"
    << "<processed_log_count_>" << v.processed_log_count_ << "</processed_log_count_>"
    << "</LogMapper>";
  return o;
}


}  // namespace snapshot
}  // namespace foedus