libfoedus-core
FOEDUS Core Library
snapshot_manager_pimpl.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #ifndef FOEDUS_SNAPSHOT_SNAPSHOT_MANAGER_PIMPL_HPP_
19 #define FOEDUS_SNAPSHOT_SNAPSHOT_MANAGER_PIMPL_HPP_
20 
21 #include <atomic>
22 #include <chrono>
23 #include <map>
24 #include <string>
25 #include <thread>
26 #include <vector>
27 
28 #include "foedus/epoch.hpp"
29 #include "foedus/fwd.hpp"
30 #include "foedus/initializable.hpp"
31 #include "foedus/fs/path.hpp"
32 #include "foedus/snapshot/fwd.hpp"
40 
41 namespace foedus {
42 namespace snapshot {
43 
46  // This is backed by shared memory: it is never instantiated, only reached via reinterpret_cast.
47  LogGleanerControlBlock() = delete;
48  ~LogGleanerControlBlock() = delete;
49 
50  void initialize() {  // Puts the block into its starting state: per-run counters/flags cleared, totals zeroed.
51  clear_counts();  // resets the progress counters plus gleaning_ and cancelled_
52  mappers_count_ = 0;
53  reducers_count_ = 0;
54  all_count_ = 0;
55  terminating_ = false;  // the visible clear_counts() does not touch terminating_, so it is reset here
56  }
57  void uninitialize() {  // Empty body: no teardown work is performed for this block.
58  }
59  void clear_counts() {  // Zeroes the progress counters and clears the gleaning_/cancelled_ flags.
61  completed_count_ = 0;  // NOTE(review): listing lines 60 and 62 are absent from this extract; completed_mapper_count_ is not reset in the lines shown - confirm against the real header
63  error_count_ = 0;
64  exit_count_ = 0;
65  gleaning_ = false;
66  cancelled_ = false;  // terminating_ is deliberately left alone here; initialize() resets it
67  }
68 
73  bool is_error() const { return error_count_ > 0 || cancelled_ || terminating_; }  // True on any recorded error, cancellation, or termination; when true, all mappers and reducers should exit as soon as possible.
74 
76  std::atomic<bool> gleaning_;  // Whether the log gleaner is now running.
78  std::atomic<bool> cancelled_;  // Whether the log gleaner has been cancelled.
80  std::atomic<bool> terminating_;  // Whether the engine is being terminated.
81 
84 
90  std::atomic<uint16_t> completed_count_;
91 
97  std::atomic<uint16_t> completed_mapper_count_;
98 
103  std::atomic<uint16_t> error_count_;
104 
109  std::atomic<uint16_t> exit_count_;
110 
112  uint16_t mappers_count_;  // Total number of mappers.
114  uint16_t reducers_count_;  // Presumably the total number of reducers, by analogy with mappers_count_ - TODO confirm
116  uint16_t all_count_;  // mappers_count_ + reducers_count_.
117 };
118 
121  // This is backed by shared memory: it is never instantiated, only reached via reinterpret_cast.
122  SnapshotManagerControlBlock() = delete;
123  ~SnapshotManagerControlBlock() = delete;
124 
125  void initialize() {
131  }
132  void uninitialize() {
134  }
135 
136  Epoch get_snapshot_epoch() const { return Epoch(snapshot_epoch_.load()); }  // Returns the current snapshot_epoch_ wrapped in an Epoch; load() with no argument uses the default sequentially-consistent ordering.
138  return Epoch(snapshot_epoch_.load(std::memory_order_relaxed));
139  }
142  return previous_snapshot_id_.load(std::memory_order_relaxed);
143  }
145 
153  }
154 
160  std::atomic< Epoch::EpochInteger > snapshot_epoch_;
161 
168  std::atomic< Epoch::EpochInteger > requested_snapshot_epoch_;
169 
170 
175  std::atomic<SnapshotId> previous_snapshot_id_;
176 
179 
185 
192 
195 };
196 
205  public:
206  SnapshotManagerPimpl() = delete;  // No default construction: an Engine pointer must always be supplied.
207  explicit SnapshotManagerPimpl(Engine* engine)  // Stores the engine pointer; local_reducer_ starts out null.
208  : engine_(engine), local_reducer_(nullptr) {}  // NOTE(review): stop_requested_ (std::atomic<bool>) is not initialized here and holds an indeterminate value before C++20; presumably initialize_once() sets it - confirm
209  ErrorStack initialize_once() override;
210  ErrorStack uninitialize_once() override;
211 
213  const SnapshotOptions& get_option() const;
214 
217 
221  }
222 
224 
226  bool wait_completion,
227  Epoch suggested_snapshot_epoch);
237  void stop_snapshot_thread();
238 
242  } else {
244  }
246  }
247 
248  void wakeup();
249  void sleep_a_while();
250  bool is_stop_requested() const { return stop_requested_; }  // Reads the local stop flag used to shut down snapshot_thread_.
251  bool is_gleaning() const { return control_block_->gleaner_.gleaning_; }  // Reads gleaning_ from the gleaner_ member of the control block, i.e. whether the log gleaner is now running.
252 
260  void handle_snapshot();
266 
272  void handle_snapshot_child();
273 
283  const Snapshot& new_snapshot,
284  std::map<storage::StorageId, storage::SnapshotPagePointer>* new_root_page_pointers);
285 
292  const Snapshot& new_snapshot,
293  const std::map<storage::StorageId, storage::SnapshotPagePointer>& new_root_page_pointers);
294 
300  ErrorStack snapshot_savepoint(const Snapshot& new_snapshot);
301 
307  const Snapshot& new_snapshot,
308  const std::map<storage::StorageId, storage::SnapshotPagePointer>& new_root_page_pointers);
311  const Snapshot& new_snapshot,
312  const std::map<storage::StorageId, storage::SnapshotPagePointer>& new_root_page_pointers,
313  void* result_memory,
314  uint16_t parallel_id);
315 
320 
321  Engine* const engine_;
322 
324 
330  std::vector< Snapshot > snapshots_;
331 
333  std::atomic<bool> stop_requested_;
334 
342  std::thread snapshot_thread_;
343 
348  std::chrono::system_clock::time_point previous_snapshot_time_;
349 
351  std::vector<LogMapper*> local_mappers_;
354 
357 };
358 
359 static_assert(
361  "SnapshotManagerControlBlock is too large.");
362 
363 } // namespace snapshot
364 } // namespace foedus
365 #endif // FOEDUS_SNAPSHOT_SNAPSHOT_MANAGER_PIMPL_HPP_
std::atomic< bool > gleaning_
Whether the log gleaner is now running.
Represents the data in one snapshot metadata file.
void handle_snapshot()
Main routine for snapshot_thread_ in master engine.
ErrorStack read_snapshot_metadata(SnapshotId snapshot_id, SnapshotMetadata *out)
void drop_volatile_pages_parallel(const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers, void *result_memory, uint16_t parallel_id)
subroutine invoked by one thread for one node.
Typedefs of ID types used in snapshot package.
SnapshotManagerControlBlock * control_block_
const SnapshotOptions & get_option() const
shorthand for engine_->get_options().snapshot_.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
ErrorStack handle_snapshot_triggered(Snapshot *new_snapshot)
handle_snapshot() calls this when it should start snapshotting.
Forward declarations of classes in root package.
Local resource for the log gleaner, which runs only in the master node.
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
Forward declarations of classes in snapshot manager package.
Represents a time epoch.
Definition: epoch.hpp:61
uint16_t reducers_count_
Total number of reducers.
A polling-wait mechanism that can be placed in shared memory and used from multiple processes...
Typical implementation of Initializable as a skeleton base class.
ErrorStack snapshot_savepoint(const Snapshot &new_snapshot)
Sub-routine of handle_snapshot_triggered().
std::atomic< SnapshotId > previous_snapshot_id_
ID of previously completed snapshot.
std::atomic< uint16_t > exit_count_
count of mappers/reducers that have exited.
LogGleanerResource gleaner_resource_
Local resources for gleaner, which runs only in the master node.
Zero is always reserved for invalid epoch.
Definition: epoch.hpp:68
bool is_error() const
If this returns true, all mappers and reducers should exit as soon as possible.
LogGleanerControlBlock gleaner_
Gleaner-related variables.
uint16_t mappers_count_
Total number of mappers.
Analogue of boost::filesystem::path.
Definition: path.hpp:37
std::atomic< uint16_t > error_count_
count of mappers/reducers that have exited with some error.
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
uint16_t all_count_
mappers_count_ + reducers_count_.
void wakeup_snapshot_children()
Fires snapshot_children_wakeup_.
soc::SharedPolling snapshot_wakeup_
Snapshot thread sleeps on this condition variable.
ErrorStack snapshot_metadata(const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers)
Sub-routine of handle_snapshot_triggered().
std::vector< Snapshot > snapshots_
All previously taken snapshots.
SnapshotId increment(SnapshotId id)
Increment SnapshotId.
Definition: snapshot_id.hpp:52
Represents one snapshot that converts all logs from base epoch to valid_until epoch into snapshot fil...
Definition: snapshot.hpp:37
std::chrono::system_clock::time_point previous_snapshot_time_
When snapshot_thread_ took snapshot last time.
std::atomic< Epoch::EpochInteger > requested_snapshot_epoch_
When a caller wants to immediately invoke snapshot, it calls trigger_snapshot_immediate(), which sets this value and then wakes up snapshot_thread_.
Set of options for snapshot manager.
uint16_t SnapshotId
Unique ID of Snapshot.
Definition: snapshot_id.hpp:43
fs::Path get_snapshot_metadata_file_path(SnapshotId snapshot_id) const
each snapshot has a snapshot-metadata file "snapshot_metadata_.xml" in first node's firs...
const SnapshotId kNullSnapshotId
Definition: snapshot_id.hpp:45
LogReducer * local_reducer_
Reducer in this node.
std::vector< LogMapper * > local_mappers_
Mappers in this node.
Snapshot cur_snapshot_
The snapshot we are now taking.
ErrorStack drop_volatile_pages(const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers)
Sub-routine of handle_snapshot_triggered().
std::atomic< bool > terminating_
Whether the engine is being terminated.
void stop_snapshot_thread()
This is a hidden API called at the beginning of engine shutdown (namely restart manager).
std::atomic< uint16_t > completed_count_
count of mappers/reducers that have completed processing the current epoch.
soc::SharedPolling snapshot_taken_
Fired (notify_all) whenever snapshotting is completed.
ErrorStack glean_logs(const Snapshot &new_snapshot, std::map< storage::StorageId, storage::SnapshotPagePointer > *new_root_page_pointers)
Sub-routine of handle_snapshot_triggered().
std::atomic< Epoch::EpochInteger > snapshot_epoch_
The most recently snapshot-ed epoch; all logs up to this epoch are safe to delete.
void handle_snapshot_child()
Main routine for snapshot_thread_ in child engines.
std::thread snapshot_thread_
The daemon thread of snapshot manager.
soc::SharedPolling snapshot_children_wakeup_
Child snapshot managers (the ones in SOC engines) sleep on this condition until the master snapshot m...
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
A log reducer, which receives log entries sent from mappers and applies them to construct new snapsho...
void trigger_snapshot_immediate(bool wait_completion, Epoch suggested_snapshot_epoch)
std::atomic< bool > cancelled_
Whether the log gleaner has been cancelled.
std::atomic< bool > stop_requested_
To locally shutdown snapshot_thread_.
void signal()
Signal it to let waiters exit.
std::atomic< uint16_t > completed_mapper_count_
We also have a separate count for mappers only to know if all mappers are done.