libfoedus-core
FOEDUS Core Library
savepoint_manager_pimpl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include "foedus/engine.hpp"
26 #include "foedus/fs/filesystem.hpp"
27 #include "foedus/fs/path.hpp"
34 
35 namespace foedus {
36 namespace savepoint {
39  get_global_memory_anchors()->savepoint_manager_memory_;
40  if (engine_->is_master()) {
41  // Savepoint takes place only in master
45  LOG(INFO) << "Initializing SavepointManager.. path=" << savepoint_path_;
46  auto logger_count = engine_->get_options().log_.loggers_per_node_
48  if (fs::exists(savepoint_path_)) {
49  LOG(INFO) << "Existing savepoint file found. Loading..";
50  CHECK_ERROR(savepoint_.load_from_file(savepoint_path_));
51  if (!savepoint_.consistent(logger_count)) {
53  }
54  } else {
55  LOG(INFO) << "Savepoint file does not exist. No savepoint taken so far.";
56  // Create an empty savepoint file now. This makes sure the directory entry for the file
57  // exists.
58  savepoint_.populate_empty(logger_count);
59  CHECK_ERROR(savepoint_.save_to_file(savepoint_path_));
60  }
69  savepoint_thread_ = std::move(std::thread(&SavepointManagerPimpl::savepoint_main, this));
71  } else {
72  // other engines wait for the master engine until it finishes the initialization of
73  // relevant fields. Some of the following modules depend on these values.
74  uint32_t sleep_cont = 0;
75  while (control_block_->master_initialized_ == false) {
76  std::this_thread::sleep_for(std::chrono::milliseconds(10));
77  if (++sleep_cont > 1000ULL) {
78  return ERROR_STACK_MSG(kErrorCodeTimeout, "Master engine couldn't load savepoint??");
79  }
80  }
81  LOG(INFO) << "Okay, master-engine has finished loading initial savepoint.";
82  }
83  return kRetOk;
84 }
85 
87  LOG(INFO) << "Uninitializing SavepointManager..";
88  ErrorStackBatch batch;
89  if (engine_->is_master()) {
90  if (savepoint_thread_.joinable()) {
91  {
94  }
95  savepoint_thread_.join();
96  }
98  }
99  return SUMMARIZE_ERROR_BATCH(batch);
100 }
101 
103  while (get_saved_durable_epoch() < new_global_durable_epoch) {
104  if (get_requested_durable_epoch() < new_global_durable_epoch) {
105  if (get_requested_durable_epoch() < new_global_durable_epoch) {
106  control_block_->requested_durable_epoch_ = new_global_durable_epoch.value();
108  }
109  }
110  {
111  uint64_t demand = control_block_->save_done_event_.acquire_ticket();
112  if (get_saved_durable_epoch() >= new_global_durable_epoch) {
113  break;
114  }
116  }
117  }
118  return kRetOk;
119 }
121  snapshot::SnapshotId new_snapshot_id,
122  Epoch new_snapshot_epoch) {
123  while (get_latest_snapshot_id() != new_snapshot_id) {
124  {
125  control_block_->new_snapshot_id_ = new_snapshot_id;
126  control_block_->new_snapshot_epoch_ = new_snapshot_epoch.value();
128  }
129  {
130  uint64_t demand = control_block_->save_done_event_.acquire_ticket();
131  if (get_latest_snapshot_id() != new_snapshot_id) {
133  }
134  }
135  }
136  ASSERT_ND(get_latest_snapshot_id() == new_snapshot_id);
137  ASSERT_ND(get_latest_snapshot_epoch() == new_snapshot_epoch);
138  return kRetOk;
139 }
140 
142  // write with mutex to not let readers see garbage.
143  // there is only one writer anyway, btw.
148  src);
149 }
150 
152  // read with mutex to not see garbage
154  ASSERT_ND(logger_id < control_block_->savepoint_.get_total_logger_count());
155  return control_block_->savepoint_.logger_info_[logger_id];
156 }
157 
159  LOG(INFO) << "Savepoint thread has started.";
160  while (!is_stop_requested()) {
161  {
162  uint64_t demand = control_block_->save_wakeup_.acquire_ticket();
163  if (!is_stop_requested() &&
166  control_block_->save_wakeup_.timedwait(demand, 100000ULL);
167  }
168  }
169  if (is_stop_requested()) {
170  break;
171  }
174  Savepoint new_savepoint;
176  Epoch new_durable_epoch = get_requested_durable_epoch();
177  new_savepoint.durable_epoch_ = new_durable_epoch.value();
178  new_savepoint.earliest_epoch_ = engine_->get_earliest_epoch().value();
179  engine_->get_log_manager()->copy_logger_states(&new_savepoint);
180 
186  } else {
189  }
190 
193  // TASK(Hideaki) Here, we should update oldest_offset_ by checking where the snapshot_epoch
194  // ends. So far we don't update this, but metalog is anyway tiny, so isn't a big issue.
195  new_savepoint.meta_log_oldest_offset_ = metalog_block->oldest_offset_;
196  new_savepoint.meta_log_durable_offset_ = metalog_block->durable_offset_;
197  new_savepoint.assert_epoch_values();
198 
199  VLOG(0) << "Writing a savepoint...";
200  VLOG(1) << "Savepoint content=" << new_savepoint;
201  COERCE_ERROR(new_savepoint.save_to_file(savepoint_path_));
202  update_shared_savepoint(new_savepoint); // also write to shared memory
203  VLOG(1) << "Wrote a savepoint.";
205  control_block_->saved_durable_epoch_ = new_durable_epoch.value();
208  }
209  }
210  LOG(INFO) << "Savepoint thread has terminated.";
211 }
212 
213 } // namespace savepoint
214 } // namespace foedus
soc::SharedMutex savepoint_mutex_
Read/write to savepoint_ is protected with this mutex.
fs::Path savepoint_path_
Path of the savepoint file.
FixedSavepoint savepoint_
The content of the latest savepoint.
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
soc::SharedPolling save_wakeup_
The savepoint thread sleeps on this condition variable.
Epoch::EpochInteger new_snapshot_epoch_
Set with new_snapshot_id_.
snapshot::SnapshotId get_latest_snapshot_id() const
Epoch get_current_global_epoch() const
Returns the current global epoch, the epoch a newly started transaction will be in.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
GlobalMemoryAnchors * get_global_memory_anchors()
snapshot::SnapshotId latest_snapshot_id_
Definition: savepoint.hpp:177
void wait(uint64_t demanded_ticket, uint64_t polling_spins=kDefaultPollingSpins, uint64_t max_interval_us=kDefaultPollingMaxIntervalUs) const
Unconditionally wait for signal.
Epoch::EpochInteger current_epoch_
Current epoch of the entire engine.
Definition: savepoint.hpp:53
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
ErrorStack save_to_file(const fs::Path &path) const
Atomically and durably writes out this object to the specified XML file.
Represents a time epoch.
Definition: epoch.hpp:61
The information we maintain in savepoint manager and externalize to a file.
Definition: savepoint.hpp:40
uint64_t meta_log_oldest_offset_
Offset from which metadata log entries are not gleaned yet.
Definition: savepoint.hpp:87
0x0008 : "GENERAL: Timeout." .
Definition: error_code.hpp:112
log::MetaLogControlBlock * meta_logger_memory_
Tiny memory for metadata logger.
soc::SharedPolling save_done_event_
Whenever a savepoint has been taken, this event is fired.
uint64_t meta_log_durable_offset_
Offset up to which metadata log entries are fsynced.
Definition: savepoint.hpp:89
const EngineOptions & get_options() const
Definition: engine.cpp:39
#define COERCE_ERROR(x)
This macro calls x and aborts if encounters an error.
bool is_master() const
Returns whether this engine object is a master instance.
Definition: engine.cpp:68
void copy_logger_states(savepoint::Savepoint *new_savepoint)
Fillup the given savepoint with the current information of the loggers.
Definition: log_manager.cpp:57
Zero is always reserved for invalid epoch.
Definition: epoch.hpp:68
Epoch::EpochInteger saved_durable_epoch_
The durable epoch that has been made persistent in previous savepoint-ing.
fs::FixedPath savepoint_path_
Full path of the savepoint file.
uint64_t acquire_ticket() const
Acquires the ticket to pass to wait() or timedwait().
Batches zero or more ErrorStack objects to represent in one ErrorStack.
Epoch::EpochInteger latest_snapshot_epoch_
Definition: savepoint.hpp:178
Analogue of boost::filesystem::path.
Definition: path.hpp:37
log::LogManager * get_log_manager() const
See Log Manager.
Definition: engine.cpp:49
Epoch::EpochInteger latest_snapshot_epoch_
The most recently snapshotted epoch; all logs up to this epoch are safe to delete.
Definition: savepoint.hpp:84
savepoint::SavepointOptions savepoint_
std::basic_string< CHAR > str() const
Convert to a std::string object.
Auto-lock scope object for SharedMutex.
bool exists(const Path &p)
Returns whether the file exists.
Definition: filesystem.hpp:128
ErrorStack take_savepoint(Epoch new_global_durable_epoch)
ErrorStack take_savepoint_after_snapshot(snapshot::SnapshotId new_snapshot_id, Epoch new_snapshot_epoch)
uint16_t group_count_
Number of ThreadGroup in the engine.
uint16_t SnapshotId
Unique ID of Snapshot.
Definition: snapshot_id.hpp:43
void populate_empty(log::LoggerId logger_count)
Populate variables as an initial state.
Definition: savepoint.cpp:79
uint64_t durable_offset_
Offset up to which log entries are fsynced.
const SnapshotId kNullSnapshotId
Definition: snapshot_id.hpp:45
#define SUMMARIZE_ERROR_BATCH(x)
This macro calls ErrorStackBatch::summarize() with automatically provided parameters.
xct::XctManager * get_xct_manager() const
See Transaction Manager.
Definition: engine.cpp:61
void announce_new_durable_global_epoch(Epoch new_epoch)
Sets the new global durable epoch and also wakes up threads that were waiting for it...
Definition: log_manager.cpp:42
thread::ThreadOptions thread_
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
Epoch::EpochInteger earliest_epoch_
The earliest epoch that can exist in this system.
Definition: savepoint.hpp:72
ErrorStack load_from_file(const fs::Path &path)
Load the content of this object from the specified XML file.
const ErrorStack kRetOk
Normal return value for no-error case.
Savepoint savepoint_
The current progress of the entire engine.
soc::SocManager * get_soc_manager() const
See SOC and IPC.
Definition: engine.cpp:59
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
#define ERROR_STACK_MSG(e, m)
Overload of ERROR_STACK(e) to receive a custom error message.
snapshot::SnapshotId new_snapshot_id_
The ID of the new snapshot to remember.
void assert_epoch_values() const
Check invariants on current_epoch_/durable_epoch_.
Definition: savepoint.cpp:25
Epoch::EpochInteger requested_durable_epoch_
Client SOC sets this value and then wakes up the savepoint thread.
LoggerSavepointInfo logger_info_[1U<< 16]
Stores all loggers' information.
Definition: savepoint.hpp:193
0x0701 : "SAVEPNT: Savepoint file is not consistent with other configurations. Check the number of lo...
Definition: error_code.hpp:165
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
Epoch get_earliest_epoch() const
Returns the Earliest-Epoch, the minimum epoch that is valid within this engine.
bool consistent(log::LoggerId logger_count) const
Tells whether the variables are consistent.
Definition: savepoint.hpp:121
Information in savepoint for one logger.
Definition: savepoint.hpp:140
snapshot::SnapshotId latest_snapshot_id_
The most recent complete snapshot.
Definition: savepoint.hpp:78
Epoch::EpochInteger durable_epoch_
Latest epoch whose logs were all flushed to disk.
Definition: savepoint.hpp:65
uint16_t LoggerId
Typedef for an ID of Logger.
Definition: log_id.hpp:36
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
uint64_t oldest_offset_
Offset from which log entries are not gleaned yet.
LoggerSavepointInfo get_logger_savepoint(log::LoggerId logger_id)
Control block for MetaLogBuffer and MetaLogger.
bool timedwait(uint64_t demanded_ticket, uint64_t timeout_microsec, uint64_t polling_spins=kDefaultPollingSpins, uint64_t max_interval_us=kDefaultPollingMaxIntervalUs) const
Wait for signal up to the given timeout.
std::thread savepoint_thread_
The thread to take savepoints.
void signal()
Signal it to let waiters exit.
uint16_t loggers_per_node_
Number of loggers per NUMA node.
Definition: log_options.hpp:80
EpochInteger value() const
Returns the raw integer representation.
Definition: epoch.hpp:102
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
Definition: soc_manager.cpp:38
void update(uint16_t node_count, uint16_t loggers_per_node_count, const Savepoint &src)
Write out the content of the given Savepoint to this object.
Definition: savepoint.cpp:94