#include <glog/logging.h>
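// LogMapper reads log files from one logger and sends the log entries to the
// corresponding log reducers, bucketing them per storage as it scans.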
  processed_log_count_(0) {
  clear_storage_buckets();

  io_buffer_size = assorted::align<uint64_t, memory::kHugepageSize>(io_buffer_size);

  buckets_memory_.alloc(

  presort_buffer_.alloc(

  presort_ouputs_.alloc(
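  // tmp_memory_ is carved into fixed-size slices (send buffer, position and
  // sort arrays, the BucketHashList array) by advancing tmp_offset; the check
  // below catches a mismatch between the slice sizes and the allocated size.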
  uint64_t tmp_offset = 0;
  tmp_offset += kSendBufferSize;
  tmp_offset += kBucketSize;
  tmp_offset += kBucketSize << 1;
  const uint64_t hashlist_bytesize = kBucketHashListMaxCount * sizeof(BucketHashList);
    &tmp_memory_, tmp_offset, hashlist_bytesize);
  tmp_offset += hashlist_bytesize;
  tmp_offset += kBucketSize;
  if (tmp_memory_size < tmp_offset) {
    LOG(FATAL) << "tmp_memory_size is too small. Some constant values are messed up";
  processed_log_count_ = 0;
  clear_storage_buckets();

  clear_storage_buckets();

  LOG(INFO) << to_string() << " has no logs to process";
  report_completion(0);
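  // Main read loop: each log file is read into io_buffer_ in
  // kIoAlignment-aligned chunks and handed to handle_process_buffer()
  // until status.ended_ signals that the last relevant file is consumed.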
  processed_log_count_ = 0;
  status.size_inbuf_aligned_ = io_buffer_.get_size();
  status.ended_ = false;
  status.first_read_ = true;
  while (!status.ended_) {
      status.cur_file_ordinal_));
    if (file_size % kIoAlignment != 0) {
      LOG(WARNING) << to_string() << " Interesting, non-aligned file size, which probably means"
        << " previous writes didn't flush. file path=" << path << ", file size=" << file_size;

    ASSERT_ND(file_size % kIoAlignment == 0);

    status.next_infile_ = 0;
    DVLOG(1) << to_string() << " opened log file " << file;

    DVLOG(1) << to_string() << " seeked to: " << assorted::Hex(status.buf_infile_aligned_);
    status.end_inbuf_aligned_ = std::min(
      align_io_ceil(status.end_infile_ - status.buf_infile_aligned_));
    ASSERT_ND(status.end_inbuf_aligned_ % kIoAlignment == 0);
    status.cur_inbuf_ = 0;
    if (status.next_infile_ != status.buf_infile_aligned_) {
      ASSERT_ND(status.next_infile_ > status.buf_infile_aligned_);
      status.cur_inbuf_ = status.next_infile_ - status.buf_infile_aligned_;
      DVLOG(1) << to_string() << " skipped " << status.cur_inbuf_ << " bytes for aligned read";
    if (status.more_in_the_file_) {
      ASSERT_ND(status.next_infile_ > status.buf_infile_aligned_);

        status.ended_ = true;

        ++status.cur_file_ordinal_;
        status.next_infile_ = 0;
          << " moved on to next log file ordinal " << status.cur_file_ordinal_;

  LOG(INFO) << to_string() << " processed " << processed_log_count_ << " log entries in "
void LogMapper::report_completion(double elapsed_sec) {

    LOG(INFO) << "All mappers done. " << to_string() << " was the last mapper. took "
      << elapsed_sec << "s";
ErrorStack LogMapper::handle_process_buffer(const fs::DirectIoFile& file, IoBufStatus* status) {

  clear_storage_buckets();

  char* buffer = reinterpret_cast<char*>(io_buffer_.get_block());
  status->more_in_the_file_ = false;
  for (; status->cur_inbuf_ < status->end_inbuf_aligned_; ++processed_log_count_) {
    const log::LogHeader* header
      = reinterpret_cast<const log::LogHeader*>(buffer + status->cur_inbuf_);
    ASSERT_ND(status->buf_infile_aligned_ != 0 || status->cur_inbuf_ != 0

    if (UNLIKELY(header->log_length_ + status->cur_inbuf_ > status->end_inbuf_aligned_)) {
      if (status->to_infile(status->cur_inbuf_ + header->log_length_)
          > status->size_infile_aligned_) {
        LOG(ERROR) << "inconsistent end of log entry. offset="
          << status->to_infile(status->cur_inbuf_)
          << ", file=" << file << ", log header=" << *header;
      status->next_infile_ = status->to_infile(status->cur_inbuf_);
      status->more_in_the_file_ = true;
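      // Epoch markers delimit epochs within a log file. The assertions below
      // check the marker against the current file position and against the
      // snapshot's [base_epoch, until_epoch] range.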
      const log::EpochMarkerLogType* marker =
        reinterpret_cast<const log::EpochMarkerLogType*>(header);
      ASSERT_ND(header->log_length_ == sizeof(log::EpochMarkerLogType));
      ASSERT_ND(marker->log_file_ordinal_ == status->cur_file_ordinal_);
      ASSERT_ND(marker->log_file_offset_ == status->to_infile(status->cur_inbuf_));
      ASSERT_ND(marker->new_epoch_ >= marker->old_epoch_);
      ASSERT_ND(!base_epoch.is_valid() || marker->new_epoch_ >= base_epoch);
      ASSERT_ND(marker->new_epoch_ <= until_epoch);
      if (status->first_read_) {
          || marker->old_epoch_ <= base_epoch
          || marker->old_epoch_ == marker->new_epoch_);
        status->first_read_ = false;

        ASSERT_ND(!base_epoch.is_valid() || marker->old_epoch_ >= base_epoch);
      bool bucketed = bucket_log(header->storage_id_, status->cur_inbuf_);
        bool added = add_new_bucket(header->storage_id_);
          bucketed = bucket_log(header->storage_id_, status->cur_inbuf_);

          added = add_new_bucket(header->storage_id_);
          bucketed = bucket_log(header->storage_id_, status->cur_inbuf_);

    status->cur_inbuf_ += header->log_length_;

  if (status->cur_inbuf_ == status->end_inbuf_aligned_
      && status->end_infile_ > status->to_infile(status->cur_inbuf_)) {
    LOG(INFO) << "Hooray, a full mapper buffer exactly ends with a complete log record. rare!";
    status->next_infile_ = status->to_infile(status->cur_inbuf_);
    status->more_in_the_file_ = true;
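// Appends one log position to the tail bucket of the storage's hash list.
// Judging from the retry pattern above, it returns false when the storage has
// no hash list yet or the tail bucket is full; the caller then allocates a new
// bucket via add_new_bucket() and retries.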
  BucketHashList* hashlist = find_storage_hashlist(storage_id);
  if (UNLIKELY(hashlist == nullptr)) {

  if (LIKELY(!hashlist->tail_->is_full())) {

    hashlist->tail_->log_positions_[hashlist->tail_->counts_] = log_position;
    ++hashlist->tail_->counts_;
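// Carves a new Bucket out of buckets_memory_ and links it into the storage's
// hash list, creating the hash list itself if this is the first bucket for
// that storage. When either pool is exhausted, all buckets are flushed first.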
  if (buckets_allocated_count_ >= buckets_memory_.get_size() / sizeof(Bucket)) {

    LOG(WARNING) << to_string() << " ran out of buckets_memory_, so it has to flush buckets before"
      " processing one IO buffer. This shouldn't happen often. Check your log_mapper_bucket_kb_"
      " setting. this=" << *this;
  Bucket* base_address = reinterpret_cast<Bucket*>(buckets_memory_.get_block());
  Bucket* new_bucket = base_address + buckets_allocated_count_;
  ++buckets_allocated_count_;
  new_bucket->storage_id_ = storage_id;
  new_bucket->counts_ = 0;
  new_bucket->next_bucket_ = nullptr;
  BucketHashList* hashlist = find_storage_hashlist(storage_id);

    ASSERT_ND(hashlist->storage_id_ == storage_id);

    hashlist->tail_->next_bucket_ = new_bucket;
    hashlist->tail_ = new_bucket;
    ++hashlist->bucket_counts_;
  if (hashlist_allocated_count_ >= kBucketHashListMaxCount) {
    LOG(WARNING) << to_string() << " ran out of hashlist memory, so it has to flush buckets now."
      " This shouldn't happen often. We must consider increasing kBucketHashListMaxCount."

  BucketHashList* new_hashlist =
    reinterpret_cast<BucketHashList*>(tmp_hashlist_buffer_slice_.get_block())
    + hashlist_allocated_count_;
  ++hashlist_allocated_count_;

  new_hashlist->storage_id_ = storage_id;
  new_hashlist->head_ = new_bucket;
  new_hashlist->tail_ = new_bucket;
  new_hashlist->bucket_counts_ = 1;

  add_storage_hashlist(new_hashlist);
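// The hash table is a simple 256-entry array indexed by the lowest byte of the
// storage ID; collisions are chained through hashlist_next_.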
void LogMapper::add_storage_hashlist(BucketHashList* new_hashlist) {
  uint8_t index = static_cast<uint8_t>(new_hashlist->storage_id_);
  if (storage_hashlists_[index] == nullptr) {
    storage_hashlists_[index] = new_hashlist;
    new_hashlist->hashlist_next_ = nullptr;

    new_hashlist->hashlist_next_ = storage_hashlists_[index];
    storage_hashlists_[index] = new_hashlist;
void LogMapper::clear_storage_buckets() {
  std::memset(storage_hashlists_, 0, sizeof(storage_hashlists_));
  buckets_allocated_count_ = 0;
  hashlist_allocated_count_ = 0;
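// Flushes every hash list in the 256-entry table, then resets all buckets and
// hash lists so the next IO buffer starts from a clean state.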
void LogMapper::flush_all_buckets() {
  for (uint16_t i = 0; i < 256; ++i) {
    for (BucketHashList* hashlist = storage_hashlists_[i];
         hashlist != nullptr && hashlist->storage_id_ != 0;
         hashlist = hashlist->hashlist_next_) {
      flush_bucket(*hashlist);

  clear_storage_buckets();
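// Sends out all buckets of one storage. If the storage is partitionable, each
// log position is assigned a partition, the positions are sorted by partition,
// and each contiguous partition run is sent separately; otherwise everything
// goes to partition 0.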
void LogMapper::flush_bucket(const BucketHashList& hashlist) {

  PartitionSortEntry* sort_array = reinterpret_cast<PartitionSortEntry*>(

  LogBuffer log_buffer(reinterpret_cast<char*>(io_buffer_.get_block()));

    LOG(INFO) << "These logs are sent to a dropped storage. Ignore them";

  uint64_t log_count = 0;
  debugging::StopWatch stop_watch;
  for (Bucket* bucket = hashlist.head_; bucket != nullptr; bucket = bucket->next_bucket_) {

    ASSERT_ND(bucket->counts_ <= kBucketMaxCount);
    ASSERT_ND(bucket->storage_id_ == hashlist.storage_id_);
    log_count += bucket->counts_;
    if (multi_partitions) {
      storage::Partitioner partitioner(engine_, bucket->storage_id_);

      if (partitioner.is_partitionable()) {

        for (uint32_t i = 0; i < bucket->counts_; ++i) {
          position_array[i] = bucket->log_positions_[i];
          ASSERT_ND(log_buffer.resolve(position_array[i])->header_.storage_id_
            == bucket->storage_id_);
          ASSERT_ND(log_buffer.resolve(position_array[i])->header_.storage_id_
            == hashlist.storage_id_);

        storage::Partitioner::PartitionBatchArguments args = {

        partitioner.partition_batch(args);

        std::memset(sort_array, 0, sizeof(PartitionSortEntry) * bucket->counts_);
        for (uint32_t i = 0; i < bucket->counts_; ++i) {
          sort_array[i].set(partition_array[i], bucket->log_positions_[i]);

        std::sort(sort_array, sort_array + bucket->counts_);
        const uint32_t original_count = bucket->counts_;

        bucket->log_positions_[0] = sort_array[0].position_;

        for (uint32_t i = 1; i < original_count; ++i) {
          if (current_partition == sort_array[i].partition_) {
            bucket->log_positions_[bucket->counts_] = sort_array[i].position_;

            ASSERT_ND(bucket->counts_ <= original_count);

            send_bucket_partition(bucket, current_partition);

            current_partition = sort_array[i].partition_;
            bucket->log_positions_[0] = sort_array[i].position_;

        send_bucket_partition(bucket, current_partition);

        send_bucket_partition(bucket, 0);

      send_bucket_partition(bucket, 0);
  LOG(INFO) << to_string() << " sent out " << log_count << " log entries for storage-"
    << hashlist.storage_id_ << " in " << stop_watch.elapsed_ms() << " milliseconds";
    uint32_t* shortest_key_length,
    uint32_t* longest_key_length) {

    *shortest_key_length = std::min<uint32_t>(*shortest_key_length, key_length);
    *longest_key_length = std::max<uint32_t>(*longest_key_length, key_length);

    *shortest_key_length = std::min<uint32_t>(*shortest_key_length, key_length);
    *longest_key_length = std::max<uint32_t>(*longest_key_length, key_length);

    *shortest_key_length = 8U;
    *longest_key_length = 8U;
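// Dispatches one bucket's worth of log entries for a single partition, either
// pre-sorting them in the mapper (when log_mapper_sort_before_send_ is set) or
// streaming them as-is in bucket order.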
void LogMapper::send_bucket_partition(

  VLOG(0) << to_string() << " sending " << bucket->counts_ << " log entries for storage-"
    << bucket->storage_id_ << " to partition-" << static_cast<int>(partition);

    send_bucket_partition_presort(bucket, storage_type, partition);

    send_bucket_partition_general(bucket, storage_type, partition, bucket->log_positions_);
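// Copies the listed log entries from the IO buffer into a fixed-size send
// buffer, shipping the buffer to the reducer whenever the next entry would not
// fit, and once more at the end for the remainder.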
void LogMapper::send_bucket_partition_general(
  const Bucket* bucket,

  uint64_t written = 0;
  uint32_t log_count = 0;
  uint32_t shortest_key_length = 0xFFFF;
  uint32_t longest_key_length = 0;

  char* send_buffer = reinterpret_cast<char*>(tmp_send_buffer_slice_.get_block());
  const char* io_base = reinterpret_cast<const char*>(io_buffer_.get_block());
  for (uint32_t i = 0; i < bucket->counts_; ++i) {

    const log::LogHeader* header = reinterpret_cast<const log::LogHeader*>(io_base + pos);
    ASSERT_ND(header->storage_id_ == bucket->storage_id_);
    uint16_t log_length = header->log_length_;

    if (written + log_length > kSendBufferSize) {

      send_bucket_partition_buffer(

      shortest_key_length = 0xFFFF;
      longest_key_length = 0;

    std::memcpy(send_buffer + written, header, header->log_length_);
    written += header->log_length_;

  send_bucket_partition_buffer(
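// Sorts the bucket's log entries via the storage's Partitioner (sort_batch)
// before handing the sorted positions to the general send path.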
void LogMapper::send_bucket_partition_presort(

  storage::Partitioner partitioner(engine_, bucket->storage_id_);

  char* io_base = reinterpret_cast<char*>(io_buffer_.get_block());

  uint32_t shortest_key_length = 0xFFFF;
  uint32_t longest_key_length = 0;
  for (uint32_t i = 0; i < bucket->counts_; ++i) {

    const log::LogHeader* header = reinterpret_cast<const log::LogHeader*>(io_base + pos);

  LogBuffer buffer(io_base);

  storage::Partitioner::SortBatchArguments args = {
    bucket->log_positions_,

  partitioner.sort_batch(args);

  bucket->counts_ = count;

  send_bucket_partition_general(bucket, storage_type, partition, outputs);
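// Hands one filled send buffer to the destination reducer via
// LogReducerRef::append_log_chunk().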
void LogMapper::send_bucket_partition_buffer(
  const Bucket* bucket,
  const char* send_buffer,
  uint32_t shortest_key_length,
  uint32_t longest_key_length) {

  LogReducerRef reducer(engine_, partition);
  reducer.append_log_chunk(
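// Debug dump of the LogMapper state as XML-ish tags, used by log statements
// such as "this=" << *this above.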
    << "<id_>" << v.id_ << "</id_>"
    << "<numa_node_>" << static_cast<int>(v.numa_node_) << "</numa_node_>"
    << "<buckets_allocated_count_>" << v.buckets_allocated_count_ << "</buckets_allocated_count_>"
    << "<hashlist_allocated_count_>" << v.hashlist_allocated_count_ << "</hashlist_allocated_count_>"
    << "<processed_log_count_>" << v.processed_log_count_ << "</processed_log_count_>"