libfoedus-core
FOEDUS Core Library
log_mapper_impl.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #ifndef FOEDUS_SNAPSHOT_LOG_MAPPER_IMPL_HPP_
19 #define FOEDUS_SNAPSHOT_LOG_MAPPER_IMPL_HPP_
20 #include <stdint.h>
21 
22 #include <iosfwd>
23 #include <string>
24 
25 #include "foedus/compiler.hpp"
26 #include "foedus/fwd.hpp"
27 #include "foedus/initializable.hpp"
28 #include "foedus/fs/fwd.hpp"
29 #include "foedus/log/fwd.hpp"
30 #include "foedus/log/log_id.hpp"
32 #include "foedus/snapshot/fwd.hpp"
35 #include "foedus/thread/fwd.hpp"
38 
39 namespace foedus {
40 namespace snapshot {
73 class LogMapper final : public MapReduceBase {
74  public:
75  LogMapper(Engine* engine, uint16_t local_ordinal);
76 
77  ErrorStack initialize_once() override;
78  ErrorStack uninitialize_once() override;
79 
80  std::string to_string() const override {
81  return std::string("LogMapper-") + std::to_string(id_);
82  }
83  friend std::ostream& operator<<(std::ostream& o, const LogMapper& v);
84 
85  protected:
86  ErrorStack handle_process() override;
87 
88  private:
89  enum Constsants {
90  kBucketSize = 1 << 16,
91  kBucketMaxCount = (kBucketSize - 16) / 4,
97  kBucketHashListMaxCount = 1 << 12,
104  kSendBufferSize = 1 << 20,
105  };
106 
110  struct Bucket final {
111  inline bool is_full() const ALWAYS_INLINE { return counts_ >= kBucketMaxCount; }
112 
114  storage::StorageId storage_id_; // +4 => 4
116  uint32_t counts_; // +4 => 8
118  Bucket* next_bucket_; // +8 => 16
120  BufferPosition log_positions_[kBucketMaxCount]; // + 4 * kBucketMaxCount => kBucketSize
121  };
122  STATIC_SIZE_CHECK(sizeof(Bucket), kBucketSize)
123 
124 
132  struct BucketHashList {
133  storage::StorageId storage_id_; // +4 => 4
134  uint16_t bucket_counts_; // +2 => 6
135  uint16_t dummy_; // +2 => 8
136  Bucket* head_; // +8 => 16
137  Bucket* tail_; // +8 => 24
138  BucketHashList* hashlist_next_; // +8 => 32
139  };
140 
148  struct PartitionSortEntry {
149  inline void set(storage::PartitionId partition, BufferPosition position) ALWAYS_INLINE {
150  partition_ = partition;
151  position_ = position;
152  }
153 
154  inline bool operator<(const PartitionSortEntry& other) const ALWAYS_INLINE {
155  return partition_ < other.partition_;
156  }
157  inline bool operator<=(const PartitionSortEntry& other) const ALWAYS_INLINE {
158  return partition_ <= other.partition_;
159  }
160  inline bool operator==(const PartitionSortEntry& other) const ALWAYS_INLINE {
161  return partition_ == other.partition_;
162  }
163  inline bool operator!=(const PartitionSortEntry& other) const ALWAYS_INLINE {
164  return partition_ != other.partition_;
165  }
166  inline bool operator>(const PartitionSortEntry& other) const ALWAYS_INLINE {
167  return partition_ > other.partition_;
168  }
169  inline bool operator>=(const PartitionSortEntry& other) const ALWAYS_INLINE {
170  return partition_ >= other.partition_;
171  }
172 
173  uint16_t filler1_; // +2 => 2
174  uint8_t filler2_; // +1 => 3
175  storage::PartitionId partition_; // +1 => 4
176  BufferPosition position_; // +4 => 8
177  };
178 
179  struct IoBufStatus {
180  uint64_t size_inbuf_aligned_;
181  uint64_t size_infile_aligned_;
182 
183  uint64_t next_infile_;
184  uint64_t buf_infile_aligned_;
185  uint64_t cur_inbuf_;
186  uint64_t end_inbuf_aligned_;
187  uint64_t end_infile_;
188  bool more_in_the_file_;
189  bool first_read_;
190  bool ended_;
191  log::LogFileOrdinal cur_file_ordinal_;
192 
193  uint64_t to_infile(uint64_t inbuf) const { return inbuf + buf_infile_aligned_; }
194  };
195 
197  memory::AlignedMemory io_buffer_;
198 
200  memory::AlignedMemory buckets_memory_;
201 
211  memory::AlignedMemory tmp_memory_;
212 
214  memory::AlignedMemory presort_buffer_;
216  memory::AlignedMemory presort_ouputs_;
218  memory::AlignedMemory presort_reordered_;
219 
224  memory::AlignedMemorySlice tmp_send_buffer_slice_;
225 
230  memory::AlignedMemorySlice tmp_position_array_slice_;
231 
236  memory::AlignedMemorySlice tmp_sort_array_slice_;
237 
242  memory::AlignedMemorySlice tmp_hashlist_buffer_slice_;
243 
249  memory::AlignedMemorySlice tmp_partition_array_slice_;
250 
252  uint32_t buckets_allocated_count_;
253 
258  uint32_t hashlist_allocated_count_;
259 
261  uint64_t processed_log_count_;
262 
272  BucketHashList* storage_hashlists_[256];
273 
277  ErrorStack handle_process_buffer(const fs::DirectIoFile &file, IoBufStatus* status);
278 
288  bool bucket_log(storage::StorageId storage_id, uint64_t pos) ALWAYS_INLINE;
289 
299  bool add_new_bucket(storage::StorageId storage_id);
300 
306  void flush_all_buckets();
307 
312  void flush_bucket(const BucketHashList& hashlist);
313 
317  void send_bucket_partition(Bucket* bucket, storage::PartitionId partition);
318  void send_bucket_partition_general(
319  const Bucket* bucket,
320  storage::StorageType storage_type,
321  storage::PartitionId partition,
322  const BufferPosition* positions);
323  void send_bucket_partition_presort(
324  Bucket* bucket,
325  storage::StorageType storage_type,
326  storage::PartitionId partition);
328  void send_bucket_partition_buffer(
329  const Bucket* bucket,
330  storage::PartitionId partition,
331  const char* send_buffer,
332  uint32_t log_count,
333  uint64_t written,
334  uint32_t shortest_key_length,
335  uint32_t longest_key_length);
336 
340  void clear_storage_buckets();
341 
347  inline BucketHashList* find_storage_hashlist(storage::StorageId storage_id) ALWAYS_INLINE {
348  uint8_t index = static_cast<uint8_t>(storage_id);
349  BucketHashList* hashlist = storage_hashlists_[index];
350  if (UNLIKELY(hashlist == nullptr)) {
351  return nullptr;
352  } else {
353  while (UNLIKELY(hashlist->storage_id_ != storage_id)) {
354  hashlist = hashlist->hashlist_next_;
355  if (hashlist == nullptr) {
356  return nullptr;
357  }
358  }
359  }
360 
361  return hashlist;
362  }
364  void add_storage_hashlist(BucketHashList* new_hashlist);
365 
366  void report_completion(double elapsed_sec);
367 };
368 
369 
370 } // namespace snapshot
371 } // namespace foedus
372 #endif // FOEDUS_SNAPSHOT_LOG_MAPPER_IMPL_HPP_
thread::ThreadGroupId PartitionId
As partition=NUMA node, this is just a synonym of foedus::thread::ThreadGroupId.
Definition: storage_id.hpp:65
Forward declarations of classes in filesystem package.
Definitions of IDs in this package and a few related constant values.
Forward declarations of classes in log manager package.
bool operator>(const Path &lhs, const Path &rhs)
Definition: path.hpp:95
LogMapper(Engine *engine, uint16_t local_ordinal)
uint32_t StorageId
Unique ID for storage.
Definition: storage_id.hpp:55
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
bool operator==(const Path &lhs, const Path &rhs)
Definition: path.hpp:85
Typedefs of ID types used in thread package.
uint32_t BufferPosition
Represents a position in some buffer.
Definition: snapshot_id.hpp:72
Forward declarations of classes in root package.
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
Forward declarations of classes in snapshot manager package.
FileStatus status(const Path &p)
Returns the status of the file.
Definition: filesystem.cpp:45
ErrorStack uninitialize_once() override
Typedefs of ID types used in log package.
A log mapper, which reads log files from one logger and sends them to corresponding log reducers...
friend std::ostream & operator<<(std::ostream &o, const LogMapper &v)
std::string to_string() const override
Expects "LogReducer-x", "LogMapper-y" etc.
uint32_t LogFileOrdinal
Ordinal of log files (eg "log.0", "log.1").
Definition: log_id.hpp:46
bool operator!=(const Path &lhs, const Path &rhs)
Definition: path.hpp:89
bool operator<=(const Path &lhs, const Path &rhs)
Definition: path.hpp:94
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
bool operator>=(const Path &lhs, const Path &rhs)
Definition: path.hpp:96
bool operator<(const Path &lhs, const Path &rhs)
Definition: path.hpp:93
ErrorStack initialize_once() override
const uint16_t id_
Unique ID of this mapper or reducer.
StorageType
Type of the storage, such as hash.
Definition: storage_id.hpp:122
#define STATIC_SIZE_CHECK(desired, actual)
Base class for LogMapper and LogReducer to share common code.
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
Forward declarations of classes in thread package.
ErrorStack handle_process() override
Implements the specific logics in derived class.
#define ALWAYS_INLINE
A function suffix to hint that the function should always be inlined.
Definition: compiler.hpp:106