libfoedus-core
FOEDUS Core Library
log_reducer_impl.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #ifndef FOEDUS_SNAPSHOT_LOG_REDUCER_IMPL_HPP_
19 #define FOEDUS_SNAPSHOT_LOG_REDUCER_IMPL_HPP_
20 #include <stdint.h>
21 
22 #include <atomic>
23 #include <iosfwd>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <vector>
28 
29 #include "foedus/attachable.hpp"
30 #include "foedus/fwd.hpp"
31 #include "foedus/initializable.hpp"
35 #include "foedus/fs/fwd.hpp"
36 #include "foedus/fs/path.hpp"
37 #include "foedus/log/fwd.hpp"
38 #include "foedus/log/log_id.hpp"
40 #include "foedus/snapshot/fwd.hpp"
47 #include "foedus/storage/fwd.hpp"
50 #include "foedus/thread/fwd.hpp"
51 
52 namespace foedus {
53 namespace snapshot {
54 
61 };
62 
70  uint64_t word;
71  struct Components {
72  uint16_t active_writers_;
73  uint16_t flags_;
75  } components;
76 
77  bool is_no_more_writers() const {
78  return (components.flags_ & kFlagNoMoreWriters) != 0;
79  }
80  bool is_clear() const { return word == 0; }
81  uint16_t get_active_writers() const { return components.active_writers_; }
84 };
85 
94  enum Constants {
99  };
100  bool is_full_block() const {
104  }
105  bool is_filler() const {
109  }
110 
111  friend std::ostream& operator<<(std::ostream& o, const BlockHeaderBase& v);
112 
117  uint32_t magic_word_;
120 };
121 
130  uint32_t log_count_;
135  inline void assert_key_length() const {
136  ASSERT_ND(shortest_key_length_ > 0);
137  ASSERT_ND(shortest_key_length_ <= longest_key_length_);
138  }
139 };
140 
141 
153 
160  // this is backed by shared memory. not instantiation. just reinterpret_cast.
161  LogReducerControlBlock() = delete;
162  ~LogReducerControlBlock() = delete;
163 
164  void initialize() {
165  clear();
166  }
167  void clear() {
168  current_buffer_ = 0;
169  buffer_status_[0].store(0U);
170  buffer_status_[1].store(0U);
172  }
173  void uninitialize() {
174  }
175 
178  ret.word = buffer_status_[index % 2].load();
179  return ret;
180  }
181  std::atomic<uint64_t>* get_buffer_status_address(uint32_t index) {
182  return &buffer_status_[index % 2];
183  }
184 
187  }
190  }
191 
196  std::atomic<uint64_t> buffer_status_[2];
197 
202  std::atomic<uint32_t> current_buffer_;
203 
209  std::atomic<uint32_t> total_storage_count_;
210 
212  uint16_t id_;
213 };
214 
215 
293 class LogReducer final : public MapReduceBase {
294  public:
295  explicit LogReducer(Engine* engine);
296 
297  ErrorStack initialize_once() override;
298  ErrorStack uninitialize_once() override;
299 
300  std::string to_string() const override { return std::string("Reducer-") + std::to_string(id_); }
301  friend std::ostream& operator<<(std::ostream& o, const LogReducer& v);
302 
303  protected:
304  ErrorStack handle_process() override;
305 
306  private:
310  struct MergeContext {
311  explicit MergeContext(uint32_t dumped_files_count_);
312  ~MergeContext();
313 
319  const uint32_t dumped_files_count_;
320  memory::AlignedMemory io_memory_;
321  std::vector< memory::AlignedMemorySlice > io_buffers_;
322 
331  std::vector< std::unique_ptr<SortedBuffer> > sorted_buffers_;
332 
337  std::vector< std::unique_ptr<fs::DirectIoFile> > sorted_files_auto_ptrs_;
338 
339  SortedBuffer** tmp_sorted_buffer_array_;
340  uint32_t tmp_sorted_buffer_count_;
341 
346  storage::StorageId get_min_storage_id() const;
347  void set_tmp_sorted_buffer_array(storage::StorageId storage_id);
348  };
349 
350  LogReducerControlBlock* control_block_;
351 
356  void* buffers_[2];
357 
362  uint64_t buffer_half_size_bytes_;
363 
369  storage::Page* root_info_pages_;
370 
374  cache::SnapshotFileSet previous_snapshot_files_;
375 
379  memory::AlignedMemory dump_io_buffer_;
380 
385  memory::AlignedMemory sort_buffer_;
386 
392  memory::AlignedMemory positions_buffers_;
393 
395  memory::AlignedMemorySlice input_positions_slice_;
397  memory::AlignedMemorySlice output_positions_slice_;
398 
400  memory::AlignedMemory writer_pool_memory_;
407  memory::AlignedMemory writer_intermediate_memory_;
408 
415  uint32_t sorted_runs_;
416 
417  void expand_if_needed(
418  uint64_t required_size,
419  memory::AlignedMemory *memory,
420  const std::string& name);
422  void expand_positions_buffers_if_needed(uint64_t required_size_per_buffer);
423 
424  fs::Path get_sorted_run_file_path(uint32_t sorted_run) const;
425 
434  ErrorStack dump_buffer();
444  ErrorStack dump_buffer_wait_for_writers(uint32_t buffer_index) const;
445 
452  void dump_buffer_scan_block_headers(
453  char* buffer_base,
454  BufferPosition tail_position,
455  std::map<storage::StorageId, std::vector<BufferPosition> > *blocks) const;
456 
462  ErrorStack dump_buffer_sort_storage(
463  const LogBuffer &buffer,
464  storage::StorageId storage_id,
465  const std::vector<BufferPosition>& log_positions,
466  uint32_t* out_shortest_key_length,
467  uint32_t* out_longest_key_length,
468  uint32_t* written_count);
469 
474  ErrorStack dump_buffer_sort_storage_write(
475  const LogBuffer &buffer,
476  storage::StorageId storage_id,
477  const BufferPosition* sorted_logs,
478  uint32_t shortest_key_length,
479  uint32_t longest_key_length,
480  uint32_t log_count,
481  fs::DirectIoFile *dump_file);
486  uint64_t dump_block_header(
487  const LogBuffer &buffer,
488  storage::StorageId storage_id,
489  const BufferPosition* sorted_logs,
490  uint32_t shortest_key_length,
491  uint32_t longest_key_length,
492  uint32_t log_count,
493  void* destination) const;
494 
504  ErrorStack merge_sort();
505 
507  void merge_sort_check_buffer_status() const;
513  ErrorStack merge_sort_dump_last_buffer();
514 
518  void merge_sort_allocate_io_buffers(MergeContext* context) const;
522  ErrorStack merge_sort_open_sorted_runs(MergeContext* context) const;
526  ErrorStack merge_sort_initialize_sort_buffers(MergeContext* context) const;
533  ErrorCode merge_sort_advance_sort_buffers(
534  SortedBuffer* buffer,
535  storage::StorageId processed_storage_id) const;
536 
537  uint32_t get_max_storage_count() const;
538 };
539 
540 
// LogReducerControlBlock is backed by shared memory: it is never constructed
// (its ctor/dtor are deleted) and is only obtained via reinterpret_cast.
// Its size therefore must fit in the space budgeted by
// soc::NodeMemoryAnchors::kLogReducerMemorySize. This check fails the build if
// the struct ever grows past that budget.
static_assert(
  sizeof(LogReducerControlBlock) <= soc::NodeMemoryAnchors::kLogReducerMemorySize,
  "LogReducerControlBlock is too large.");
544 
545 } // namespace snapshot
546 } // namespace foedus
547 #endif // FOEDUS_SNAPSHOT_LOG_REDUCER_IMPL_HPP_
friend std::ostream & operator<<(std::ostream &o, const BlockHeaderBase &v)
std::atomic< uint64_t > * get_buffer_status_address(uint32_t index)
Forward declarations of classes in filesystem package.
ErrorStack initialize_once() override
Definitions of IDs in this package and a few related constant values.
Forward declarations of classes in log manager package.
Typedefs of ID types used in snapshot package.
Compactly represents important status information of a reducer buffer.
uint32_t StorageId
Unique ID for storage.
Definition: storage_id.hpp:55
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
ReducerBufferStatus get_current_buffer_status() const
uint32_t BufferPosition
Represents a position in some buffer.
Definition: snapshot_id.hpp:72
Forward declarations of classes in root package.
std::atomic< uint32_t > current_buffer_
buffers_[current_buffer_ % 2] is the buffer mappers should append to.
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
Forward declarations of classes in snapshot manager package.
struct foedus::snapshot::ReducerBufferStatus::Components components
Represents one input stream of sorted log entries.
Definition: log_buffer.hpp:81
A bit-wise flag in ReducerBufferStatus's flags_.
Holds a set of read-only file objects for snapshot files.
Typedefs of ID types used in log package.
ErrorStack handle_process() override
Implements the logic specific to the derived class.
ReducerBufferStatus get_non_current_buffer_status() const
std::atomic< uint64_t > buffer_status_[2]
Status of the two reducer buffers.
ReducerBufferStatus get_buffer_status_atomic(uint32_t index) const
A header for a dummy storage block that fills the gap between the end of previous storage block and t...
ErrorStack uninitialize_once() override
uint64_t from_buffer_position(BufferPosition buffer_position)
Definition: snapshot_id.hpp:78
Analogue of boost::filesystem::path.
Definition: path.hpp:37
BufferPosition get_tail_position() const
Forward declarations of classes in storage package.
All blocks that have content start with this header.
std::string to_string() const override
Expects "LogReducer-x", "LogMapper-y" etc.
Constants and methods related to CPU cacheline and its prefetching.
All log blocks in mapper/reducers start with this header.
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
BufferPosition block_length_
Length (in 8-bytes) of this block including the header.
uint32_t magic_word_
This is used to identify whether the storage block is a dummy (filler) one or a full one. ...
Just a marker to denote that the memory region represents a data page.
Definition: page.hpp:334
A slice of foedus::memory::AlignedMemory.
const uint16_t id_
Unique ID of this mapper or reducer.
std::atomic< uint32_t > total_storage_count_
Set at the end of merge_sort().
Represents an I/O stream on one file without filesystem caching.
Represents one memory block aligned to actual OS/hardware pages.
Base class for LogMapper and LogReducer to share common code.
uint32_t longest_key_length_
additional statistics for masstree/hash
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
Forward declarations of classes in thread package.
Raw atomic operations that work for both C++11 and non-C++11 code.
A log reducer, which receives log entries sent from mappers and applies them to construct new snapsho...
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
friend std::ostream & operator<<(std::ostream &o, const LogReducer &v)
uint16_t id_
ID of this reducer (or numa node ID).
uint32_t shortest_key_length_
additional statistics for masstree/hash