20 #include <glog/logging.h>
62 LOG(INFO) <<
"Initializing SnapshotManager..";
126 LOG(INFO) <<
"Uninitializing SnapshotManager..";
158 local_mappers_.clear();
165 LOG(INFO) <<
"Stopping the snapshot thread...";
178 LOG(INFO) <<
"Stopped the snapshot thread.";
192 LOG(INFO) <<
"Snapshot daemon started";
198 LOG(INFO) <<
"Snapshot daemon now starts taking snapshot";
205 bool triggered =
false;
207 std::chrono::milliseconds(
get_option().snapshot_interval_milliseconds_);
210 if (previous_epoch.
is_valid() && previous_epoch == durable_epoch) {
211 LOG(INFO) <<
"Current snapshot is already latest. durable_epoch=" << durable_epoch;
217 LOG(INFO) <<
"Immediate snapshot request detected. snapshotting..";
218 }
else if (std::chrono::system_clock::now() >= until) {
220 LOG(INFO) <<
"Snapshot interval has elapsed. snapshotting..";
229 LOG(ERROR) <<
"Snapshot failed:" << stack;
232 VLOG(1) <<
"Snapshotting not triggered. going to sleep again";
236 LOG(INFO) <<
"Snapshot daemon ended. ";
256 LOG(INFO) <<
"Child snapshot daemon-" <<
engine_->
get_soc_id() <<
" received a request"
257 <<
" for snapshot-" << current_id;
260 mapper->launch_thread();
262 LOG(INFO) <<
"Child snapshot daemon-" <<
engine_->
get_soc_id() <<
" launched mappers/reducer"
263 " for snapshot-" << current_id;
264 for (
LogMapper* mapper : local_mappers_) {
265 mapper->join_thread();
268 LOG(INFO) <<
"Child snapshot daemon-" <<
engine_->
get_soc_id() <<
" joined mappers/reducer"
269 " for snapshot-" << current_id;
270 previous_id = current_id;
278 bool wait_completion,
279 Epoch suggested_snapshot_epoch) {
280 LOG(INFO) <<
"Requesting to immediately take a snapshot. suggested_snapshot_epoch="
281 << suggested_snapshot_epoch;
286 Epoch new_snapshot_epoch = durable_epoch;
287 if (suggested_snapshot_epoch.
is_valid()) {
288 if (suggested_snapshot_epoch > durable_epoch) {
289 LOG(WARNING) <<
"You can't specify non-durable epoch for snapshot.";
291 new_snapshot_epoch = suggested_snapshot_epoch;
295 if (before.
is_valid() && before >= durable_epoch) {
296 LOG(INFO) <<
"Current snapshot already satisfies. new_snapshot_epoch=" << new_snapshot_epoch;
302 if (wait_completion) {
303 VLOG(0) <<
"Waiting for the completion of snapshot... before=" << before;
321 LOG(INFO) <<
"Taking a new snapshot. durable_epoch=" << durable_epoch
323 <<
". previous_snapshot=" << previous_epoch;
325 (!previous_epoch.
is_valid() || durable_epoch > previous_epoch));
329 ASSERT_ND(requested_epoch <= durable_epoch);
346 LOG(INFO) <<
"Issued ID for this snapshot:" << snapshot_id;
347 new_snapshot->
id_ = snapshot_id;
355 std::map<storage::StorageId, storage::SnapshotPagePointer> new_root_page_pointers;
388 std::map<storage::StorageId, storage::SnapshotPagePointer>* new_root_page_pointers) {
394 LOG(ERROR) <<
"Log Gleaner encountered either an error or early termination request";
403 const std::map<storage::StorageId, storage::SnapshotPagePointer>& new_root_page_pointers) {
406 metadata.
id_ = new_snapshot.
id_;
413 uint32_t installed_root_pages_count = 0;
415 const auto& it = new_root_page_pointers.find(
id);
418 if (it != new_root_page_pointers.end()) {
421 ASSERT_ND(new_pointer == meta->root_snapshot_page_id_);
422 ++installed_root_pages_count;
426 << installed_root_pages_count <<
" changed their root pages.";
427 ASSERT_ND(installed_root_pages_count == new_root_page_pointers.size());
433 LOG(ERROR) <<
"Failed to create directory:" << folder <<
". check permission.";
439 LOG(INFO) <<
"New snapshot metadata file fullpath=" << file;
444 LOG(INFO) <<
"Wrote a snapshot metadata file. size=" <<
fs::file_size(file) <<
" bytes"
445 <<
", elapsed time to write=" << stop_watch.
elapsed_ms() <<
"ms. now fsyncing...";
449 LOG(INFO) <<
"fsynced the file and the folder! elapsed=" << stop_watch.
elapsed_ms() <<
"ms.";
457 LOG(INFO) <<
"Reading snapshot metadata file fullpath=" << file;
462 LOG(INFO) <<
"Read a snapshot metadata file. size=" <<
fs::file_size(file) <<
" bytes"
463 <<
", elapsed time to read+parse=" << stop_watch.
elapsed_ms() <<
"ms.";
470 LOG(INFO) <<
"Taking savepoint to include this new snapshot....";
483 file /= std::string(
"snapshot_metadata_")
484 + std::to_string(snapshot_id) + std::string(
".xml");
490 const std::map<storage::StorageId, storage::SnapshotPagePointer>& new_root_page_pointers) {
492 LOG(INFO) <<
"Dropping volatile pointers...";
519 std::this_thread::sleep_for(std::chrono::milliseconds(100));
520 LOG(INFO) <<
"Paused transaction executions to safely drop volatile pages and waited enough"
521 <<
" to let currently running xcts end. Now start replace pointers.";
524 std::vector< std::thread > threads;
525 for (uint16_t node = 0; node < soc_count; ++node) {
526 threads.emplace_back(
530 new_root_page_pointers,
535 for (std::thread& thr : threads) {
539 LOG(INFO) <<
"Joined child threads. Now consider dropping root pages";
548 for (uint16_t node = 0; node < soc_count; ++node) {
549 dropped_chunks[node].
clear();
554 VLOG(1) <<
"Considering to drop root page of storage-" <<
id <<
" ...";
555 bool cannot_drop =
false;
556 for (uint16_t node = 0; node < soc_count; ++node) {
568 LOG(INFO) <<
"Looks like we can drop ALL volatile pages of storage-" <<
id <<
"!!!";
571 uint64_t dropped_count = 0;
580 LOG(INFO) <<
"As a result, we dropped " << dropped_count <<
" pages from storage-" << id;
587 LOG(INFO) <<
"Total: Dropped volatile pages in " << stop_watch.
elapsed_ms() <<
"ms.";
597 const std::map<storage::StorageId, storage::SnapshotPagePointer>& new_root_page_pointers,
599 uint16_t parallel_id) {
617 for (uint16_t node = 0; node < soc_count; ++node) {
618 dropped_chunks[node].
clear();
621 LOG(INFO) <<
"Thread-" << parallel_id <<
" started dropping volatile pages.";
623 uint64_t dropped_count_total = 0;
626 const auto& it = new_root_page_pointers.find(
id);
627 if (it != new_root_page_pointers.end()) {
628 VLOG(0) <<
"Dropping pointers for storage-" <<
id <<
" ...";
632 uint64_t dropped_count = 0;
642 snapshot_pointer_ == new_root_page_pointer);
644 == new_root_page_pointer);
645 dropped_count_total += dropped_count;
647 LOG(INFO) <<
"Thread-" << parallel_id <<
" drop_volatiles for storage-" <<
id
649 <<
" took " << watch.
elapsed_sec() <<
"s. dropped_count=" << dropped_count
650 <<
". result =" << result;
651 results[soc_count *
id + parallel_id] = result;
653 VLOG(0) <<
"Thread-" << parallel_id <<
" storage-"
654 <<
id <<
" wasn't changed no drop pointers";
657 results[soc_count *
id + parallel_id].
dropped_all_ =
false;
662 LOG(INFO) <<
"Thread-" << parallel_id <<
" dropped " << dropped_count_total
663 <<
" volatile pointers in " << stop_watch.
elapsed_ms() <<
"ms.";
665 for (uint16_t node = 0; node < soc_count; ++node) {
669 if (!chunk->
empty()) {
0x020D : "FILESYS: Failed to create a directory" .
Metadata meta_
common part of the metadata.
void start()
Take current time tick.
Epoch max_observed_
the largest Epoch it observed recursively.
void handle_snapshot()
Main routine for snapshot_thread_ in master engine.
ErrorStack read_snapshot_metadata(SnapshotId snapshot_id, SnapshotMetadata *out)
ErrorStack take_savepoint_after_snapshot(snapshot::SnapshotId new_snapshot_id, Epoch new_snapshot_epoch)
Takes a savepoint just to remember the newly taken snapshot.
numa_alloc_onnode() and numa_free().
snapshot::SnapshotId get_latest_snapshot_id() const
void emprace_back(ErrorStack &&error_stack)
If the given ErrorStack is an error, this method adds it to the end of this batch.
storage::StorageManager * get_storage_manager() const
See Storage Manager.
void drop_volatile_pages_parallel(const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers, void *result_memory, uint16_t parallel_id)
subroutine invoked by one thread for one node.
void launch_thread()
Start executing.
Epoch get_requested_snapshot_epoch() const
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
Page pool for volatile read/write store (VolatilePage) and the read-only bufferpool (SnapshotPage)...
DropResult drop_volatiles(const DropVolatilesArguments &args)
Drops volatile pages that have not been modified since the snapshotted epoch.
Represents a logic to compose a new version of data pages for one storage.
SnapshotManagerControlBlock * control_block_
void release_block()
Releases the memory block.
const SnapshotOptions & get_option() const
shorthand for engine_->get_options().snapshot_.
uint32_t StorageId
Unique ID for storage.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
ErrorStack handle_snapshot_triggered(Snapshot *new_snapshot)
handle_snapshot() calls this when it should start snapshotting.
GlobalMemoryAnchors * get_global_memory_anchors()
A log-gleaner, which constructs a new set of snapshot files during snapshotting.
double elapsed_ms() const
uint32_t EpochInteger
Unsigned integer representation of epoch.
std::vector< PerNodeResource > per_node_resources_
bool is_initialized() const override
Returns whether the object has been already initialized or not.
Epoch valid_until_epoch_
This snapshot contains all the logs until this epoch.
0x0005 : "GENERAL: A dependent module is not initialized yet. This implies a wrong initialization ord...
void resume_accepting_xct()
Make sure you call this after pause_accepting_xct().
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
void allocate(uint16_t node_count)
Brings error stacktrace information as return value of functions.
ErrorStack save_to_file(const fs::Path &path) const
Atomically and durably writes out this object to the specified XML file.
DualPagePointer root_page_pointer_
Points to the root page (or something equivalent).
ErrorStack initialize_once() override
void alloc(uint64_t size, uint64_t alignment, AllocType alloc_type, int numa_node) noexcept
Allocate a memory, releasing the current memory if exists.
uint16_t reducers_count_
Total number of reducers.
void pause_accepting_xct()
Pause all begin_xct until you call resume_accepting_xct()
Epoch get_latest_snapshot_epoch() const
ErrorStack clone_all_storage_metadata(snapshot::SnapshotMetadata *metadata)
This method is called during snapshotting to clone metadata of all existing storages to the given obj...
Pin the current thread to the given NUMA node in this object's scope.
ErrorStack snapshot_savepoint(const Snapshot &new_snapshot)
Sub-routine of handle_snapshot_triggered().
std::atomic< SnapshotId > previous_snapshot_id_
ID of previously completed snapshot.
A log mapper, which reads log files from one logger and sends them to corresponding log reducers...
const EngineOptions & get_options() const
uint32_t max_storages_
Maximum number of storages in this database.
bool is_master() const
Returns if this engine object is a master instance.
savepoint::SavepointManager * get_savepoint_manager() const
See Savepoint Manager.
LogGleanerResource gleaner_resource_
Local resources for gleaner, which runs only in the master node.
Zero is always reserved for invalid epoch.
soc::SocId get_soc_count() const
Shorthand for get_options().thread_.group_count_.
uint64_t acquire_ticket() const
Gives the ticket to.
LogGleanerControlBlock gleaner_
Gleaner-related variables.
uint32_t partitioner_data_memory_mb_
Size in MB of a shared memory buffer allocated for all partitioners during log gleaning.
uint16_t mappers_count_
Total number of mappers.
Batches zero or more ErrorStack objects to represent in one ErrorStack.
storage::StorageId max_storage_id_
Largest storage ID as of starting to take the snapshot.
Analogue of boost::filesystem::path.
storage::PartitionerMetadata * partitioner_metadata_
Tiny metadata memory for partitioners.
log::LogManager * get_log_manager() const
See Log Manager.
storage::StorageOptions storage_
ErrorStack uninitialize_once() override
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
snapshot::SnapshotOptions snapshot_
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
SnapshotId id_
Unique ID of this snapshot.
#define SPINLOCK_WHILE(x)
A macro to busy-wait (spinlock) with occasional pause.
bool is_initialized() const override
Returns whether the object has been already initialized or not.
uint16_t all_count_
mappers_count_ + reducers_count_.
uint64_t stop()
Take another current time tick.
NumaNodeMemoryRef * get_node_memory(foedus::thread::ThreadGroupId group) const
void wakeup_snapshot_children()
Fires snapshot_children_wakeup_.
soc::SharedPolling snapshot_wakeup_
Snapshot thread sleeps on this condition variable.
ErrorStack snapshot_metadata(const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers)
Sub-routine of handle_snapshot_triggered().
Epoch get_snapshot_epoch() const
SnapshotId increment(SnapshotId id)
Increment SnapshotId.
bool create_directories(const Path &p, bool sync=false)
Recursive mkdir (mkdirs).
bool exists(const Path &p)
Returns if the file exists.
Set of option values given to the engine at start-up.
Represents one snapshot that converts all logs from base epoch to valid_until epoch into snapshot fil...
Return value of drop_volatiles()
std::chrono::system_clock::time_point previous_snapshot_time_
When snapshot_thread_ took snapshot last time.
std::atomic< Epoch::EpochInteger > requested_snapshot_epoch_
When a caller wants to immediately invoke snapshot, it calls trigger_snapshot_immediate(), which sets this value and then wakes up snapshot_thread_.
bool dropped_all_
Whether all volatile pages under the page was dropped.
void * get_block() const
Returns the memory block.
Set of options for snapshot manager.
uint16_t group_count_
Number of ThreadGroup in the engine.
uint64_t file_size(const Path &p)
Returns size of the file.
uint16_t SnapshotId
Unique ID of Snapshot.
void release(uint32_t desired_release_count, PagePoolOffsetChunk *chunk)
Returns the specified number of free pages from the chunk.
fs::Path get_snapshot_metadata_file_path(SnapshotId snapshot_id) const
each snapshot has a snapshot-metadata file "snapshot_metadata_<SNAPSHOT_ID>.xml" in first node's firs...
Repository of all shared memory in one FOEDUS instance.
ErrorStack execute()
Main routine of log gleaner.
Just a set of pointers within global_memory_ for ease of use.
snapshot::SnapshotManagerControlBlock * snapshot_manager_memory_
Tiny memory for snapshot manager.
bool is_initialized() const override
Returns whether the engine is currently running.
const SnapshotId kNullSnapshotId
#define SUMMARIZE_ERROR_BATCH(x)
This macro calls ErrorStackBatch::summarize() with automatically provided parameters.
LogReducer * local_reducer_
Reducer in this node.
xct::XctManager * get_xct_manager() const
See Transaction Manager.
StorageId get_largest_storage_id()
Returns the largest StorageId that does or did exist.
std::vector< LogMapper * > local_mappers_
Mappers in this node.
Epoch get_durable_global_epoch() const
Returns the durable epoch of the entire engine.
Snapshot cur_snapshot_
The snapshot we are now taking.
bool is_valid_status() const
thread::ThreadOptions thread_
To reduce the overhead of grabbing/releasing pages from pool, we pack this many pointers for each gra...
Epoch base_epoch_
This snapshot was taken on top of previous snapshot that is valid_until this epoch.
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
ErrorStack drop_volatile_pages(const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers)
Sub-routine of handle_snapshot_triggered().
Represents one memory block aligned to actual OS/hardware pages.
bool is_stop_requested() const
ErrorStack load_from_file(const fs::Path &path)
Load the content of this object from the specified XML file.
std::atomic< bool > terminating_
Whether the engine is being terminated.
const ErrorStack kRetOk
Normal return value for no-error case.
double elapsed_sec() const
soc::SocManager * get_soc_manager() const
See SOC and IPC.
void stop_snapshot_thread()
This is a hidden API called at the beginning of engine shutdown (namely restart manager).
PagePool * get_volatile_pool()
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
soc::SharedPolling snapshot_taken_
Fired (notify_all) whenever snapshotting is completed.
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
soc::SocId get_soc_id() const
If this is a child instance, returns its SOC ID (NUMA node).
ErrorStack glean_logs(const Snapshot &new_snapshot, std::map< storage::StorageId, storage::SnapshotPagePointer > *new_root_page_pointers)
Sub-routine of handle_snapshot_triggered().
std::atomic< Epoch::EpochInteger > snapshot_epoch_
The most recently snapshotted epoch; all logs up to this epoch are safe to delete.
void handle_snapshot_child()
Main routine for snapshot_thread_ in child engines.
std::thread snapshot_thread_
The daemon thread of snapshot manager.
0x0006 : "GENERAL: A dependent module is already uninitialized. This implies a wrong uninitialization...
void drop_root_volatile(const DropVolatilesArguments &args)
This is additionally called when no partitions observed any new modifications.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
soc::SharedPolling snapshot_children_wakeup_
Child snapshot managers (the ones in SOC engines) sleep on this condition until the master snapshot m...
A high-resolution stop watch.
StorageControlBlock * get_storage(StorageId id)
Returns the storage of given ID.
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
bool is_initialized() const
bool fsync(const Path &path, bool sync_parent_directory=false)
Makes the content and metadata of the file durable all the way up to devices.
A log reducer, which receives log entries sent from mappers and applies them to construct new snapsho...
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
void trigger_snapshot_immediate(bool wait_completion, Epoch suggested_snapshot_epoch)
const std::map< storage::StorageId, storage::SnapshotPagePointer > & get_new_root_page_pointers() const
Returns pointers to new root pages constructed at the end of gleaning.
bool timedwait(uint64_t demanded_ticket, uint64_t timeout_microsec, uint64_t polling_spins=kDefaultPollingSpins, uint64_t max_interval_us=kDefaultPollingMaxIntervalUs) const
Wait for signal up to the given timeout.
std::atomic< bool > cancelled_
Whether the log gleaner has been cancelled.
std::atomic< bool > stop_requested_
To locally shutdown snapshot_thread_.
void signal()
Signal it to let waiters exit.
bool is_error() const
Returns if this return code is not kErrorCodeOk.
Arguments for drop_volatiles()
uint16_t loggers_per_node_
Number of loggers per NUMA node.
EpochInteger value() const
Returns the raw integer representation.
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.