libfoedus-core
FOEDUS Core Library
log_manager_pimpl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/log/log_manager_pimpl.hpp"

#include <glog/logging.h>

#include <chrono>
#include <string>
#include <thread>
#include <vector>

#include "foedus/assert_nd.hpp"
#include "foedus/engine.hpp"
#include "foedus/engine_options.hpp"
#include "foedus/error_stack_batch.hpp"
#include "foedus/assorted/atomic_fences.hpp"
#include "foedus/fs/filesystem.hpp"
#include "foedus/fs/path.hpp"
#include "foedus/log/log_id.hpp"
#include "foedus/log/logger_impl.hpp"
#include "foedus/log/meta_logger_impl.hpp"
#include "foedus/savepoint/savepoint.hpp"
#include "foedus/savepoint/savepoint_manager.hpp"
#include "foedus/soc/shared_memory_repo.hpp"
#include "foedus/soc/soc_manager.hpp"
#include "foedus/thread/thread_id.hpp"
#include "foedus/thread/thread_pool.hpp"

namespace foedus {
namespace log {
LogManagerPimpl::LogManagerPimpl(Engine* engine)
  : engine_(engine), meta_logger_(nullptr), control_block_(nullptr) {}

ErrorStack LogManagerPimpl::initialize_once() {
  groups_ = engine_->get_options().thread_.group_count_;
  loggers_per_node_ = engine_->get_options().log_.loggers_per_node_;
  const LoggerId total_loggers = loggers_per_node_ * groups_;
  const uint16_t total_threads = engine_->get_options().thread_.get_total_thread_count();
  LOG(INFO) << "Initializing LogManager. #loggers_per_node=" << loggers_per_node_
    << ", #NUMA-nodes=" << static_cast<int>(groups_) << ", #total_threads=" << total_threads;
  if (!engine_->get_thread_pool()->is_initialized()
    || !engine_->get_savepoint_manager()->is_initialized()) {
    return ERROR_STACK(kErrorCodeDepedentModuleUnavailableInit);
  }
  // see comments in LogOptions#log_paths_
  if (total_loggers == 0 || total_loggers % groups_ != 0 || total_threads % total_loggers != 0
    || total_loggers > total_threads) {
    return ERROR_STACK(kErrorCodeLogInvalidLoggerCount);
  }
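  // Example (hypothetical numbers, for illustration only): with groups_ = 2 NUMA nodes,
  // 8 cores per node (total_threads = 16), and loggers_per_node_ = 2, we get
  // total_loggers = 4, so the checks above pass and cores_per_logger below is 16 / 4 = 4.
  // loggers_per_node_ = 3 would be rejected because 16 % 6 != 0.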

  // attach control block
  soc::SharedMemoryRepo* memory_repo = engine_->get_soc_manager()->get_shared_memory_repo();
  control_block_ = memory_repo->get_global_memory_anchors()->log_manager_memory_;

  // attach logger_refs
  const uint16_t cores_per_logger = total_threads / total_loggers;
  for (thread::ThreadGroupId node = 0; node < groups_; ++node) {
    soc::NodeMemoryAnchors* node_anchors = memory_repo->get_node_memory_anchors(node);
    for (uint16_t j = 0; j < loggers_per_node_; ++j) {
      LoggerControlBlock* logger_block = node_anchors->logger_memories_[j];
      logger_refs_.emplace_back(LoggerRef(
        engine_,
        logger_block,
        node * loggers_per_node_ + j,
        node,
        j));
    }
  }
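  // Global LoggerIds are dense: node * loggers_per_node_ + j. Hypothetical example:
  // with 2 nodes and 2 loggers per node, node 0 owns LoggerId 0-1 and node 1 owns 2-3.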

  // meta_buffer_ exists in all engines
  meta_buffer_.set_engine(engine_);
  meta_buffer_.attach(memory_repo->get_global_memory_anchors()->meta_logger_memory_);

  if (engine_->is_master()) {
    // In master, we initialize the control block. No local loggers.
    // Initialize durable_global_epoch_
    control_block_->initialize();
    control_block_->durable_global_epoch_
      = engine_->get_savepoint_manager()->get_initial_durable_epoch().value();
    LOG(INFO) << "durable_global_epoch_=" << get_durable_global_epoch();
    meta_logger_ = new MetaLogger(engine_);
    CHECK_ERROR(meta_logger_->initialize());
  } else {
    // In SOC, we don't have to initialize the control block, but have to launch local loggers.
    // evenly distribute loggers to NUMA nodes, then to cores.
    soc::SocId node = engine_->get_soc_id();
    thread::ThreadLocalOrdinal current_ordinal = 0;
    soc::NodeMemoryAnchors* node_anchors = memory_repo->get_node_memory_anchors(node);
    for (uint16_t j = 0; j < loggers_per_node_; ++j) {
      LoggerControlBlock* logger_block = node_anchors->logger_memories_[j];
      std::vector< thread::ThreadId > assigned_thread_ids;
      for (auto k = 0; k < cores_per_logger; ++k) {
        assigned_thread_ids.push_back(thread::compose_thread_id(node, current_ordinal));
        current_ordinal++;
      }
      std::string folder = engine_->get_options().log_.convert_folder_path_pattern(node, j);
      // to avoid race, create the root log folder now.
      fs::Path path(folder);
      if (!fs::exists(path)) {
        fs::create_directories(path);
      }
      Logger* logger = new Logger(
        engine_,
        logger_block,
        node * loggers_per_node_ + j,
        node,
        j,
        fs::Path(folder),
        assigned_thread_ids);
      CHECK_OUTOFMEMORY(logger);
      loggers_.push_back(logger);
    }
    ASSERT_ND(current_ordinal == engine_->get_options().thread_.thread_count_per_group_);

    // call initialize() of each logger.
    // this might take long, so do it in parallel.
    std::vector<std::thread> init_threads;
    for (auto j = 0; j < loggers_per_node_; ++j) {
      Logger* logger = loggers_[j];
      init_threads.push_back(std::thread([logger]() {
        COERCE_ERROR(logger->initialize());  // TASK(Hideaki) collect errors
      }));
    }
    LOG(INFO) << "Launched threads to initialize loggers in node-" << node << ". waiting..";
    for (auto& init_thread : init_threads) {
      init_thread.join();
    }
    LOG(INFO) << "All loggers in node-" << node << " were initialized!";
  }

  return kRetOk;
}

ErrorStack LogManagerPimpl::uninitialize_once() {
  LOG(INFO) << "Uninitializing LogManager..";
  ErrorStackBatch batch;
  if (!engine_->get_thread_pool()->is_initialized()
    || !engine_->get_savepoint_manager()->is_initialized()) {
    batch.emprace_back(ERROR_STACK(kErrorCodeDepedentModuleUnavailableUninit));
  }
  if (engine_->is_master()) {
    ASSERT_ND(loggers_.empty());
    if (meta_logger_) {
      batch.emprace_back(meta_logger_->uninitialize());
      delete meta_logger_;
      meta_logger_ = nullptr;
    }
  } else {
    ASSERT_ND(meta_logger_ == nullptr);
  }
  batch.uninitialize_and_delete_all(&loggers_);
  logger_refs_.clear();
  if (engine_->is_master()) {
    control_block_->uninitialize();
  }
  return SUMMARIZE_ERROR_BATCH(batch);
}
void LogManagerPimpl::wakeup_loggers() {
  for (LoggerRef& logger : logger_refs_) {
    logger.wakeup();
  }
}

ErrorStack LogManagerPimpl::refresh_global_durable_epoch() {
  assorted::memory_fence_acquire();
  Epoch min_durable_epoch;
  ASSERT_ND(!min_durable_epoch.is_valid());
  for (const LoggerRef& logger : logger_refs_) {
    min_durable_epoch.store_min(logger.get_durable_epoch());
  }
  ASSERT_ND(min_durable_epoch.is_valid());

  if (min_durable_epoch <= get_durable_global_epoch()) {
    VLOG(0) << "durable_global_epoch_ not advanced";
    return kRetOk;
  }

  VLOG(0) << "Global durable epoch is about to advance from " << get_durable_global_epoch()
    << " to " << min_durable_epoch;
  {
    soc::SharedMutexScope guard(&control_block_->durable_global_epoch_savepoint_mutex_);
    if (min_durable_epoch <= get_durable_global_epoch()) {
      VLOG(0) << "oh, I lost the race.";
      return kRetOk;
    }

    CHECK_ERROR(engine_->get_savepoint_manager()->take_savepoint(min_durable_epoch));

    // set durable_global_epoch_ within the SharedCond's mutex scope, and then broadcast.
    // this is required to avoid lost signals.
    control_block_->durable_global_epoch_ = min_durable_epoch.value();
    assorted::memory_fence_release();
    control_block_->durable_global_epoch_advanced_.signal();
  }
  return kRetOk;
}
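// How waiters pair with the signal() above (see wait_until_durable() below):
//   uint64_t demand = durable_global_epoch_advanced_.acquire_ticket();
//   if (commit_epoch is already durable) break;
//   durable_global_epoch_advanced_.wait(demand);  // or timedwait()
// Because the ticket is acquired before the re-check, a signal() that fires between
// the check and the wait is observed via the ticket and is not lost.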


ErrorCode LogManagerPimpl::wait_until_durable(Epoch commit_epoch, int64_t wait_microseconds) {
  assorted::memory_fence_acquire();
  if (commit_epoch <= get_durable_global_epoch()) {
    DVLOG(1) << "Already durable. commit_epoch=" << commit_epoch << ", durable_global_epoch_="
      << get_durable_global_epoch();
    return kErrorCodeOk;
  }

  if (wait_microseconds == 0) {
    DVLOG(1) << "Conditional check: commit_epoch=" << commit_epoch << ", durable_global_epoch_="
      << get_durable_global_epoch();
    return kErrorCodeTimeout;
  }

  std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
  std::chrono::high_resolution_clock::time_point until
    = now + std::chrono::microseconds(wait_microseconds);
  // @spinlock, but with sleep (not frequently called)
  SPINLOCK_WHILE(commit_epoch > get_durable_global_epoch()) {
    for (LoggerRef& logger : logger_refs_) {
      logger.wakeup_for_durable_epoch(commit_epoch);
    }

    VLOG(0) << "Synchronously waiting for commit_epoch " << commit_epoch;
    if (wait_microseconds <= 0) {
      // check durable_global_epoch_ within the SharedCond's mutex scope, and then wait.
      // this is required to avoid lost signals.
      uint64_t demand = control_block_->durable_global_epoch_advanced_.acquire_ticket();
      if (commit_epoch <= get_durable_global_epoch()) {
        break;
      }
      control_block_->durable_global_epoch_advanced_.wait(demand);
      continue;
    }

    if (std::chrono::high_resolution_clock::now() >= until) {
      LOG(WARNING) << "Timeout occurs. wait_microseconds=" << wait_microseconds;
      return kErrorCodeTimeout;
    }

    {
      uint64_t demand = control_block_->durable_global_epoch_advanced_.acquire_ticket();
      if (commit_epoch <= get_durable_global_epoch()) {
        break;
      }
      control_block_->durable_global_epoch_advanced_.timedwait(
        demand,
        wait_microseconds);  // a bit lazy. we sleep longer in case of spurious wakeup
    }
  }

  VLOG(0) << "durable epoch advanced. durable_global_epoch_=" << get_durable_global_epoch();
  return kErrorCodeOk;
}
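// Summary of the wait_microseconds contract implemented above: 0 performs a non-blocking
// check and returns kErrorCodeTimeout immediately if not yet durable; a negative value
// waits indefinitely via wait(); a positive value waits up to that many microseconds via
// timedwait() before returning kErrorCodeTimeout.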

void LogManagerPimpl::announce_new_durable_global_epoch(Epoch new_epoch) {
  ASSERT_ND(new_epoch.value() >= control_block_->durable_global_epoch_);
  control_block_->durable_global_epoch_ = new_epoch.value();
  control_block_->durable_global_epoch_advanced_.signal();
}


void LogManagerPimpl::copy_logger_states(savepoint::Savepoint* new_savepoint) {
  new_savepoint->oldest_log_files_.clear();
  new_savepoint->oldest_log_files_offset_begin_.clear();
  new_savepoint->current_log_files_.clear();
  new_savepoint->current_log_files_offset_durable_.clear();
  for (const LoggerRef& logger : logger_refs_) {
    logger.copy_logger_state(new_savepoint);
  }
}

}  // namespace log
}  // namespace foedus
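A minimal caller-side sketch of the durability wait implemented above. It assumes the public foedus::log::LogManager forwards wait_until_durable() to this pimpl and that the Engine exposes it via get_log_manager(); in a real application the call usually happens indirectly through the xct layer after a transaction pre-commit returns its commit epoch.

#include "foedus/engine.hpp"
#include "foedus/epoch.hpp"
#include "foedus/error_code.hpp"
#include "foedus/log/log_manager.hpp"

// Sketch: block until commit_epoch becomes durable, waiting at most 10 ms.
foedus::ErrorCode wait_for_durability(foedus::Engine* engine, foedus::Epoch commit_epoch) {
  foedus::log::LogManager* log_manager = engine->get_log_manager();
  // A negative timeout would wait indefinitely; 0 is a non-blocking check.
  foedus::ErrorCode code = log_manager->wait_until_durable(commit_epoch, 10000);
  if (code == foedus::kErrorCodeTimeout) {
    // Not yet durable within 10 ms; the caller may retry or give up.
  }
  return code;
}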