libfoedus-core
FOEDUS Core Library
foedus::snapshot::SnapshotManagerPimpl Class Reference (final)

Pimpl object of SnapshotManager.

Detailed Description

A private pimpl object for SnapshotManager. Do not include this header from a client program unless you know what you are doing.

Definition at line 204 of file snapshot_manager_pimpl.hpp.
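
For readers unfamiliar with the pimpl idiom mentioned above, here is a minimal generic sketch. `Widget` and `WidgetPimpl` are hypothetical names, not FOEDUS classes. They only illustrate how a public class forwards every call to a private implementation object that client code never includes directly.

```cpp
#include <cassert>
#include <memory>

// Hypothetical private implementation. In a real project this would live in
// a *_pimpl.hpp header that client code does not include.
class WidgetPimpl {
 public:
  explicit WidgetPimpl(int initial) : value_(initial) {}
  int increment() { return ++value_; }

 private:
  int value_;
};

// Hypothetical public facade: clients see only this class.
class Widget {
 public:
  explicit Widget(int initial)
      : pimpl_(std::make_unique<WidgetPimpl>(initial)) {
    assert(pimpl_ != nullptr);
  }
  // Every public method simply forwards to the pimpl object.
  int increment() { return pimpl_->increment(); }

 private:
  std::unique_ptr<WidgetPimpl> pimpl_;
};
```

The facade keeps its header stable while the implementation class is free to change, which is why client programs are told not to include the pimpl header.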

#include <snapshot_manager_pimpl.hpp>

Public Member Functions

 SnapshotManagerPimpl ()=delete
 
 SnapshotManagerPimpl (Engine *engine)
 
ErrorStack initialize_once () override
 
ErrorStack uninitialize_once () override
 
const SnapshotOptions & get_option () const
 Shorthand for engine_->get_options().snapshot_.

Epoch get_snapshot_epoch () const

Epoch get_snapshot_epoch_weak () const

SnapshotId get_previous_snapshot_id () const

SnapshotId get_previous_snapshot_id_weak () const

ErrorStack read_snapshot_metadata (SnapshotId snapshot_id, SnapshotMetadata *out)

void trigger_snapshot_immediate (bool wait_completion, Epoch suggested_snapshot_epoch)

void stop_snapshot_thread ()
 This is a hidden API called at the beginning of engine shutdown (namely, by the restart manager).

SnapshotId issue_next_snapshot_id ()

void wakeup ()

void sleep_a_while ()

bool is_stop_requested () const

bool is_gleaning () const

void handle_snapshot ()
 Main routine for snapshot_thread_ in master engine.

ErrorStack handle_snapshot_triggered (Snapshot *new_snapshot)
 handle_snapshot() calls this when it should start snapshotting.

void handle_snapshot_child ()
 Main routine for snapshot_thread_ in child engines.

ErrorStack glean_logs (const Snapshot &new_snapshot, std::map< storage::StorageId, storage::SnapshotPagePointer > *new_root_page_pointers)
 Sub-routine of handle_snapshot_triggered().

ErrorStack snapshot_metadata (const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers)
 Sub-routine of handle_snapshot_triggered().

ErrorStack snapshot_savepoint (const Snapshot &new_snapshot)
 Sub-routine of handle_snapshot_triggered().

ErrorStack drop_volatile_pages (const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers)
 Sub-routine of handle_snapshot_triggered().

void drop_volatile_pages_parallel (const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers, void *result_memory, uint16_t parallel_id)
 Subroutine invoked by one thread for one node.

fs::Path get_snapshot_metadata_file_path (SnapshotId snapshot_id) const
 Each snapshot has a snapshot-metadata file "snapshot_metadata_<SNAPSHOT_ID>.xml" in the first node's first partition folder.
 
- Public Member Functions inherited from foedus::DefaultInitializable
 DefaultInitializable ()
 
virtual ~DefaultInitializable ()
 
 DefaultInitializable (const DefaultInitializable &)=delete
 
DefaultInitializable & operator= (const DefaultInitializable &)=delete

ErrorStack initialize () override final
 Typical implementation of Initializable::initialize() that provides initialize-once semantics.

ErrorStack uninitialize () override final
 Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics.

bool is_initialized () const override final
 Returns whether the object has already been initialized.
 
- Public Member Functions inherited from foedus::Initializable
virtual ~Initializable ()
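
The initialize-once and uninitialize-once semantics named in the inherited members can be pictured with the sketch below. It is not the FOEDUS implementation: `OnceInitializable` and `CountingModule` are hypothetical, `bool` stands in for `ErrorStack`, and the handling of repeated calls (a second `initialize()` fails, a redundant `uninitialize()` is a no-op) is an assumption made for illustration.

```cpp
// Hypothetical base class; returns bool instead of FOEDUS's ErrorStack.
class OnceInitializable {
 public:
  virtual ~OnceInitializable() = default;

  // Runs initialize_once() only when not yet initialized.
  // Assumption: a second call is reported as a failure (false).
  bool initialize() {
    if (initialized_) {
      return false;
    }
    initialized_ = initialize_once();
    return initialized_;
  }

  // Runs uninitialize_once() only when currently initialized.
  // Assumption: calling this on an uninitialized object is a harmless no-op.
  bool uninitialize() {
    if (!initialized_) {
      return true;
    }
    initialized_ = false;
    return uninitialize_once();
  }

  bool is_initialized() const { return initialized_; }

 protected:
  virtual bool initialize_once() = 0;
  virtual bool uninitialize_once() = 0;

 private:
  bool initialized_ = false;
};

// Hypothetical subclass that counts how often the hooks actually run.
class CountingModule final : public OnceInitializable {
 public:
  int init_calls = 0;
  int uninit_calls = 0;

 protected:
  bool initialize_once() override { ++init_calls; return true; }
  bool uninitialize_once() override { ++uninit_calls; return true; }
};
```

A subclass such as SnapshotManagerPimpl then only overrides the two `*_once()` hooks, as the member list above shows.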
 

Public Attributes

Engine *const engine_
 
SnapshotManagerControlBlock * control_block_

std::vector< Snapshot > snapshots_
 All previously taken snapshots.

std::atomic< bool > stop_requested_
 To locally shut down snapshot_thread_.

std::thread snapshot_thread_
 The daemon thread of snapshot manager.

std::chrono::system_clock::time_point previous_snapshot_time_
 When snapshot_thread_ last took a snapshot.

std::vector< LogMapper * > local_mappers_
 Mappers in this node.

LogReducer * local_reducer_
 Reducer in this node.

LogGleanerResource gleaner_resource_
 Local resources for gleaner, which runs only in the master node.
 

Constructor & Destructor Documentation

foedus::snapshot::SnapshotManagerPimpl::SnapshotManagerPimpl ( )
delete

foedus::snapshot::SnapshotManagerPimpl::SnapshotManagerPimpl ( Engine * engine)
inline explicit

Definition at line 207 of file snapshot_manager_pimpl.hpp.

208  : engine_(engine), local_reducer_(nullptr) {}

Member Function Documentation

ErrorStack foedus::snapshot::SnapshotManagerPimpl::drop_volatile_pages ( const Snapshot & new_snapshot,
const std::map< storage::StorageId, storage::SnapshotPagePointer > &  new_root_page_pointers 
)

Sub-routine of handle_snapshot_triggered().

Drop pointers to volatile pages based on the already-installed snapshot pointers.

Definition at line 488 of file snapshot_manager_pimpl.cpp.

References foedus::memory::AlignedMemory::alloc(), ASSERT_ND, foedus::memory::PagePoolOffsetChunk::clear(), foedus::storage::Composer::drop_root_volatile(), drop_volatile_pages_parallel(), foedus::debugging::StopWatch::elapsed_ms(), engine_, foedus::memory::AlignedMemory::get_block(), foedus::Engine::get_soc_count(), foedus::Engine::get_xct_manager(), foedus::Epoch::is_valid(), foedus::memory::AlignedMemory::kNumaAllocOnnode, foedus::kRetOk, foedus::storage::Composer::DropResult::max_observed_, foedus::snapshot::Snapshot::max_storage_id_, foedus::xct::XctManager::pause_accepting_xct(), foedus::memory::AlignedMemory::release_block(), foedus::xct::XctManager::resume_accepting_xct(), foedus::debugging::StopWatch::stop(), and foedus::snapshot::Snapshot::valid_until_epoch_.

Referenced by handle_snapshot_triggered().

490  {
491  // To speed up, we parallelize this process per node, and use the same partitioning scheme.
492  LOG(INFO) << "Dropping volatile pointers...";
493 
494  // initializations done.
495  // below, we should release the resources before exiting. So, let's not just use CHECK_ERROR.
496  const uint16_t soc_count = engine_->get_soc_count();
497 
498  // collect results of pointer dropping for all storages for all nodes.
499  // this is just to drop the root page. DropResult[storage_id][node].
500  memory::AlignedMemory result_memory;
501  result_memory.alloc(
502  sizeof(storage::Composer::DropResult) * soc_count * (new_snapshot.max_storage_id_ + 2U),
503  1U << 12,
504  memory::AlignedMemory::kNumaAllocOnnode,
505  0);
506  memory::AlignedMemory chunks_memory; // only for drop_root
507  chunks_memory.alloc(
508  sizeof(memory::PagePoolOffsetChunk) * soc_count,
509  1U << 12,
510  memory::AlignedMemory::kNumaAllocOnnode,
511  0);
512 
513  // So far, we pause transaction executions during this step to simplify the algorithm.
514  // Without this simplification, not only this thread but also normal transaction executions
515  // have to do several complex and expensive checks.
516  engine_->get_xct_manager()->pause_accepting_xct();
517  // It will take a while for individual worker threads to complete the currently running xcts.
518  // Just wait for a while to let that happen.
519  std::this_thread::sleep_for(std::chrono::milliseconds(100)); // almost forever in OLTP xcts.
520  LOG(INFO) << "Paused transaction executions to safely drop volatile pages and waited enough"
521  << " to let currently running xcts end. Now start replace pointers.";
522  debugging::StopWatch stop_watch;
523 
524  std::vector< std::thread > threads;
525  for (uint16_t node = 0; node < soc_count; ++node) {
526  threads.emplace_back(
527  &SnapshotManagerPimpl::drop_volatile_pages_parallel,
528  this,
529  new_snapshot,
530  new_root_page_pointers,
531  result_memory.get_block(),
532  node);
533  }
534 
535  for (std::thread& thr : threads) {
536  thr.join();
537  }
538 
539  LOG(INFO) << "Joined child threads. Now consider dropping root pages";
540 
541  // At last, we consider dropping root volatile pages.
542  // usually, this happens only when the root page doesn't have any volatile pointer.
543  // As an exceptional case, we might drop ALL volatile pages of a storage whose max_observed
544  // is not updated but for some reason dropped_all is false, eg non-matching boundaries.
545  // even in that case, we can drop all volatile pages safely because this is within pause.
546  memory::PagePoolOffsetChunk* dropped_chunks = reinterpret_cast<memory::PagePoolOffsetChunk*>(
547  chunks_memory.get_block());
548  for (uint16_t node = 0; node < soc_count; ++node) {
549  dropped_chunks[node].clear();
550  }
551  storage::Composer::DropResult* results
552  = reinterpret_cast<storage::Composer::DropResult*>(result_memory.get_block());
553  for (storage::StorageId id = 1; id <= new_snapshot.max_storage_id_; ++id) {
554  VLOG(1) << "Considering to drop root page of storage-" << id << " ...";
555  bool cannot_drop = false;
556  for (uint16_t node = 0; node < soc_count; ++node) {
557  storage::Composer::DropResult result = results[soc_count * id + node];
558  ASSERT_ND(result.max_observed_.is_valid());
559  ASSERT_ND(result.max_observed_ >= new_snapshot.valid_until_epoch_);
560  if (result.max_observed_ > new_snapshot.valid_until_epoch_) {
561  cannot_drop = true;
562  break;
563  }
564  }
565  if (cannot_drop) {
566  continue;
567  }
568  LOG(INFO) << "Looks like we can drop ALL volatile pages of storage-" << id << "!!!";
569  // Still, the specific implementation of the storage might not choose to do so.
570  // We call a method in composer.
571  uint64_t dropped_count = 0;
572  storage::Composer::DropVolatilesArguments args = {
573  new_snapshot,
574  0,
575  false,
576  dropped_chunks,
577  &dropped_count};
578  storage::Composer composer(engine_, id);
579  composer.drop_root_volatile(args);
580  LOG(INFO) << "As a result, we dropped " << dropped_count << " pages from storage-" << id;
581  }
582 
583 
584  engine_->get_xct_manager()->resume_accepting_xct();
585 
586  stop_watch.stop();
587  LOG(INFO) << "Total: Dropped volatile pages in " << stop_watch.elapsed_ms() << "ms.";
588 
589  chunks_memory.release_block();
590  result_memory.release_block();
591 
592  return kRetOk;
593 }
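
The listing above starts one thread per node and has each thread write into its own slots of a flat DropResult[storage_id][node] table, indexed as `soc_count * id + node`. A minimal sketch of that pattern follows. `DropResultSketch`, `fill_column`, and `droppable_storages` are hypothetical names, plain integers stand in for Epoch, and each node reports a single observed value for all storages purely to keep the example short.

```cpp
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical stand-in for storage::Composer::DropResult.
struct DropResultSketch {
  uint32_t max_observed = 0;
};

// Each worker fills only its own column of the flat [storage_id][node]
// table, so no two threads write the same slot and no locking is needed.
void fill_column(std::vector<DropResultSketch>* results, uint16_t node_count,
                 uint32_t max_storage_id, uint16_t node, uint32_t observed) {
  for (uint32_t id = 1; id <= max_storage_id; ++id) {
    (*results)[node_count * id + node].max_observed = observed;
  }
}

// Returns the storage IDs for which no node observed an epoch newer than
// valid_until, i.e. the candidates for dropping the root page.
std::vector<uint32_t> droppable_storages(
    uint16_t node_count, uint32_t max_storage_id, uint32_t valid_until,
    const std::vector<uint32_t>& observed_per_node) {
  std::vector<DropResultSketch> results(node_count * (max_storage_id + 1U));
  std::vector<std::thread> threads;
  for (uint16_t node = 0; node < node_count; ++node) {
    threads.emplace_back(fill_column, &results, node_count, max_storage_id,
                         node, observed_per_node[node]);
  }
  for (std::thread& thr : threads) {
    thr.join();  // all columns are complete after this loop
  }

  std::vector<uint32_t> droppable;
  for (uint32_t id = 1; id <= max_storage_id; ++id) {
    bool cannot_drop = false;
    for (uint16_t node = 0; node < node_count; ++node) {
      if (results[node_count * id + node].max_observed > valid_until) {
        cannot_drop = true;
        break;
      }
    }
    if (!cannot_drop) {
      droppable.push_back(id);
    }
  }
  return droppable;
}
```

Because the rows are only read after every thread has been joined, the second loop needs no synchronization either.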

void foedus::snapshot::SnapshotManagerPimpl::drop_volatile_pages_parallel ( const Snapshot & new_snapshot,
const std::map< storage::StorageId, storage::SnapshotPagePointer > &  new_root_page_pointers,
void *  result_memory,
uint16_t  parallel_id 
)

Subroutine invoked by one thread for one node.

Definition at line 595 of file snapshot_manager_pimpl.cpp.

References foedus::memory::AlignedMemory::alloc(), ASSERT_ND, foedus::memory::PagePoolOffsetChunk::clear(), foedus::storage::Composer::drop_volatiles(), foedus::storage::Composer::DropResult::dropped_all_, foedus::debugging::StopWatch::elapsed_ms(), foedus::debugging::StopWatch::elapsed_sec(), foedus::memory::PagePoolOffsetChunk::empty(), engine_, foedus::memory::AlignedMemory::get_block(), foedus::Engine::get_memory_manager(), foedus::memory::EngineMemory::get_node_memory(), foedus::Engine::get_soc_count(), foedus::storage::StorageManager::get_storage(), foedus::Engine::get_storage_manager(), foedus::memory::NumaNodeMemoryRef::get_volatile_pool(), foedus::memory::AlignedMemory::kNumaAllocOnnode, foedus::storage::Composer::DropResult::max_observed_, foedus::snapshot::Snapshot::max_storage_id_, foedus::storage::StorageControlBlock::meta_, foedus::storage::Metadata::name_, foedus::Epoch::one_more(), foedus::memory::PagePool::release(), foedus::memory::AlignedMemory::release_block(), foedus::storage::StorageControlBlock::root_page_pointer_, foedus::storage::Metadata::root_snapshot_page_id_, foedus::memory::PagePoolOffsetChunk::size(), foedus::debugging::StopWatch::stop(), and foedus::snapshot::Snapshot::valid_until_epoch_.

Referenced by drop_volatile_pages().

599  {
600  // this thread is pinned on its own socket. We use the same partitioning scheme as reducer
601  // so that this method mostly hits local pages
602  thread::NumaThreadScope numa_scope(parallel_id);
603 
604  const uint16_t soc_count = engine_->get_soc_count();
605  storage::Composer::DropResult* results // DropResult[storage_id][node]
606  = reinterpret_cast<storage::Composer::DropResult*>(result_memory);
607 
608  // To avoid invoking volatile pool for every dropped page, we cache them in chunks
609  memory::AlignedMemory chunks_memory;
610  chunks_memory.alloc(
611  sizeof(memory::PagePoolOffsetChunk) * soc_count,
612  1U << 12,
613  memory::AlignedMemory::kNumaAllocOnnode,
614  parallel_id);
615  memory::PagePoolOffsetChunk* dropped_chunks = reinterpret_cast<memory::PagePoolOffsetChunk*>(
616  chunks_memory.get_block());
617  for (uint16_t node = 0; node < soc_count; ++node) {
618  dropped_chunks[node].clear();
619  }
620 
621  LOG(INFO) << "Thread-" << parallel_id << " started dropping volatile pages.";
622 
623  uint64_t dropped_count_total = 0;
624  debugging::StopWatch stop_watch;
625  for (storage::StorageId id = 1; id <= new_snapshot.max_storage_id_; ++id) {
626  const auto& it = new_root_page_pointers.find(id);
627  if (it != new_root_page_pointers.end()) {
628  VLOG(0) << "Dropping pointers for storage-" << id << " ...";
629  storage::SnapshotPagePointer new_root_page_pointer = it->second;
630  ASSERT_ND(new_root_page_pointer != 0);
631  storage::Composer composer(engine_, id);
632  uint64_t dropped_count = 0;
633  storage::Composer::DropVolatilesArguments args = {
634  new_snapshot,
635  parallel_id,
636  true,
637  dropped_chunks,
638  &dropped_count};
639  debugging::StopWatch watch;
640  storage::Composer::DropResult result = composer.drop_volatiles(args);
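641  ASSERT_ND(engine_->get_storage_manager()->get_storage(id)->root_page_pointer_.
642  snapshot_pointer_ == new_root_page_pointer);
643  ASSERT_ND(engine_->get_storage_manager()->get_storage(id)->meta_.root_snapshot_page_id_
644  == new_root_page_pointer);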
645  dropped_count_total += dropped_count;
646  watch.stop();
647  LOG(INFO) << "Thread-" << parallel_id << " drop_volatiles for storage-" << id
648  << " (" << engine_->get_storage_manager()->get_storage(id)->meta_.name_ << ")"
649  << " took " << watch.elapsed_sec() << "s. dropped_count=" << dropped_count
650  << ". result =" << result;
651  results[soc_count * id + parallel_id] = result;
652  } else {
653  VLOG(0) << "Thread-" << parallel_id << " storage-"
654  << id << " wasn't changed no drop pointers";
655  results[soc_count * id + parallel_id].max_observed_
656  = new_snapshot.valid_until_epoch_.one_more(); // do NOT drop root page in this case.
657  results[soc_count * id + parallel_id].dropped_all_ = false;
658  }
659  }
660 
661  stop_watch.stop();
662  LOG(INFO) << "Thread-" << parallel_id << " dropped " << dropped_count_total
663  << " volatile pointers in " << stop_watch.elapsed_ms() << "ms.";
664 
665  for (uint16_t node = 0; node < soc_count; ++node) {
666  memory::PagePoolOffsetChunk* chunk = dropped_chunks + node;
667  memory::PagePool* volatile_pool
668  = engine_->get_memory_manager()->get_node_memory(node)->get_volatile_pool();
669  if (!chunk->empty()) {
670  volatile_pool->release(chunk->size(), chunk);
671  }
672  ASSERT_ND(chunk->empty());
673  }
674  chunks_memory.release_block();
675 }
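
The comment in the listing says dropped pages are cached in chunks so the volatile pool is not invoked once per page. The sketch below illustrates that batching idea only. `PoolSketch` and `ChunkedReleaser` are hypothetical, and flushing when the buffer is full is an assumption; the listing itself only shows the final flush.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical pool that counts how many release calls it receives.
struct PoolSketch {
  std::size_t release_calls = 0;
  std::size_t pages_returned = 0;

  // Takes back every offset in the chunk in one call and empties the chunk.
  void release(std::vector<uint32_t>* chunk) {
    ++release_calls;
    pages_returned += chunk->size();
    chunk->clear();
  }
};

// Hypothetical batching helper: buffers page offsets and hands them to the
// pool in bulk instead of calling the pool once per dropped page.
class ChunkedReleaser {
 public:
  ChunkedReleaser(PoolSketch* pool, std::size_t capacity)
      : pool_(pool), capacity_(capacity) {}

  void drop(uint32_t offset) {
    chunk_.push_back(offset);
    if (chunk_.size() >= capacity_) {
      pool_->release(&chunk_);  // assumption: flush as soon as the chunk is full
    }
  }

  // Returns whatever is still buffered, like the final loop in the listing.
  void flush() {
    if (!chunk_.empty()) {
      pool_->release(&chunk_);
    }
  }

 private:
  PoolSketch* pool_;
  std::size_t capacity_;
  std::vector<uint32_t> chunk_;
};
```

With a capacity of 4, ten dropped pages cost three pool calls rather than ten.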

const SnapshotOptions & foedus::snapshot::SnapshotManagerPimpl::get_option ( ) const

Shorthand for engine_->get_options().snapshot_.

Definition at line 57 of file snapshot_manager_pimpl.cpp.

References engine_, foedus::Engine::get_options(), and foedus::EngineOptions::snapshot_.

Referenced by get_snapshot_metadata_file_path(), handle_snapshot(), and snapshot_metadata().

57  {
58  return engine_->get_options().snapshot_;
59 }

SnapshotId foedus::snapshot::SnapshotManagerPimpl::get_previous_snapshot_id ( ) const
inline

Definition at line 218 of file snapshot_manager_pimpl.hpp.

References control_block_, and foedus::snapshot::SnapshotManagerControlBlock::get_previous_snapshot_id().

Referenced by foedus::snapshot::SnapshotManager::get_previous_snapshot_id().

SnapshotId foedus::snapshot::SnapshotManagerPimpl::get_previous_snapshot_id_weak ( ) const
inline

Definition at line 219 of file snapshot_manager_pimpl.hpp.

References control_block_, and foedus::snapshot::SnapshotManagerControlBlock::get_previous_snapshot_id_weak().

Referenced by foedus::snapshot::SnapshotManager::get_previous_snapshot_id_weak().

Epoch foedus::snapshot::SnapshotManagerPimpl::get_snapshot_epoch ( ) const
inline

Epoch foedus::snapshot::SnapshotManagerPimpl::get_snapshot_epoch_weak ( ) const
inline

Definition at line 216 of file snapshot_manager_pimpl.hpp.

References control_block_, and foedus::snapshot::SnapshotManagerControlBlock::get_snapshot_epoch_weak().

Referenced by foedus::snapshot::SnapshotManager::get_snapshot_epoch_weak().

fs::Path foedus::snapshot::SnapshotManagerPimpl::get_snapshot_metadata_file_path ( SnapshotId  snapshot_id) const

Each snapshot has a snapshot-metadata file "snapshot_metadata_<SNAPSHOT_ID>.xml" in the first node's first partition folder.

Definition at line 480 of file snapshot_manager_pimpl.cpp.

References get_option().

Referenced by read_snapshot_metadata(), and snapshot_metadata().

480  {
481  fs::Path folder(get_option().get_primary_folder_path());
482  fs::Path file(folder);
483  file /= std::string("snapshot_metadata_")
484  + std::to_string(snapshot_id) + std::string(".xml");
485  return file;
486 }
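
The same file-name construction can be tried outside FOEDUS with the standard library. The sketch below substitutes `std::filesystem::path` (C++17) for `fs::Path`. `snapshot_metadata_path` and its `primary_folder` argument are hypothetical stand-ins for the member function and for `get_option().get_primary_folder_path()`.

```cpp
#include <cstdint>
#include <filesystem>
#include <string>

// Stand-in for get_snapshot_metadata_file_path(): appends
// "snapshot_metadata_<SNAPSHOT_ID>.xml" to the given folder.
std::filesystem::path snapshot_metadata_path(
    const std::filesystem::path& primary_folder, uint16_t snapshot_id) {
  std::filesystem::path file(primary_folder);
  file /= "snapshot_metadata_" + std::to_string(snapshot_id) + ".xml";
  return file;
}
```

For example, a folder of `/tmp/snap` and snapshot ID 7 give a path whose file name is `snapshot_metadata_7.xml`.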

ErrorStack foedus::snapshot::SnapshotManagerPimpl::glean_logs ( const Snapshot & new_snapshot,
std::map< storage::StorageId, storage::SnapshotPagePointer > *  new_root_page_pointers 
)

Sub-routine of handle_snapshot_triggered().

Read log files, distribute them to each partition, and construct snapshot files at each partition. After successful completion, all snapshot files are also durable (LogGleaner's uninitialize() ensures this), so we can start installing pointers to the new snapshot file pages.

Definition at line 386 of file snapshot_manager_pimpl.cpp.

References engine_, foedus::snapshot::LogGleaner::execute(), foedus::snapshot::LogGleaner::get_new_root_page_pointers(), gleaner_resource_, and foedus::ErrorStack::is_error().

Referenced by handle_snapshot_triggered().

388  {
389  // Log gleaner is an object allocated/deallocated per snapshotting.
390  // Gleaner runs on this thread (snapshot_thread_)
391  LogGleaner gleaner(engine_, &gleaner_resource_, new_snapshot);
392  ErrorStack result = gleaner.execute();
393  if (result.is_error()) {
394  LOG(ERROR) << "Log Gleaner encountered either an error or early termination request";
395  }
396  // the output is list of pointers to new root pages
397  *new_root_page_pointers = gleaner.get_new_root_page_pointers();
398  return result;
399 }

void foedus::snapshot::SnapshotManagerPimpl::handle_snapshot ( )

Main routine for snapshot_thread_ in master engine.

This method takes snapshots periodically. When no logs have appeared in any of the private buffers for a while, it goes to sleep. It exits when this object's uninitialize() is called.

Definition at line 191 of file snapshot_manager_pimpl.cpp.

References control_block_, engine_, foedus::log::LogManager::get_durable_global_epoch(), foedus::Engine::get_log_manager(), get_option(), foedus::snapshot::SnapshotManagerControlBlock::get_requested_snapshot_epoch(), get_snapshot_epoch(), handle_snapshot_triggered(), foedus::ErrorStack::is_error(), foedus::Engine::is_initialized(), is_stop_requested(), foedus::Epoch::is_valid(), foedus::assorted::memory_fence_acquire(), previous_snapshot_time_, sleep_a_while(), and SPINLOCK_WHILE.

Referenced by initialize_once().

191  {
192  LOG(INFO) << "Snapshot daemon started";
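193  // The actual snapshotting can't start until all other modules are initialized.
194  SPINLOCK_WHILE(!is_stop_requested() && !engine_->is_initialized()) {
195  assorted::memory_fence_acquire();
196  }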
197 
198  LOG(INFO) << "Snapshot daemon now starts taking snapshot";
199  while (!is_stop_requested()) {
200  sleep_a_while();
201  if (is_stop_requested()) {
202  break;
203  }
204  // should we start snapshotting? or keep sleeping?
205  bool triggered = false;
206  std::chrono::system_clock::time_point until = previous_snapshot_time_ +
207  std::chrono::milliseconds(get_option().snapshot_interval_milliseconds_);
208  Epoch durable_epoch = engine_->get_log_manager()->get_durable_global_epoch();
209  Epoch previous_epoch = get_snapshot_epoch();
210  if (previous_epoch.is_valid() && previous_epoch == durable_epoch) {
211  LOG(INFO) << "Current snapshot is already latest. durable_epoch=" << durable_epoch;
213  && (!previous_epoch.is_valid()
214  || control_block_->get_requested_snapshot_epoch() > previous_epoch)) {
215  // if someone requested immediate snapshot, do it.
216  triggered = true;
217  LOG(INFO) << "Immediate snapshot request detected. snapshotting..";
218  } else if (std::chrono::system_clock::now() >= until) {
219  triggered = true;
220  LOG(INFO) << "Snapshot interval has elapsed. snapshotting..";
221  } else {
222  // TASK(Hideaki): check free pages in page pool and compare with configuration.
223  }
224 
225  if (triggered) {
226  Snapshot new_snapshot;
227  ErrorStack stack = handle_snapshot_triggered(&new_snapshot);
228  if (stack.is_error()) {
229  LOG(ERROR) << "Snapshot failed:" << stack;
230  }
231  } else {
232  VLOG(1) << "Snapshotting not triggered. going to sleep again";
233  }
234  }
235 
236  LOG(INFO) << "Snapshot daemon ended. ";
237 }
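
The decision made on each wake-up in the listing above can be condensed into one predicate. The sketch below is an approximation: `TriggerInputs` and `should_snapshot` are hypothetical, epochs are plain integers with 0 meaning "invalid", and since the listing does not show the first half of the immediate-request condition, the sketch assumes it is a validity check on the requested epoch.

```cpp
#include <chrono>
#include <cstdint>

using Clock = std::chrono::system_clock;

// Hypothetical inputs; epochs are plain integers and 0 means "invalid".
struct TriggerInputs {
  uint32_t durable_epoch;
  uint32_t previous_snapshot_epoch;
  uint32_t requested_epoch;
  Clock::time_point previous_snapshot_time;
  std::chrono::milliseconds interval;
};

// Returns true when the daemon should start a snapshot at time `now`.
bool should_snapshot(const TriggerInputs& in, Clock::time_point now) {
  const bool has_previous = in.previous_snapshot_epoch != 0;
  if (has_previous && in.previous_snapshot_epoch == in.durable_epoch) {
    return false;  // the current snapshot already covers everything durable
  }
  // Assumption: an immediate request is recognized by a valid requested epoch.
  if (in.requested_epoch != 0 &&
      (!has_previous || in.requested_epoch > in.previous_snapshot_epoch)) {
    return true;
  }
  // Otherwise, snapshot only once the configured interval has elapsed.
  return now >= in.previous_snapshot_time + in.interval;
}
```

The order of the checks matters: an up-to-date snapshot suppresses both the immediate request and the interval trigger.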

void foedus::snapshot::SnapshotManagerPimpl::handle_snapshot_child ( )

Main routine for snapshot_thread_ in child engines.

All this does is launch mapper/reducer threads when asked by the master engine.

Definition at line 239 of file snapshot_manager_pimpl.cpp.

References foedus::soc::SharedPolling::acquire_ticket(), control_block_, foedus::snapshot::LogGleanerControlBlock::cur_snapshot_, engine_, foedus::Engine::get_soc_id(), foedus::snapshot::SnapshotManagerControlBlock::gleaner_, foedus::snapshot::Snapshot::id_, is_gleaning(), is_stop_requested(), foedus::snapshot::MapReduceBase::join_thread(), foedus::snapshot::MapReduceBase::launch_thread(), local_mappers_, local_reducer_, foedus::snapshot::SnapshotManagerControlBlock::snapshot_children_wakeup_, and foedus::soc::SharedPolling::timedwait().

Referenced by initialize_once().

239  {
240  LOG(INFO) << "Child snapshot daemon-" << engine_->get_soc_id() << " started";
241  thread::NumaThreadScope scope(engine_->get_soc_id());
243  while (!is_stop_requested()) {
244  {
246  if (!is_stop_requested() && !is_gleaning()) {
248  }
249  }
250  if (is_stop_requested()) {
251  break;
252  } else if (!is_gleaning() || previous_id == control_block_->gleaner_.cur_snapshot_.id_) {
253  continue;
254  }
256  LOG(INFO) << "Child snapshot daemon-" << engine_->get_soc_id() << " received a request"
257  << " for snapshot-" << current_id;
259  for (LogMapper* mapper : local_mappers_) {
260  mapper->launch_thread();
261  }
262  LOG(INFO) << "Child snapshot daemon-" << engine_->get_soc_id() << " launched mappers/reducer"
263  " for snapshot-" << current_id;
264  for (LogMapper* mapper : local_mappers_) {
265  mapper->join_thread();
266  }
268  LOG(INFO) << "Child snapshot daemon-" << engine_->get_soc_id() << " joined mappers/reducer"
269  " for snapshot-" << current_id;
270  previous_id = current_id;
271  }
272 
273  LOG(INFO) << "Child snapshot daemon-" << engine_->get_soc_id() << " ended";
274 }

ErrorStack foedus::snapshot::SnapshotManagerPimpl::handle_snapshot_triggered ( Snapshot * new_snapshot)

handle_snapshot() calls this when it should start snapshotting.

In other words, this function is the main routine of snapshotting.

Definition at line 316 of file snapshot_manager_pimpl.cpp.

References ASSERT_ND, foedus::snapshot::Snapshot::base_epoch_, CHECK_ERROR, control_block_, foedus::snapshot::LogGleanerControlBlock::cur_snapshot_, drop_volatile_pages(), engine_, foedus::log::LogManager::get_durable_global_epoch(), foedus::storage::StorageManager::get_largest_storage_id(), foedus::Engine::get_log_manager(), foedus::snapshot::SnapshotManagerControlBlock::get_requested_snapshot_epoch(), get_snapshot_epoch(), foedus::Engine::get_storage_manager(), glean_logs(), foedus::snapshot::SnapshotManagerControlBlock::gleaner_, foedus::snapshot::Snapshot::id_, foedus::snapshot::increment(), foedus::storage::StorageManager::is_initialized(), foedus::Engine::is_master(), foedus::Epoch::is_valid(), foedus::snapshot::kNullSnapshotId, foedus::kRetOk, foedus::snapshot::Snapshot::max_storage_id_, foedus::assorted::memory_fence_release(), foedus::snapshot::SnapshotManagerControlBlock::previous_snapshot_id_, previous_snapshot_time_, foedus::soc::SharedPolling::signal(), foedus::snapshot::SnapshotManagerControlBlock::snapshot_epoch_, snapshot_metadata(), snapshot_savepoint(), foedus::snapshot::SnapshotManagerControlBlock::snapshot_taken_, foedus::snapshot::Snapshot::valid_until_epoch_, and foedus::Epoch::value().

Referenced by handle_snapshot(), and foedus::restart::RestartManagerPimpl::recover().

316  {
318  ASSERT_ND(engine_->get_storage_manager()->is_initialized()); // snapshot relied on storage module
319  Epoch durable_epoch = engine_->get_log_manager()->get_durable_global_epoch();
320  Epoch previous_epoch = get_snapshot_epoch();
321  LOG(INFO) << "Taking a new snapshot. durable_epoch=" << durable_epoch
322  << ", requested_snapshot_epoch=" << control_block_->get_requested_snapshot_epoch()
323  << ". previous_snapshot=" << previous_epoch;
324  ASSERT_ND(durable_epoch.is_valid() &&
325  (!previous_epoch.is_valid() || durable_epoch > previous_epoch));
326  new_snapshot->base_epoch_ = previous_epoch;
327  Epoch requested_epoch = control_block_->get_requested_snapshot_epoch();
328  if (requested_epoch.is_valid()) {
329  ASSERT_ND(requested_epoch <= durable_epoch);
330  ASSERT_ND(!previous_epoch.is_valid() || requested_epoch > previous_epoch);
331  new_snapshot->valid_until_epoch_ = requested_epoch;
332  } else {
333  new_snapshot->valid_until_epoch_ = durable_epoch;
334  }
335  new_snapshot->max_storage_id_ = engine_->get_storage_manager()->get_largest_storage_id();
336  ASSERT_ND(new_snapshot->max_storage_id_
338 
339  // determine the snapshot ID
340  SnapshotId snapshot_id;
342  snapshot_id = 1;
343  } else {
345  }
346  LOG(INFO) << "Issued ID for this snapshot:" << snapshot_id;
347  new_snapshot->id_ = snapshot_id;
348 
349  // okay, let's start the snapshotting.
350  // The procedures below will take long time, so we keep checking our "is_stop_requested"
351  // and stops our child threads when it happens.
352 
353  // For each storage that was modified in this snapshotting,
354  // this holds the pointer to new root page.
355  std::map<storage::StorageId, storage::SnapshotPagePointer> new_root_page_pointers;
356 
357  // Log gleaners design partitioning and do scatter-gather to consume the logs.
358  // This will create snapshot files at each partition and tell us the new root pages of
359  // each storage.
360  CHECK_ERROR(glean_logs(*new_snapshot, &new_root_page_pointers));
361 
362  // Write out the metadata file.
363  CHECK_ERROR(snapshot_metadata(*new_snapshot, new_root_page_pointers));
364 
365  // Invokes savepoint module to make sure this snapshot has "happened".
366  CHECK_ERROR(snapshot_savepoint(*new_snapshot));
367 
368  // install pointers to snapshot pages and drop volatile pages.
369  CHECK_ERROR(drop_volatile_pages(*new_snapshot, new_root_page_pointers));
370 
371  Epoch new_snapshot_epoch = new_snapshot->valid_until_epoch_;
372  ASSERT_ND(new_snapshot_epoch.is_valid() &&
373  (!get_snapshot_epoch().is_valid() || new_snapshot_epoch > get_snapshot_epoch()));
374 
375  // done. notify waiters if exist
376  Epoch::EpochInteger epoch_after = new_snapshot_epoch.value();
377  control_block_->previous_snapshot_id_ = snapshot_id;
378  previous_snapshot_time_ = std::chrono::system_clock::now();
379 
380  control_block_->snapshot_epoch_ = epoch_after;
383  return kRetOk;
384 }
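
The epoch-range selection at the top of this function can be read in isolation. The following is a minimal sketch, not FOEDUS code: `SketchEpoch`, `SketchSnapshotRange`, and `choose_snapshot_range` are hypothetical names, and epochs are compared as plain integers, whereas the real `Epoch` class handles wrap-around.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for foedus::Epoch. Zero means "invalid".
// Real epochs wrap around; this sketch compares plain integers instead.
struct SketchEpoch {
  uint32_t value;
  bool is_valid() const { return value != 0; }
};

// Hypothetical subset of Snapshot: the epoch range a new snapshot covers.
struct SketchSnapshotRange {
  SketchEpoch base_epoch;         // epoch of the previous snapshot, possibly invalid
  SketchEpoch valid_until_epoch;  // last epoch included in the new snapshot
};

inline SketchSnapshotRange choose_snapshot_range(
    SketchEpoch durable, SketchEpoch previous, SketchEpoch requested) {
  // Same preconditions as the ASSERT_ND checks in the listing.
  assert(durable.is_valid());
  assert(!previous.is_valid() || durable.value > previous.value);

  // Default: snapshot everything that is already durable.
  SketchSnapshotRange range{previous, durable};
  if (requested.is_valid()) {
    // An explicit request must be durable and newer than the previous snapshot.
    assert(requested.value <= durable.value);
    assert(!previous.is_valid() || requested.value > previous.value);
    range.valid_until_epoch = requested;
  }
  return range;
}
```

With no previous snapshot and no request, the range ends at the durable epoch; with a valid request, it ends at the requested epoch instead.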

ErrorStack foedus::snapshot::SnapshotManagerPimpl::initialize_once ( )
override virtual

Implements foedus::DefaultInitializable.

Definition at line 61 of file snapshot_manager_pimpl.cpp.

References foedus::snapshot::LogGleanerControlBlock::all_count_, foedus::snapshot::LogGleanerResource::allocate(), ASSERT_ND, CHECK_ERROR, control_block_, foedus::storage::PartitionerMetadata::data_size_, engine_, ERROR_STACK, foedus::soc::SharedMemoryRepo::get_global_memory_anchors(), foedus::savepoint::SavepointManager::get_latest_snapshot_epoch(), foedus::savepoint::SavepointManager::get_latest_snapshot_id(), foedus::Engine::get_log_manager(), foedus::Engine::get_options(), foedus::Engine::get_savepoint_manager(), foedus::soc::SocManager::get_shared_memory_repo(), foedus::Engine::get_soc_count(), foedus::Engine::get_soc_manager(), foedus::snapshot::SnapshotManagerControlBlock::gleaner_, gleaner_resource_, foedus::thread::ThreadOptions::group_count_, handle_snapshot(), handle_snapshot_child(), foedus::snapshot::SnapshotManagerControlBlock::initialize(), foedus::DefaultInitializable::initialize(), foedus::storage::PartitionerMetadata::initialize(), foedus::log::LogManager::is_initialized(), foedus::soc::SharedMutex::is_initialized(), foedus::Engine::is_master(), foedus::Epoch::kEpochInvalid, foedus::kErrorCodeDepedentModuleUnavailableInit, foedus::kRetOk, local_mappers_, local_reducer_, foedus::EngineOptions::log_, foedus::log::LogOptions::loggers_per_node_, foedus::snapshot::LogGleanerControlBlock::mappers_count_, foedus::storage::StorageOptions::max_storages_, foedus::storage::PartitionerMetadata::mutex_, foedus::storage::StorageOptions::partitioner_data_memory_mb_, foedus::soc::GlobalMemoryAnchors::partitioner_metadata_, foedus::snapshot::SnapshotManagerControlBlock::previous_snapshot_id_, previous_snapshot_time_, foedus::snapshot::LogGleanerControlBlock::reducers_count_, foedus::snapshot::SnapshotManagerControlBlock::requested_snapshot_epoch_, foedus::snapshot::SnapshotManagerControlBlock::snapshot_epoch_, foedus::soc::GlobalMemoryAnchors::snapshot_manager_memory_, snapshot_thread_, stop_requested_, foedus::EngineOptions::storage_, 
foedus::EngineOptions::thread_, and foedus::Epoch::value().

61  {
62  LOG(INFO) << "Initializing SnapshotManager..";
65  }
66  soc::SharedMemoryRepo* repo = engine_->get_soc_manager()->get_shared_memory_repo();
67  control_block_ = repo->get_global_memory_anchors()->snapshot_manager_memory_;
68  if (engine_->is_master()) {
70  // get snapshot status from savepoint
75  LOG(INFO) << "Latest snapshot: id=" << control_block_->previous_snapshot_id_ << ", epoch="
78 
79  const EngineOptions& options = engine_->get_options();
80  uint32_t reducer_count = options.thread_.group_count_;
81  uint32_t mapper_count = reducer_count * options.log_.loggers_per_node_;
82  control_block_->gleaner_.reducers_count_ = reducer_count;
83  control_block_->gleaner_.mappers_count_ = mapper_count;
84  control_block_->gleaner_.all_count_ = reducer_count + mapper_count;
85 
86  // also initialize the shared memory for partitioner
87  uint32_t max_storages = engine_->get_options().storage_.max_storages_;
88  storage::PartitionerMetadata* partitioner_metadata
89  = repo->get_global_memory_anchors()->partitioner_metadata_;
90  for (storage::StorageId i = 0; i < max_storages; ++i) {
91  storage::PartitionerMetadata* meta = partitioner_metadata + i;
92  ASSERT_ND(!meta->mutex_.is_initialized());
93  meta->initialize();
94  ASSERT_ND(meta->mutex_.is_initialized());
95  }
96  // set the size of partitioner data
97  repo->get_global_memory_anchors()->partitioner_metadata_[0].data_size_
99 
100  // And master-only resources for running gleaner. These are NOT shared memory. local to master.
102  }
103 
104  // in child engines, we instantiate local mappers/reducer objects (but not the threads yet)
105  previous_snapshot_time_ = std::chrono::system_clock::now();
106  stop_requested_ = false;
107  if (!engine_->is_master()) {
108  local_reducer_ = new LogReducer(engine_);
110  for (uint16_t i = 0; i < engine_->get_options().log_.loggers_per_node_; ++i) {
111  local_mappers_.push_back(new LogMapper(engine_, i));
113  }
114  }
115 
116  // Launch the daemon thread at last
117  if (engine_->is_master()) {
118  snapshot_thread_ = std::move(std::thread(&SnapshotManagerPimpl::handle_snapshot, this));
119  } else {
120  snapshot_thread_ = std::move(std::thread(&SnapshotManagerPimpl::handle_snapshot_child, this));
121  }
122  return kRetOk;
123 }
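
The gleaner sizing in the master branch is simple arithmetic. A small sketch under stated assumptions: `SketchGleanerCounts` and `count_gleaner_workers` are hypothetical names, and the reading "one reducer per thread group, one mapper per logger" is inferred from the listing rather than stated by the library.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical plain struct; the real values live in LogGleanerControlBlock.
struct SketchGleanerCounts {
  uint32_t reducers;
  uint32_t mappers;
  uint32_t all;
};

// Mirrors lines 80-84 of the listing: reducers follow the thread-group count,
// mappers follow the total number of loggers across all groups.
inline SketchGleanerCounts count_gleaner_workers(uint32_t group_count,
                                                 uint32_t loggers_per_node) {
  assert(group_count > 0 && loggers_per_node > 0);
  SketchGleanerCounts counts;
  counts.reducers = group_count;
  counts.mappers = group_count * loggers_per_node;
  counts.all = counts.reducers + counts.mappers;
  return counts;
}
```

For example, 4 thread groups with 2 loggers each give 4 reducers, 8 mappers, and 12 workers in total.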

bool foedus::snapshot::SnapshotManagerPimpl::is_gleaning ( ) const
inline

Definition at line 251 of file snapshot_manager_pimpl.hpp.

References control_block_, foedus::snapshot::SnapshotManagerControlBlock::gleaner_, and foedus::snapshot::LogGleanerControlBlock::gleaning_.

Referenced by handle_snapshot_child().

251 { return control_block_->gleaner_.gleaning_; }

bool foedus::snapshot::SnapshotManagerPimpl::is_stop_requested ( ) const
inline

Definition at line 250 of file snapshot_manager_pimpl.hpp.

References stop_requested_.

Referenced by handle_snapshot(), handle_snapshot_child(), sleep_a_while(), and trigger_snapshot_immediate().

250 { return stop_requested_; }

SnapshotId foedus::snapshot::SnapshotManagerPimpl::issue_next_snapshot_id ( )
inline

Definition at line 239 of file snapshot_manager_pimpl.hpp.

References control_block_, foedus::snapshot::increment(), foedus::snapshot::kNullSnapshotId, and foedus::snapshot::SnapshotManagerControlBlock::previous_snapshot_id_.

239  {
242  } else {
244  }
246  }

ErrorStack foedus::snapshot::SnapshotManagerPimpl::read_snapshot_metadata ( SnapshotId snapshot_id, SnapshotMetadata * out )

Definition at line 453 of file snapshot_manager_pimpl.cpp.

References ASSERT_ND, CHECK_ERROR, foedus::debugging::StopWatch::elapsed_ms(), foedus::fs::file_size(), get_snapshot_metadata_file_path(), foedus::snapshot::SnapshotMetadata::id_, foedus::kRetOk, foedus::externalize::Externalizable::load_from_file(), and foedus::debugging::StopWatch::stop().

Referenced by foedus::snapshot::SnapshotManager::read_snapshot_metadata().

455  {
456  fs::Path file = get_snapshot_metadata_file_path(snapshot_id);
457  LOG(INFO) << "Reading snapshot metadata file fullpath=" << file;
458 
459  debugging::StopWatch stop_watch;
460  CHECK_ERROR(out->load_from_file(file));
461  stop_watch.stop();
462  LOG(INFO) << "Read a snapshot metadata file. size=" << fs::file_size(file) << " bytes"
463  << ", elapsed time to read+parse=" << stop_watch.elapsed_ms() << "ms.";
464 
465  ASSERT_ND(out->id_ == snapshot_id);
466  return kRetOk;
467 }

void foedus::snapshot::SnapshotManagerPimpl::sleep_a_while ( )

Definition at line 181 of file snapshot_manager_pimpl.cpp.

References foedus::soc::SharedPolling::acquire_ticket(), control_block_, is_stop_requested(), foedus::snapshot::SnapshotManagerControlBlock::snapshot_wakeup_, and foedus::soc::SharedPolling::timedwait().

Referenced by handle_snapshot().

181  {
182  uint64_t demand = control_block_->snapshot_wakeup_.acquire_ticket();
183  if (!is_stop_requested()) {
184  control_block_->snapshot_wakeup_.timedwait(demand, 20000ULL);
185  }
186 }
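
The order of operations above (take a ticket, re-check the stop flag, then wait) is what prevents a lost wakeup. The sketch below illustrates that pattern with an in-process analogue built on `std::condition_variable`. It is not `soc::SharedPolling`: `SketchPolling` and `sleep_unless_stopped` are hypothetical, and the assumption that a ticket is "current counter plus one" is made for illustration only.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical in-process analogue of a ticket-based wait.
class SketchPolling {
 public:
  // Returns the counter value that a later signal() must reach.
  uint64_t acquire_ticket() const {
    std::lock_guard<std::mutex> guard(mutex_);
    return counter_ + 1;
  }
  // Waits until the counter reaches the ticket or the timeout expires.
  // Returns true if signalled, false on timeout.
  bool timedwait(uint64_t demanded_ticket, uint64_t timeout_microsec) const {
    assert(demanded_ticket > 0);
    std::unique_lock<std::mutex> lock(mutex_);
    return cond_.wait_for(lock, std::chrono::microseconds(timeout_microsec),
                          [&] { return counter_ >= demanded_ticket; });
  }
  // Advances the counter and wakes every waiter.
  void signal() {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      ++counter_;
    }
    cond_.notify_all();
  }

 private:
  mutable std::mutex mutex_;
  mutable std::condition_variable cond_;
  uint64_t counter_ = 0;
};

// Same shape as sleep_a_while(): ticket first, then the stop check, then the wait.
// Returns true only if a signal arrived before the timeout.
inline bool sleep_unless_stopped(const SketchPolling& wakeup,
                                 const std::atomic<bool>& stop_requested,
                                 uint64_t timeout_microsec) {
  uint64_t demand = wakeup.acquire_ticket();
  if (stop_requested.load()) {
    return false;
  }
  return wakeup.timedwait(demand, timeout_microsec);
}
```

If a stopper sets the flag and signals before the ticket is taken, the flag check catches it. If it does so after the ticket is taken, the counter has already reached the ticket and the wait returns immediately.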

ErrorStack foedus::snapshot::SnapshotManagerPimpl::snapshot_metadata ( const Snapshot & new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > & new_root_page_pointers )

Sub-routine of handle_snapshot_triggered().

Writes out a snapshot metadata file that contains the metadata of all storages plus a few global metadata items.

Definition at line 401 of file snapshot_manager_pimpl.cpp.

References ASSERT_ND, foedus::snapshot::Snapshot::base_epoch_, foedus::snapshot::SnapshotMetadata::base_epoch_, CHECK_ERROR, foedus::storage::StorageManager::clone_all_storage_metadata(), foedus::fs::create_directories(), foedus::debugging::StopWatch::elapsed_ms(), engine_, ERROR_STACK, foedus::fs::exists(), foedus::fs::file_size(), foedus::fs::fsync(), foedus::snapshot::SnapshotMetadata::get_metadata(), get_option(), get_snapshot_metadata_file_path(), foedus::Engine::get_storage_manager(), foedus::snapshot::Snapshot::id_, foedus::snapshot::SnapshotMetadata::id_, foedus::storage::StorageControlBlock::is_valid_status(), foedus::kErrorCodeFsMkdirFailed, foedus::kRetOk, foedus::snapshot::SnapshotMetadata::largest_storage_id_, foedus::snapshot::Snapshot::max_storage_id_, foedus::externalize::Externalizable::save_to_file(), foedus::debugging::StopWatch::start(), foedus::debugging::StopWatch::stop(), foedus::snapshot::SnapshotMetadata::storage_control_blocks_, foedus::snapshot::Snapshot::valid_until_epoch_, foedus::snapshot::SnapshotMetadata::valid_until_epoch_, and foedus::Epoch::value().

Referenced by handle_snapshot_triggered().

403  {
404  // construct metadata object
405  SnapshotMetadata metadata;
406  metadata.id_ = new_snapshot.id_;
407  metadata.base_epoch_ = new_snapshot.base_epoch_.value();
408  metadata.valid_until_epoch_ = new_snapshot.valid_until_epoch_.value();
409  metadata.largest_storage_id_ = new_snapshot.max_storage_id_;
411 
412  // we modified the root page.
413  uint32_t installed_root_pages_count = 0;
414  for (storage::StorageId id = 1; id <= metadata.largest_storage_id_; ++id) {
415  const auto& it = new_root_page_pointers.find(id);
416  ASSERT_ND(metadata.storage_control_blocks_[id].is_valid_status());
417  storage::Metadata* meta = metadata.get_metadata(id);
418  if (it != new_root_page_pointers.end()) {
419  storage::SnapshotPagePointer new_pointer = it->second;
420  // composer's construct_root should have already set the new root pointer
421  ASSERT_ND(new_pointer == meta->root_snapshot_page_id_);
422  ++installed_root_pages_count;
423  }
424  }
425  LOG(INFO) << "Out of " << metadata.largest_storage_id_ << " storages, "
426  << installed_root_pages_count << " changed their root pages.";
427  ASSERT_ND(installed_root_pages_count == new_root_page_pointers.size());
428 
429  // save it to a file
430  fs::Path folder(get_option().get_primary_folder_path());
431  if (!fs::exists(folder)) {
432  if (!fs::create_directories(folder, true)) {
433  LOG(ERROR) << "Failed to create directory:" << folder << ". check permission.";
435  }
436  }
437 
438  fs::Path file = get_snapshot_metadata_file_path(new_snapshot.id_);
439  LOG(INFO) << "New snapshot metadata file fullpath=" << file;
440 
441  debugging::StopWatch stop_watch;
442  CHECK_ERROR(metadata.save_to_file(file));
443  stop_watch.stop();
444  LOG(INFO) << "Wrote a snapshot metadata file. size=" << fs::file_size(file) << " bytes"
445  << ", elapsed time to write=" << stop_watch.elapsed_ms() << "ms. now fsyncing...";
446  stop_watch.start();
447  fs::fsync(file, true);
448  stop_watch.stop();
449  LOG(INFO) << "fsynced the file and the folder! elapsed=" << stop_watch.elapsed_ms() << "ms.";
450  return kRetOk;
451 }

ErrorStack foedus::snapshot::SnapshotManagerPimpl::snapshot_savepoint ( const Snapshot & new_snapshot )

Sub-routine of handle_snapshot_triggered().

Invokes the savepoint module to take a savepoint that points to this snapshot. Until then, the snapshot is not deemed to have "happened".

Definition at line 469 of file snapshot_manager_pimpl.cpp.

References ASSERT_ND, CHECK_ERROR, engine_, foedus::savepoint::SavepointManager::get_latest_snapshot_epoch(), foedus::savepoint::SavepointManager::get_latest_snapshot_id(), foedus::Engine::get_savepoint_manager(), foedus::snapshot::Snapshot::id_, foedus::kRetOk, foedus::savepoint::SavepointManager::take_savepoint_after_snapshot(), and foedus::snapshot::Snapshot::valid_until_epoch_.

Referenced by handle_snapshot_triggered().

469  {
470  LOG(INFO) << "Taking savepoint to include this new snapshot....";
472  new_snapshot.id_,
473  new_snapshot.valid_until_epoch_));
476  == new_snapshot.valid_until_epoch_);
477  return kRetOk;
478 }

void foedus::snapshot::SnapshotManagerPimpl::stop_snapshot_thread ( )

This is a hidden API called at the beginning of engine shutdown (namely, by the restart manager).

Snapshot Manager initializes before Storage because it must read the previous snapshot, but its snapshot thread must stop before Storage is uninitialized because that thread relies on the storage module. To resolve this, the restart manager's uninit calls this method to stop the snapshot thread. The snapshot manager's own uninit() also calls it in case the restart manager was never initialized, so this method must be idempotent.

Definition at line 164 of file snapshot_manager_pimpl.cpp.

References foedus::snapshot::LogGleanerControlBlock::cancelled_, control_block_, engine_, foedus::snapshot::SnapshotManagerControlBlock::gleaner_, foedus::Engine::is_master(), snapshot_thread_, stop_requested_, foedus::snapshot::LogGleanerControlBlock::terminating_, wakeup(), and foedus::snapshot::SnapshotManagerControlBlock::wakeup_snapshot_children().

Referenced by foedus::restart::RestartManagerPimpl::uninitialize_once(), and uninitialize_once().

164  {
165  LOG(INFO) << "Stopping the snapshot thread...";
166  if (snapshot_thread_.joinable()) {
167  // whether from master or not, just make sure all reducers/mappers notice that it's closing
168  stop_requested_ = true;
171  if (engine_->is_master()) {
172  wakeup();
173  } else {
175  }
176  snapshot_thread_.join();
177  }
178  LOG(INFO) << "Stopped the snapshot thread.";
179 }
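
The `joinable()` guard is what makes a second call harmless. A minimal sketch of the same idea, assuming a hypothetical `SketchDaemon` class whose worker spins on a stop flag (the real thread sleeps on a shared polling object and is woken explicitly):

```cpp
#include <atomic>
#include <thread>

// Hypothetical worker with an idempotent stop(); names are illustrative only.
class SketchDaemon {
 public:
  SketchDaemon() : stop_requested_(false), thread_([this] { run(); }) {}
  ~SketchDaemon() { stop(); }

  // Safe to call any number of times, from any shutdown path.
  void stop() {
    if (thread_.joinable()) {  // false once the thread has been joined
      stop_requested_ = true;
      thread_.join();
    }
  }
  bool is_running() const { return thread_.joinable(); }

 private:
  void run() {
    while (!stop_requested_.load()) {
      std::this_thread::yield();
    }
  }
  // Declared before thread_ so the flag is initialized before run() reads it.
  std::atomic<bool> stop_requested_;
  std::thread thread_;
};
```

Because the guard is on the thread handle rather than on a separate "already stopped" flag, the two shutdown paths described above do not need to coordinate with each other.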

void foedus::snapshot::SnapshotManagerPimpl::trigger_snapshot_immediate ( bool wait_completion, Epoch suggested_snapshot_epoch )

Definition at line 277 of file snapshot_manager_pimpl.cpp.

References foedus::soc::SharedPolling::acquire_ticket(), ASSERT_ND, control_block_, engine_, foedus::log::LogManager::get_durable_global_epoch(), foedus::Engine::get_log_manager(), get_snapshot_epoch(), is_stop_requested(), foedus::Epoch::is_valid(), foedus::snapshot::SnapshotManagerControlBlock::requested_snapshot_epoch_, foedus::snapshot::SnapshotManagerControlBlock::snapshot_taken_, foedus::soc::SharedPolling::timedwait(), and wakeup().

Referenced by foedus::snapshot::SnapshotManager::trigger_snapshot_immediate().

279  {
280  LOG(INFO) << "Requesting to immediately take a snapshot. suggested_snapshot_epoch="
281  << suggested_snapshot_epoch;
282  Epoch before = get_snapshot_epoch();
283  Epoch durable_epoch = engine_->get_log_manager()->get_durable_global_epoch();
284  ASSERT_ND(durable_epoch.is_valid());
285 
286  Epoch new_snapshot_epoch = durable_epoch;
287  if (suggested_snapshot_epoch.is_valid()) {
288  if (suggested_snapshot_epoch > durable_epoch) {
289  LOG(WARNING) << "You can't specify non-durable epoch for snapshot.";
290  } else {
291  new_snapshot_epoch = suggested_snapshot_epoch;
292  }
293  }
294 
295  if (before.is_valid() && before >= durable_epoch) {
296  LOG(INFO) << "Current snapshot already satisfies. new_snapshot_epoch=" << new_snapshot_epoch;
297  return;
298  }
299 
300  control_block_->requested_snapshot_epoch_.store(new_snapshot_epoch.value());
301  wakeup();
302  if (wait_completion) {
303  VLOG(0) << "Waiting for the completion of snapshot... before=" << before;
304  while (!is_stop_requested() &&
305  (!get_snapshot_epoch().is_valid() || new_snapshot_epoch > get_snapshot_epoch())) {
306  uint64_t demand = control_block_->snapshot_taken_.acquire_ticket();
307  if (!is_stop_requested() &&
308  (!get_snapshot_epoch().is_valid() || new_snapshot_epoch > get_snapshot_epoch())) {
309  control_block_->snapshot_taken_.timedwait(demand, 100000ULL);
310  }
311  }
312  }
313  LOG(INFO) << "Observed the completion of snapshot! after=" << get_snapshot_epoch();
314 }
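
The first half of this function decides which epoch to request, if any. The sketch below isolates that decision. `choose_requested_epoch` is a hypothetical helper, epochs are plain integers with 0 standing for "invalid", and the return value 0 is this sketch's own convention for "no new snapshot needed".

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper. Returns the epoch to request, or 0 if the current
// snapshot already covers the durable epoch.
inline uint32_t choose_requested_epoch(uint32_t current_snapshot,
                                       uint32_t durable,
                                       uint32_t suggested) {
  assert(durable != 0);
  uint32_t target = durable;
  // A suggestion is honored only if it is already durable; the listing
  // logs a warning and falls back to the durable epoch otherwise.
  if (suggested != 0 && suggested <= durable) {
    target = suggested;
  }
  // As in the listing, the early exit compares against the durable epoch,
  // not against the chosen target.
  if (current_snapshot != 0 && current_snapshot >= durable) {
    return 0;
  }
  return target;
}
```

A caller that passes an invalid suggestion therefore gets "snapshot up to whatever is durable now", which matches the default in the listing.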

ErrorStack foedus::snapshot::SnapshotManagerPimpl::uninitialize_once ( )
override virtual

Implements foedus::DefaultInitializable.

Definition at line 125 of file snapshot_manager_pimpl.cpp.

References ASSERT_ND, control_block_, foedus::snapshot::LogGleanerResource::deallocate(), foedus::ErrorStackBatch::emprace_back(), engine_, ERROR_STACK, foedus::soc::SharedMemoryRepo::get_global_memory_anchors(), foedus::Engine::get_log_manager(), foedus::Engine::get_options(), foedus::soc::SocManager::get_shared_memory_repo(), foedus::Engine::get_soc_manager(), gleaner_resource_, foedus::log::LogManager::is_initialized(), foedus::soc::SharedMutex::is_initialized(), foedus::Engine::is_master(), foedus::kErrorCodeDepedentModuleUnavailableUninit, local_mappers_, local_reducer_, foedus::storage::StorageOptions::max_storages_, foedus::storage::PartitionerMetadata::mutex_, foedus::soc::GlobalMemoryAnchors::partitioner_metadata_, foedus::snapshot::LogGleanerResource::per_node_resources_, stop_snapshot_thread(), foedus::EngineOptions::storage_, SUMMARIZE_ERROR_BATCH, foedus::snapshot::SnapshotManagerControlBlock::uninitialize(), foedus::DefaultInitializable::uninitialize(), and foedus::storage::PartitionerMetadata::uninitialize().

125  {
126  LOG(INFO) << "Uninitializing SnapshotManager..";
127  ErrorStackBatch batch;
130  }
132  if (engine_->is_master()) {
133  // also uninitialize the shared memory for partitioner
134  soc::GlobalMemoryAnchors* anchors
136  uint32_t max_storages = engine_->get_options().storage_.max_storages_;
137  for (storage::StorageId i = 0; i < max_storages; ++i) {
138  ASSERT_ND(anchors->partitioner_metadata_[i].mutex_.is_initialized());
139  anchors->partitioner_metadata_[i].uninitialize();
140  ASSERT_ND(!anchors->partitioner_metadata_[i].mutex_.is_initialized());
141  }
142 
144  ASSERT_ND(local_reducer_ == nullptr);
145  ASSERT_ND(local_mappers_.size() == 0);
146 
148  } else {
149  if (local_reducer_) {
150  batch.emprace_back(local_reducer_->uninitialize());
151  delete local_reducer_;
152  local_reducer_ = nullptr;
153  }
154  for (LogMapper* mapper : local_mappers_) {
155  batch.emprace_back(mapper->uninitialize());
156  delete mapper;
157  }
158  local_mappers_.clear();
160  }
161 
162  return SUMMARIZE_ERROR_BATCH(batch);
163 }
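
Teardown here keeps going after a failure and reports everything at the end, rather than returning on the first error. A rough sketch of that collect-then-summarize pattern follows. `SketchErrorBatch` is a hypothetical stand-in that uses strings for errors; it does not reproduce the interface of `ErrorStackBatch`.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for an error batch. An empty string means "no error".
class SketchErrorBatch {
 public:
  // Records the result of one teardown step; successes are ignored.
  void add(const std::string& result) {
    if (!result.empty()) {
      errors_.push_back(result);
    }
  }
  bool has_error() const { return !errors_.empty(); }

  // Joins every recorded error so none is lost; empty when all steps succeeded.
  std::string summarize() const {
    std::string summary;
    for (const std::string& error : errors_) {
      if (!summary.empty()) {
        summary += "; ";
      }
      summary += error;
    }
    return summary;
  }

 private:
  std::vector<std::string> errors_;
};
```

The benefit during shutdown is that every resource still gets its release attempt even if an earlier step failed.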

void foedus::snapshot::SnapshotManagerPimpl::wakeup ( )

Definition at line 187 of file snapshot_manager_pimpl.cpp.

References control_block_, foedus::soc::SharedPolling::signal(), and foedus::snapshot::SnapshotManagerControlBlock::snapshot_wakeup_.

Referenced by stop_snapshot_thread(), and trigger_snapshot_immediate().

187  {
189 }
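
wakeup() signals snapshot_wakeup_ so that a sleeping snapshot thread resumes early. The sketch below shows the general signal-and-timed-wait idea with an in-process std::condition_variable. WakeupSignal is a hypothetical class for illustration only; it is not soc::SharedPolling, which works across processes in shared memory and may behave differently.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical in-process wakeup signal (NOT soc::SharedPolling).
class WakeupSignal {
 public:
  // Wake any waiter, and remember the signal for a waiter that arrives later.
  void signal() {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      signaled_ = true;
    }
    cond_.notify_all();
  }

  // Sleep until signaled or until the timeout expires.
  // Returns true if a signal was consumed, false on timeout.
  bool wait_for(std::chrono::milliseconds timeout) {
    assert(timeout.count() >= 0);
    std::unique_lock<std::mutex> lock(mutex_);
    const bool got_signal =
        cond_.wait_for(lock, timeout, [this] { return signaled_; });
    signaled_ = false;  // consume the signal so the next wait sleeps again
    return got_signal;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cond_;
  bool signaled_ = false;
};
```

Keeping a signaled_ flag next to the condition variable means a signal sent just before the waiter goes to sleep is not lost, which matters when a shutdown or an immediate-snapshot request races with the periodic sleep.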

Member Data Documentation

LogGleanerResource foedus::snapshot::SnapshotManagerPimpl::gleaner_resource_

Local resources for gleaner, which runs only in the master node.

Empty in child nodes.

Definition at line 356 of file snapshot_manager_pimpl.hpp.

Referenced by glean_logs(), initialize_once(), and uninitialize_once().

std::vector<LogMapper*> foedus::snapshot::SnapshotManagerPimpl::local_mappers_

Mappers in this node.

Indexed by logger ordinal. Empty in the master engine.

Definition at line 351 of file snapshot_manager_pimpl.hpp.

Referenced by handle_snapshot_child(), initialize_once(), and uninitialize_once().

LogReducer* foedus::snapshot::SnapshotManagerPimpl::local_reducer_

Reducer in this node.

Null in the master engine.

Definition at line 353 of file snapshot_manager_pimpl.hpp.

Referenced by handle_snapshot_child(), initialize_once(), and uninitialize_once().

std::chrono::system_clock::time_point foedus::snapshot::SnapshotManagerPimpl::previous_snapshot_time_

The time at which snapshot_thread_ last took a snapshot.

Read and written only by snapshot_thread_.

Definition at line 348 of file snapshot_manager_pimpl.hpp.

Referenced by handle_snapshot(), handle_snapshot_triggered(), and initialize_once().
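
One plausible use of such a timestamp is to decide whether enough time has passed to take the next snapshot. The sketch below assumes the thread compares the elapsed time against a configured interval. The function is_snapshot_due and its interval parameter are hypothetical and are not taken from this class.

```cpp
#include <cassert>
#include <chrono>

using Clock = std::chrono::system_clock;

// Returns true when at least `interval` has passed between `previous` and `now`.
// Hypothetical helper; the interval is an assumed configuration value.
bool is_snapshot_due(Clock::time_point previous,
                     Clock::time_point now,
                     std::chrono::milliseconds interval) {
  assert(interval.count() >= 0);
  return now - previous >= interval;
}
```

Passing `now` as a parameter instead of calling Clock::now() inside the function keeps the check deterministic and easy to test.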

std::thread foedus::snapshot::SnapshotManagerPimpl::snapshot_thread_

The daemon thread of snapshot manager.

In the master engine, this thread occasionally wakes up and serves as the main managing thread for snapshotting, which consists of several child threads and multiple phases. In a child engine, it receives requests from the master engine's snapshot_thread_ and launches mappers/reducers in this node.

Definition at line 342 of file snapshot_manager_pimpl.hpp.

Referenced by initialize_once(), and stop_snapshot_thread().

std::vector< Snapshot > foedus::snapshot::SnapshotManagerPimpl::snapshots_

All previously taken snapshots.

Access to this data must be protected with a mutex. It is populated only in the master engine.

Definition at line 330 of file snapshot_manager_pimpl.hpp.
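
A minimal sketch of mutex-protected access to a snapshot list follows. SnapshotRecord and SnapshotHistory are hypothetical, simplified types for illustration. The real Snapshot class carries more fields, and this page does not say which mutex guards snapshots_.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical, trimmed-down snapshot record.
struct SnapshotRecord {
  std::uint16_t id;
  std::uint64_t valid_until_epoch;
};

// Hypothetical wrapper that guards every access to the list with one mutex.
class SnapshotHistory {
 public:
  void append(const SnapshotRecord& record) {
    std::lock_guard<std::mutex> guard(mutex_);
    records_.push_back(record);
  }

  // Copies out the most recent record; returns false if none exists yet.
  bool latest(SnapshotRecord* out) const {
    assert(out != nullptr);
    std::lock_guard<std::mutex> guard(mutex_);
    if (records_.empty()) {
      return false;
    }
    *out = records_.back();
    return true;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(mutex_);
    return records_.size();
  }

 private:
  mutable std::mutex mutex_;  // mutable so const readers can lock it
  std::vector<SnapshotRecord> records_;
};
```

Returning a copy rather than a reference keeps the caller from touching the vector after the lock has been released.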

std::atomic<bool> foedus::snapshot::SnapshotManagerPimpl::stop_requested_

Flag used to shut down snapshot_thread_ locally.

It does not reside in shared memory.

Definition at line 333 of file snapshot_manager_pimpl.hpp.

Referenced by initialize_once(), is_stop_requested(), and stop_snapshot_thread().
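
The usual shape of a locally stopped daemon thread is a loop that polls an atomic flag, plus a stop routine that sets the flag and joins the thread. The sketch below illustrates only that shape. Daemon and its members are hypothetical, and the real stop_snapshot_thread() also calls wakeup() so a sleeping thread notices the request promptly, which is replaced here by a short sleep.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical daemon that runs until a process-local stop flag is set.
class Daemon {
 public:
  ~Daemon() { stop(); }

  void start() {
    assert(!thread_.joinable());  // starting twice is a caller bug
    stop_requested_.store(false);
    thread_ = std::thread([this] { run(); });
  }

  // Request the stop, then wait for the loop to exit. Safe to call repeatedly.
  void stop() {
    stop_requested_.store(true);
    if (thread_.joinable()) {
      thread_.join();
    }
  }

  bool is_stop_requested() const { return stop_requested_.load(); }
  int iterations() const { return iterations_.load(); }

 private:
  void run() {
    while (!is_stop_requested()) {
      iterations_.fetch_add(1);
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }

  std::atomic<bool> stop_requested_{false};
  std::atomic<int> iterations_{0};
  std::thread thread_;
};
```

Because the flag is an ordinary std::atomic<bool> member, it is visible only inside the owning process, which matches a flag that is deliberately kept out of shared memory.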


The documentation for this class was generated from the following files: