libfoedus-core
FOEDUS Core Library
log_buffer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <algorithm>
23 #include <cstring>
24 #include <ostream>
25 #include <string>
26 
28 #include "foedus/fs/filesystem.hpp"
29 
30 namespace foedus {
31 namespace snapshot {
32 std::ostream& operator<<(std::ostream& o, const SortedBuffer& v) {
33  v.describe(&o);
34  return o;
35 }
36 
37 void SortedBuffer::describe_base_elements(std::ostream* optr) const {
38  std::ostream& o = *optr;
39  o << "<buffer_>" << reinterpret_cast<const void*>(buffer_) << "</buffer_>"
40  << "<buffer_size_>" << buffer_size_ << "</buffer_size_>"
41  << "<offset_>" << offset_ << "</offset_>"
42  << "<total_size_>" << total_size_ << "</total_size_>"
43  << "<cur_block_storage_id_>" << cur_block_storage_id_ << "</cur_block_storage_id_>"
44  << "<cur_block_log_count_>" << cur_block_log_count_ << "</cur_block_log_count_>"
45  << "<cur_block_abosulte_begin_>" << cur_block_abosulte_begin_ << "</cur_block_abosulte_begin_>"
46  << "<cur_block_abosulte_end_>" << cur_block_abosulte_end_ << "</cur_block_abosulte_end_>";
47 }
48 
49 void InMemorySortedBuffer::describe(std::ostream* optr) const {
50  std::ostream& o = *optr;
51  o << "<InMemorySortedBuffer>";
53  o << "</InMemorySortedBuffer>";
54 }
55 
56 
59  : SortedBuffer(
60  reinterpret_cast<char*>(io_buffer.get_block()),
61  io_buffer.get_size(),
62  fs::file_size(file->get_path())),
63  file_(file),
64  io_buffer_(io_buffer) {
65  ASSERT_ND(buffer_size_ % kAlignment == 0);
66  ASSERT_ND(total_size_ % kAlignment == 0);
67 }
68 
69 std::string DumpFileSortedBuffer::to_string() const {
70  return std::string("DumpFileSortedBuffer: ") + file_->get_path().string();
71 }
72 
73 void DumpFileSortedBuffer::describe(std::ostream* optr) const {
74  std::ostream& o = *optr;
75  o << "<DumpFileSortedBuffer>";
77  o << "<file_>" << file_ << "</file_>";
78  o << "<io_buffer_>" << io_buffer_ << "</io_buffer_>";
79  o << "</DumpFileSortedBuffer>";
80 }
81 
82 ErrorCode DumpFileSortedBuffer::wind(uint64_t next_absolute_pos) {
83  ASSERT_ND(offset_ % kAlignment == 0);
84  assert_checks();
85  if (next_absolute_pos == offset_ || offset_ + buffer_size_ >= total_size_) {
86  return kErrorCodeOk; // nothing to do then
87  } else if (next_absolute_pos < offset_ || // backward wind
88  next_absolute_pos >= offset_ + buffer_size_ || // jumps more than one buffer
89  next_absolute_pos >= total_size_) { // jumps exceeding the end of file
90  // this class doesn't support these operations. We shouldn't need them in any case.
91  LOG(FATAL) << " wtf next_absolute_pos=" << next_absolute_pos << ", offset=" << offset_
92  << ", buffer_size=" << buffer_size_ << ", total_size_=" << total_size_;
94  }
95 
96  // suppose buf=64M and we have read second window(64M-128M) and now moving on to third window.
97  // in the easiest case, current offset_=64M, next_absolute_pos=128M. we just read 64M.
98  // but, probably next_absolute_pos=128M-alpha, further alpha might not be 4k-aligned.
99  // the following code takes care of those cases.
101  uint64_t retained_bytes
102  = assorted::align<uint64_t, kAlignment>(offset_ + buffer_size_ - next_absolute_pos);
103  ASSERT_ND(retained_bytes <= buffer_size_);
104  std::memmove(buffer_, buffer_ + buffer_size_ - retained_bytes, retained_bytes);
105 
106  // we read as much as possible. note that "next_absolute_pos + buffer_size_" might become larger
107  // than cur_block_abosulte_end_. It's fine, the next DumpFileSortedBuffer object for another
108  // storage will take care (and make use) of the excessive bytes.
109  uint64_t desired_reads = std::min(
110  buffer_size_ - retained_bytes,
112  memory::AlignedMemorySlice sub_slice(io_buffer_, retained_bytes, desired_reads);
113  CHECK_ERROR_CODE(file_->read(desired_reads, sub_slice));
114  offset_ = offset_ + buffer_size_ - retained_bytes;
115 
116  ASSERT_ND(offset_ % kAlignment == 0);
117  ASSERT_ND(next_absolute_pos >= offset_);
118  assert_checks();
119  return kErrorCodeOk;
120 }
121 
122 } // namespace snapshot
123 } // namespace foedus
ErrorCode wind(uint64_t next_absolute_pos) override
Loads next data block to be consumed by the caller (composer).
Definition: log_buffer.cpp:82
DumpFileSortedBuffer(fs::DirectIoFile *file, memory::AlignedMemorySlice io_buffer)
wraps the given file as a buffer.
Definition: log_buffer.cpp:57
0x0002 : "GENERAL: Invalid parameter given" .
Definition: error_code.hpp:106
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
void describe(std::ostream *o) const override
Writes out a detailed description of this object to stream.
Definition: log_buffer.cpp:73
const uint64_t total_size_
see get_total_size()
Definition: log_buffer.hpp:199
uint64_t offset_
see get_offset()
Definition: log_buffer.hpp:197
std::string to_string() const override
Returns a short string that briefly describes this object.
Definition: log_buffer.cpp:69
Represents one input stream of sorted log entries.
Definition: log_buffer.hpp:81
uint64_t get_current_offset() const
uint64_t cur_block_abosulte_begin_
see get_cur_block_abosulte_begin()
Definition: log_buffer.hpp:205
void describe_base_elements(std::ostream *o) const
Definition: log_buffer.cpp:37
0 means no-error.
Definition: error_code.hpp:87
storage::StorageId cur_block_storage_id_
If this is zero, this buffer is not set for reading any block.
Definition: log_buffer.hpp:202
virtual void describe(std::ostream *o) const =0
Writes out a detailed description of this object to stream.
void describe(std::ostream *o) const override
Writes out a detailed description of this object to stream.
Definition: log_buffer.cpp:49
const std::string & string() const
Definition: path.hpp:65
ErrorCode read(uint64_t desired_bytes, foedus::memory::AlignedMemory *buffer)
Sequentially read the given amount of contents from the current position.
A slice of foedus::memory::AlignedMemory.
uint64_t file_size(const Path &p)
Returns size of the file.
Definition: filesystem.cpp:120
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
Definition: error_code.hpp:155
Represents an I/O stream on one file without filesystem caching.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
uint64_t cur_block_abosulte_end_
see get_cur_block_abosulte_end()
Definition: log_buffer.hpp:207
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
std::ostream & operator<<(std::ostream &o, const SortedBuffer &v)
Definition: log_buffer.cpp:32