libfoedus-core
FOEDUS Core Library
snapshot_manager_pimpl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <chrono>
23 #include <map>
24 #include <string>
25 #include <thread>
26 #include <vector>
27 
28 #include "foedus/engine.hpp"
33 #include "foedus/fs/filesystem.hpp"
34 #include "foedus/fs/path.hpp"
54 
55 namespace foedus {
56 namespace snapshot {
58  return engine_->get_options().snapshot_;
59 }
60 
62  LOG(INFO) << "Initializing SnapshotManager..";
65  }
68  if (engine_->is_master()) {
70  // get snapshot status from savepoint
75  LOG(INFO) << "Latest snapshot: id=" << control_block_->previous_snapshot_id_ << ", epoch="
78 
79  const EngineOptions& options = engine_->get_options();
80  uint32_t reducer_count = options.thread_.group_count_;
81  uint32_t mapper_count = reducer_count * options.log_.loggers_per_node_;
82  control_block_->gleaner_.reducers_count_ = reducer_count;
83  control_block_->gleaner_.mappers_count_ = mapper_count;
84  control_block_->gleaner_.all_count_ = reducer_count + mapper_count;
85 
86  // also initialize the shared memory for partitioner
87  uint32_t max_storages = engine_->get_options().storage_.max_storages_;
88  storage::PartitionerMetadata* partitioner_metadata
90  for (storage::StorageId i = 0; i < max_storages; ++i) {
91  storage::PartitionerMetadata* meta = partitioner_metadata + i;
93  meta->initialize();
95  }
96  // set the size of partitioner data
99 
100  // And master-only resources for running gleaner. These are NOT shared memory. local to master.
102  }
103 
104  // in child engines, we instantiate local mappers/reducer objects (but not the threads yet)
105  previous_snapshot_time_ = std::chrono::system_clock::now();
106  stop_requested_ = false;
107  if (!engine_->is_master()) {
110  for (uint16_t i = 0; i < engine_->get_options().log_.loggers_per_node_; ++i) {
111  local_mappers_.push_back(new LogMapper(engine_, i));
113  }
114  }
115 
116  // Launch the daemon thread at last
117  if (engine_->is_master()) {
118  snapshot_thread_ = std::move(std::thread(&SnapshotManagerPimpl::handle_snapshot, this));
119  } else {
120  snapshot_thread_ = std::move(std::thread(&SnapshotManagerPimpl::handle_snapshot_child, this));
121  }
122  return kRetOk;
123 }
124 
126  LOG(INFO) << "Uninitializing SnapshotManager..";
127  ErrorStackBatch batch;
130  }
132  if (engine_->is_master()) {
133  // also uninitialize the shared memory for partitioner
134  soc::GlobalMemoryAnchors* anchors
136  uint32_t max_storages = engine_->get_options().storage_.max_storages_;
137  for (storage::StorageId i = 0; i < max_storages; ++i) {
139  anchors->partitioner_metadata_[i].uninitialize();
141  }
142 
144  ASSERT_ND(local_reducer_ == nullptr);
145  ASSERT_ND(local_mappers_.size() == 0);
146 
148  } else {
149  if (local_reducer_) {
151  delete local_reducer_;
152  local_reducer_ = nullptr;
153  }
154  for (LogMapper* mapper : local_mappers_) {
155  batch.emprace_back(mapper->uninitialize());
156  delete mapper;
157  }
158  local_mappers_.clear();
160  }
161 
162  return SUMMARIZE_ERROR_BATCH(batch);
163 }
165  LOG(INFO) << "Stopping the snapshot thread...";
166  if (snapshot_thread_.joinable()) {
167  // whether from master or not, just make sure all reducers/mappers notice that it's closing
168  stop_requested_ = true;
171  if (engine_->is_master()) {
172  wakeup();
173  } else {
175  }
176  snapshot_thread_.join();
177  }
178  LOG(INFO) << "Stopped the snapshot thread.";
179 }
180 
182  uint64_t demand = control_block_->snapshot_wakeup_.acquire_ticket();
183  if (!is_stop_requested()) {
184  control_block_->snapshot_wakeup_.timedwait(demand, 20000ULL);
185  }
186 }
189 }
190 
192  LOG(INFO) << "Snapshot daemon started";
193  // The actual snapshotting can't start until all other modules are initialized.
196  }
197 
198  LOG(INFO) << "Snapshot daemon now starts taking snapshot";
199  while (!is_stop_requested()) {
200  sleep_a_while();
201  if (is_stop_requested()) {
202  break;
203  }
204  // should we start snapshotting? or keep sleeping?
205  bool triggered = false;
206  std::chrono::system_clock::time_point until = previous_snapshot_time_ +
207  std::chrono::milliseconds(get_option().snapshot_interval_milliseconds_);
209  Epoch previous_epoch = get_snapshot_epoch();
210  if (previous_epoch.is_valid() && previous_epoch == durable_epoch) {
211  LOG(INFO) << "Current snapshot is already latest. durable_epoch=" << durable_epoch;
213  && (!previous_epoch.is_valid()
214  || control_block_->get_requested_snapshot_epoch() > previous_epoch)) {
215  // if someone requested immediate snapshot, do it.
216  triggered = true;
217  LOG(INFO) << "Immediate snapshot request detected. snapshotting..";
218  } else if (std::chrono::system_clock::now() >= until) {
219  triggered = true;
220  LOG(INFO) << "Snapshot interval has elapsed. snapshotting..";
221  } else {
222  // TASK(Hideaki): check free pages in page pool and compare with configuration.
223  }
224 
225  if (triggered) {
226  Snapshot new_snapshot;
227  ErrorStack stack = handle_snapshot_triggered(&new_snapshot);
228  if (stack.is_error()) {
229  LOG(ERROR) << "Snapshot failed:" << stack;
230  }
231  } else {
232  VLOG(1) << "Snapshotting not triggered. going to sleep again";
233  }
234  }
235 
236  LOG(INFO) << "Snapshot daemon ended. ";
237 }
238 
240  LOG(INFO) << "Child snapshot daemon-" << engine_->get_soc_id() << " started";
243  while (!is_stop_requested()) {
244  {
246  if (!is_stop_requested() && !is_gleaning()) {
248  }
249  }
250  if (is_stop_requested()) {
251  break;
252  } else if (!is_gleaning() || previous_id == control_block_->gleaner_.cur_snapshot_.id_) {
253  continue;
254  }
256  LOG(INFO) << "Child snapshot daemon-" << engine_->get_soc_id() << " received a request"
257  << " for snapshot-" << current_id;
259  for (LogMapper* mapper : local_mappers_) {
260  mapper->launch_thread();
261  }
262  LOG(INFO) << "Child snapshot daemon-" << engine_->get_soc_id() << " launched mappers/reducer"
263  " for snapshot-" << current_id;
264  for (LogMapper* mapper : local_mappers_) {
265  mapper->join_thread();
266  }
268  LOG(INFO) << "Child snapshot daemon-" << engine_->get_soc_id() << " joined mappers/reducer"
269  " for snapshot-" << current_id;
270  previous_id = current_id;
271  }
272 
273  LOG(INFO) << "Child snapshot daemon-" << engine_->get_soc_id() << " ended";
274 }
275 
276 
278  bool wait_completion,
279  Epoch suggested_snapshot_epoch) {
280  LOG(INFO) << "Requesting to immediately take a snapshot. suggested_snapshot_epoch="
281  << suggested_snapshot_epoch;
282  Epoch before = get_snapshot_epoch();
284  ASSERT_ND(durable_epoch.is_valid());
285 
286  Epoch new_snapshot_epoch = durable_epoch;
287  if (suggested_snapshot_epoch.is_valid()) {
288  if (suggested_snapshot_epoch > durable_epoch) {
289  LOG(WARNING) << "You can't specify non-durable epoch for snapshot.";
290  } else {
291  new_snapshot_epoch = suggested_snapshot_epoch;
292  }
293  }
294 
295  if (before.is_valid() && before >= durable_epoch) {
296  LOG(INFO) << "Current snapshot already satisfies. new_snapshot_epoch=" << new_snapshot_epoch;
297  return;
298  }
299 
300  control_block_->requested_snapshot_epoch_.store(new_snapshot_epoch.value());
301  wakeup();
302  if (wait_completion) {
303  VLOG(0) << "Waiting for the completion of snapshot... before=" << before;
304  while (!is_stop_requested() &&
305  (!get_snapshot_epoch().is_valid() || new_snapshot_epoch > get_snapshot_epoch())) {
306  uint64_t demand = control_block_->snapshot_taken_.acquire_ticket();
307  if (!is_stop_requested() &&
308  (!get_snapshot_epoch().is_valid() || new_snapshot_epoch > get_snapshot_epoch())) {
309  control_block_->snapshot_taken_.timedwait(demand, 100000ULL);
310  }
311  }
312  }
313  LOG(INFO) << "Observed the completion of snapshot! after=" << get_snapshot_epoch();
314 }
315 
318  ASSERT_ND(engine_->get_storage_manager()->is_initialized()); // snapshot relied on storage module
320  Epoch previous_epoch = get_snapshot_epoch();
321  LOG(INFO) << "Taking a new snapshot. durable_epoch=" << durable_epoch
322  << ", requested_snapshot_epoch=" << control_block_->get_requested_snapshot_epoch()
323  << ". previous_snapshot=" << previous_epoch;
324  ASSERT_ND(durable_epoch.is_valid() &&
325  (!previous_epoch.is_valid() || durable_epoch > previous_epoch));
326  new_snapshot->base_epoch_ = previous_epoch;
328  if (requested_epoch.is_valid()) {
329  ASSERT_ND(requested_epoch <= durable_epoch);
330  ASSERT_ND(!previous_epoch.is_valid() || requested_epoch > previous_epoch);
331  new_snapshot->valid_until_epoch_ = requested_epoch;
332  } else {
333  new_snapshot->valid_until_epoch_ = durable_epoch;
334  }
336  ASSERT_ND(new_snapshot->max_storage_id_
338 
339  // determine the snapshot ID
340  SnapshotId snapshot_id;
342  snapshot_id = 1;
343  } else {
345  }
346  LOG(INFO) << "Issued ID for this snapshot:" << snapshot_id;
347  new_snapshot->id_ = snapshot_id;
348 
349  // okay, let's start the snapshotting.
350  // The procedures below will take long time, so we keep checking our "is_stop_requested"
351  // and stops our child threads when it happens.
352 
353  // For each storage that was modified in this snapshotting,
354  // this holds the pointer to new root page.
355  std::map<storage::StorageId, storage::SnapshotPagePointer> new_root_page_pointers;
356 
357  // Log gleaners design partitioning and do scatter-gather to consume the logs.
358  // This will create snapshot files at each partition and tell us the new root pages of
359  // each storage.
360  CHECK_ERROR(glean_logs(*new_snapshot, &new_root_page_pointers));
361 
362  // Write out the metadata file.
363  CHECK_ERROR(snapshot_metadata(*new_snapshot, new_root_page_pointers));
364 
365  // Invokes savepoint module to make sure this snapshot has "happened".
366  CHECK_ERROR(snapshot_savepoint(*new_snapshot));
367 
368  // install pointers to snapshot pages and drop volatile pages.
369  CHECK_ERROR(drop_volatile_pages(*new_snapshot, new_root_page_pointers));
370 
371  Epoch new_snapshot_epoch = new_snapshot->valid_until_epoch_;
372  ASSERT_ND(new_snapshot_epoch.is_valid() &&
373  (!get_snapshot_epoch().is_valid() || new_snapshot_epoch > get_snapshot_epoch()));
374 
375  // done. notify waiters if exist
376  Epoch::EpochInteger epoch_after = new_snapshot_epoch.value();
377  control_block_->previous_snapshot_id_ = snapshot_id;
378  previous_snapshot_time_ = std::chrono::system_clock::now();
379 
380  control_block_->snapshot_epoch_ = epoch_after;
383  return kRetOk;
384 }
385 
387  const Snapshot& new_snapshot,
388  std::map<storage::StorageId, storage::SnapshotPagePointer>* new_root_page_pointers) {
389  // Log gleaner is an object allocated/deallocated per snapshotting.
390  // Gleaner runs on this thread (snapshot_thread_)
391  LogGleaner gleaner(engine_, &gleaner_resource_, new_snapshot);
392  ErrorStack result = gleaner.execute();
393  if (result.is_error()) {
394  LOG(ERROR) << "Log Gleaner encountered either an error or early termination request";
395  }
396  // the output is list of pointers to new root pages
397  *new_root_page_pointers = gleaner.get_new_root_page_pointers();
398  return result;
399 }
400 
402  const Snapshot& new_snapshot,
403  const std::map<storage::StorageId, storage::SnapshotPagePointer>& new_root_page_pointers) {
404  // construct metadata object
405  SnapshotMetadata metadata;
406  metadata.id_ = new_snapshot.id_;
407  metadata.base_epoch_ = new_snapshot.base_epoch_.value();
408  metadata.valid_until_epoch_ = new_snapshot.valid_until_epoch_.value();
409  metadata.largest_storage_id_ = new_snapshot.max_storage_id_;
411 
412  // we modified the root page.
413  uint32_t installed_root_pages_count = 0;
414  for (storage::StorageId id = 1; id <= metadata.largest_storage_id_; ++id) {
415  const auto& it = new_root_page_pointers.find(id);
417  storage::Metadata* meta = metadata.get_metadata(id);
418  if (it != new_root_page_pointers.end()) {
419  storage::SnapshotPagePointer new_pointer = it->second;
420  // composer's construct_root should have been already set the new root pointer
421  ASSERT_ND(new_pointer == meta->root_snapshot_page_id_);
422  ++installed_root_pages_count;
423  }
424  }
425  LOG(INFO) << "Out of " << metadata.largest_storage_id_ << " storages, "
426  << installed_root_pages_count << " changed their root pages.";
427  ASSERT_ND(installed_root_pages_count == new_root_page_pointers.size());
428 
429  // save it to a file
430  fs::Path folder(get_option().get_primary_folder_path());
431  if (!fs::exists(folder)) {
432  if (!fs::create_directories(folder, true)) {
433  LOG(ERROR) << "Failed to create directory:" << folder << ". check permission.";
435  }
436  }
437 
438  fs::Path file = get_snapshot_metadata_file_path(new_snapshot.id_);
439  LOG(INFO) << "New snapshot metadata file fullpath=" << file;
440 
441  debugging::StopWatch stop_watch;
442  CHECK_ERROR(metadata.save_to_file(file));
443  stop_watch.stop();
444  LOG(INFO) << "Wrote a snapshot metadata file. size=" << fs::file_size(file) << " bytes"
445  << ", elapsed time to write=" << stop_watch.elapsed_ms() << "ms. now fsyncing...";
446  stop_watch.start();
447  fs::fsync(file, true);
448  stop_watch.stop();
449  LOG(INFO) << "fsynced the file and the folder! elapsed=" << stop_watch.elapsed_ms() << "ms.";
450  return kRetOk;
451 }
452 
454  SnapshotId snapshot_id,
455  SnapshotMetadata* out) {
456  fs::Path file = get_snapshot_metadata_file_path(snapshot_id);
457  LOG(INFO) << "Reading snapshot metadata file fullpath=" << file;
458 
459  debugging::StopWatch stop_watch;
460  CHECK_ERROR(out->load_from_file(file));
461  stop_watch.stop();
462  LOG(INFO) << "Read a snapshot metadata file. size=" << fs::file_size(file) << " bytes"
463  << ", elapsed time to read+parse=" << stop_watch.elapsed_ms() << "ms.";
464 
465  ASSERT_ND(out->id_ == snapshot_id);
466  return kRetOk;
467 }
468 
470  LOG(INFO) << "Taking savepoint to include this new snapshot....";
472  new_snapshot.id_,
473  new_snapshot.valid_until_epoch_));
476  == new_snapshot.valid_until_epoch_);
477  return kRetOk;
478 }
479 
481  fs::Path folder(get_option().get_primary_folder_path());
482  fs::Path file(folder);
483  file /= std::string("snapshot_metadata_")
484  + std::to_string(snapshot_id) + std::string(".xml");
485  return file;
486 }
487 
489  const Snapshot& new_snapshot,
490  const std::map<storage::StorageId, storage::SnapshotPagePointer>& new_root_page_pointers) {
491  // To speed up, we parallelize this process per node, and use the same partitioning scheme.
492  LOG(INFO) << "Dropping volatile pointers...";
493 
494  // initializations done.
495  // below, we should release the resources before exiting. So, let's not just use CHECK_ERROR.
496  const uint16_t soc_count = engine_->get_soc_count();
497 
498  // collect results of pointer dropping for all storages for all nodes.
499  // this is just to drop the root page. DropResult[storage_id][node].
500  memory::AlignedMemory result_memory;
501  result_memory.alloc(
502  sizeof(storage::Composer::DropResult) * soc_count * (new_snapshot.max_storage_id_ + 2U),
503  1U << 12,
505  0);
506  memory::AlignedMemory chunks_memory; // only for drop_root
507  chunks_memory.alloc(
508  sizeof(memory::PagePoolOffsetChunk) * soc_count,
509  1U << 12,
511  0);
512 
513  // So far, we pause transaction executions during this step to simplify the algorithm.
514  // Without this simplification, not only this thread but also normal transaction executions
515  // have to do several complex and expensive checks.
517  // It will take a while for individual worker threads to complete the currently running xcts.
518  // Just wait for a while to let that happen.
519  std::this_thread::sleep_for(std::chrono::milliseconds(100)); // almost forever in OLTP xcts.
520  LOG(INFO) << "Paused transaction executions to safely drop volatile pages and waited enough"
521  << " to let currently running xcts end. Now start replace pointers.";
522  debugging::StopWatch stop_watch;
523 
524  std::vector< std::thread > threads;
525  for (uint16_t node = 0; node < soc_count; ++node) {
526  threads.emplace_back(
528  this,
529  new_snapshot,
530  new_root_page_pointers,
531  result_memory.get_block(),
532  node);
533  }
534 
535  for (std::thread& thr : threads) {
536  thr.join();
537  }
538 
539  LOG(INFO) << "Joined child threads. Now consider dropping root pages";
540 
541  // At last, we consider dropping root volatile pages.
542  // usually, this happens only when the root page doesn't have any volatile pointer.
543  // As an exceptional case, we might drop ALL volatile pages of a storage whose max_observed
544  // is not updated but for some reason dropped_all is false, eg non-matching boundaries.
545  // even in that case, we can drop all volatile pages safely because this is within pause.
546  memory::PagePoolOffsetChunk* dropped_chunks = reinterpret_cast<memory::PagePoolOffsetChunk*>(
547  chunks_memory.get_block());
548  for (uint16_t node = 0; node < soc_count; ++node) {
549  dropped_chunks[node].clear();
550  }
552  = reinterpret_cast<storage::Composer::DropResult*>(result_memory.get_block());
553  for (storage::StorageId id = 1; id <= new_snapshot.max_storage_id_; ++id) {
554  VLOG(1) << "Considering to drop root page of storage-" << id << " ...";
555  bool cannot_drop = false;
556  for (uint16_t node = 0; node < soc_count; ++node) {
557  storage::Composer::DropResult result = results[soc_count * id + node];
558  ASSERT_ND(result.max_observed_.is_valid());
559  ASSERT_ND(result.max_observed_ >= new_snapshot.valid_until_epoch_);
560  if (result.max_observed_ > new_snapshot.valid_until_epoch_) {
561  cannot_drop = true;
562  break;
563  }
564  }
565  if (cannot_drop) {
566  continue;
567  }
568  LOG(INFO) << "Looks like we can drop ALL volatile pages of storage-" << id << "!!!";
569  // Still, the specific implementation of the storage might not choose to do so.
570  // We call a method in composer.
571  uint64_t dropped_count = 0;
573  new_snapshot,
574  0,
575  false,
576  dropped_chunks,
577  &dropped_count};
578  storage::Composer composer(engine_, id);
579  composer.drop_root_volatile(args);
580  LOG(INFO) << "As a result, we dropped " << dropped_count << " pages from storage-" << id;
581  }
582 
583 
585 
586  stop_watch.stop();
587  LOG(INFO) << "Total: Dropped volatile pages in " << stop_watch.elapsed_ms() << "ms.";
588 
589  chunks_memory.release_block();
590  result_memory.release_block();
591 
592  return kRetOk;
593 }
594 
596  const Snapshot& new_snapshot,
597  const std::map<storage::StorageId, storage::SnapshotPagePointer>& new_root_page_pointers,
598  void* result_memory,
599  uint16_t parallel_id) {
600  // this thread is pinned on its own socket. We use the same partitioning scheme as reducer
601  // so that this method mostly hits local pages
602  thread::NumaThreadScope numa_scope(parallel_id);
603 
604  const uint16_t soc_count = engine_->get_soc_count();
605  storage::Composer::DropResult* results // DropResult[storage_id][node]
606  = reinterpret_cast<storage::Composer::DropResult*>(result_memory);
607 
608  // To avoid invoking volatile pool for every dropped page, we cache them in chunks
609  memory::AlignedMemory chunks_memory;
610  chunks_memory.alloc(
611  sizeof(memory::PagePoolOffsetChunk) * soc_count,
612  1U << 12,
614  parallel_id);
615  memory::PagePoolOffsetChunk* dropped_chunks = reinterpret_cast<memory::PagePoolOffsetChunk*>(
616  chunks_memory.get_block());
617  for (uint16_t node = 0; node < soc_count; ++node) {
618  dropped_chunks[node].clear();
619  }
620 
621  LOG(INFO) << "Thread-" << parallel_id << " started dropping volatile pages.";
622 
623  uint64_t dropped_count_total = 0;
624  debugging::StopWatch stop_watch;
625  for (storage::StorageId id = 1; id <= new_snapshot.max_storage_id_; ++id) {
626  const auto& it = new_root_page_pointers.find(id);
627  if (it != new_root_page_pointers.end()) {
628  VLOG(0) << "Dropping pointers for storage-" << id << " ...";
629  storage::SnapshotPagePointer new_root_page_pointer = it->second;
630  ASSERT_ND(new_root_page_pointer != 0);
631  storage::Composer composer(engine_, id);
632  uint64_t dropped_count = 0;
634  new_snapshot,
635  parallel_id,
636  true,
637  dropped_chunks,
638  &dropped_count};
639  debugging::StopWatch watch;
640  storage::Composer::DropResult result = composer.drop_volatiles(args);
642  snapshot_pointer_ == new_root_page_pointer);
644  == new_root_page_pointer);
645  dropped_count_total += dropped_count;
646  watch.stop();
647  LOG(INFO) << "Thread-" << parallel_id << " drop_volatiles for storage-" << id
648  << " (" << engine_->get_storage_manager()->get_storage(id)->meta_.name_ << ")"
649  << " took " << watch.elapsed_sec() << "s. dropped_count=" << dropped_count
650  << ". result =" << result;
651  results[soc_count * id + parallel_id] = result;
652  } else {
653  VLOG(0) << "Thread-" << parallel_id << " storage-"
654  << id << " wasn't changed no drop pointers";
655  results[soc_count * id + parallel_id].max_observed_
656  = new_snapshot.valid_until_epoch_.one_more(); // do NOT drop root page in this case.
657  results[soc_count * id + parallel_id].dropped_all_ = false;
658  }
659  }
660 
661  stop_watch.stop();
662  LOG(INFO) << "Thread-" << parallel_id << " dropped " << dropped_count_total
663  << " volatile pointers in " << stop_watch.elapsed_ms() << "ms.";
664 
665  for (uint16_t node = 0; node < soc_count; ++node) {
666  memory::PagePoolOffsetChunk* chunk = dropped_chunks + node;
667  memory::PagePool* volatile_pool
669  if (!chunk->empty()) {
670  volatile_pool->release(chunk->size(), chunk);
671  }
672  ASSERT_ND(chunk->empty());
673  }
674  chunks_memory.release_block();
675 }
676 
677 } // namespace snapshot
678 } // namespace foedus
0x020D : "FILESYS: Failed to create a directory" .
Definition: error_code.hpp:138
Represents the data in one snapshot metadata file.
storage::StorageControlBlock * storage_control_blocks_
control block of all storages.
Metadata meta_
common part of the metadata.
Definition: storage.hpp:84
void start()
Take current time tick.
Definition: stop_watch.cpp:30
Epoch max_observed_
the largest Epoch it observed recursively.
Definition: composer.hpp:200
void handle_snapshot()
Main routine for snapshot_thread_ in master engine.
ErrorStack read_snapshot_metadata(SnapshotId snapshot_id, SnapshotMetadata *out)
ErrorStack take_savepoint_after_snapshot(snapshot::SnapshotId new_snapshot_id, Epoch new_snapshot_epoch)
Takes a savepoint just to remember the newly taken snapshot.
numa_alloc_onnode() and numa_free().
snapshot::SnapshotId get_latest_snapshot_id() const
void emprace_back(ErrorStack &&error_stack)
If the given ErrorStack is an error, this method adds it to the end of this batch.
storage::StorageManager * get_storage_manager() const
See Storage Manager.
Definition: engine.cpp:60
void drop_volatile_pages_parallel(const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers, void *result_memory, uint16_t parallel_id)
subroutine invoked by one thread for one node.
void launch_thread()
Start executing.
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
Page pool for volatile read/write store (VolatilePage) and the read-only bufferpool (SnapshotPage)...
Definition: page_pool.hpp:173
DropResult drop_volatiles(const DropVolatilesArguments &args)
Drops volatile pages that have not been modified since the snapshotted epoch.
Definition: composer.cpp:83
Epoch::EpochInteger valid_until_epoch_
Equivalent to Snapshot::valid_until_epoch_.
Represents a logic to compose a new version of data pages for one storage.
Definition: composer.hpp:86
SnapshotManagerControlBlock * control_block_
void release_block()
Releases the memory block.
const SnapshotOptions & get_option() const
shorthand for engine_->get_options().snapshot_.
uint32_t StorageId
Unique ID for storage.
Definition: storage_id.hpp:55
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
ErrorStack handle_snapshot_triggered(Snapshot *new_snapshot)
handle_snapshot() calls this when it should start snapshotting.
GlobalMemoryAnchors * get_global_memory_anchors()
A log-gleaner, which constructs a new set of snapshot files during snapshotting.
double elapsed_ms() const
Definition: stop_watch.hpp:48
uint32_t EpochInteger
Unsigned integer representation of epoch.
Definition: epoch.hpp:64
std::vector< PerNodeResource > per_node_resources_
bool is_initialized() const override
Returns whether the object has been already initialized or not.
Definition: log_manager.cpp:32
Epoch valid_until_epoch_
This snapshot contains all the logs until this epoch.
Definition: snapshot.hpp:55
0x0005 : "GENERAL: A dependent module is not initialized yet. This implies a wrong initialization ord...
Definition: error_code.hpp:109
void resume_accepting_xct()
Make sure you call this after pause_accepting_xct().
Definition: xct_manager.cpp:34
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
ErrorStack save_to_file(const fs::Path &path) const
Atomically and durably writes out this object to the specified XML file.
DualPagePointer root_page_pointer_
Points to the root page (or something equivalent).
Definition: storage.hpp:82
Represents a time epoch.
Definition: epoch.hpp:61
void alloc(uint64_t size, uint64_t alignment, AllocType alloc_type, int numa_node) noexcept
Allocate a memory, releasing the current memory if exists.
uint16_t reducers_count_
Total number of reducers.
void pause_accepting_xct()
Pause all begin_xct until you call resume_accepting_xct()
Definition: xct_manager.cpp:33
ErrorStack clone_all_storage_metadata(snapshot::SnapshotMetadata *metadata)
This method is called during snapshotting to clone metadata of all existing storages to the given obj...
Pin the current thread to the given NUMA node in this object's scope.
ErrorStack snapshot_savepoint(const Snapshot &new_snapshot)
Sub-routine of handle_snapshot_triggered().
std::atomic< SnapshotId > previous_snapshot_id_
ID of previously completed snapshot.
A log mapper, which reads log files from one logger and sends them to corresponding log reducers...
const EngineOptions & get_options() const
Definition: engine.cpp:39
uint32_t max_storages_
Maximum number of storages in this database.
bool is_master() const
Returns if this engine object is a master instance.
Definition: engine.cpp:68
savepoint::SavepointManager * get_savepoint_manager() const
See Savepoint Manager.
Definition: engine.cpp:53
LogGleanerResource gleaner_resource_
Local resources for gleaner, which runs only in the master node.
Zero is always reserved for invalid epoch.
Definition: epoch.hpp:68
soc::SocId get_soc_count() const
Shorthand for get_options().thread_.group_count_.
Definition: engine.cpp:74
Metadata of one storage.
Definition: metadata.hpp:58
uint64_t acquire_ticket() const
Gives the caller a ticket to pass to a subsequent timedwait() call on the same polling object.
LogGleanerControlBlock gleaner_
Gleaner-related variables.
uint32_t partitioner_data_memory_mb_
Size in MB of a shared memory buffer allocated for all partitioners during log gleaning.
uint16_t mappers_count_
Total number of mappers.
Batches zero or more ErrorStack objects to represent in one ErrorStack.
storage::StorageId max_storage_id_
Largest storage ID as of starting to take the snapshot.
Definition: snapshot.hpp:58
Analogue of boost::filesystem::path.
Definition: path.hpp:37
storage::PartitionerMetadata * partitioner_metadata_
Tiny metadata memory for partitioners.
log::LogManager * get_log_manager() const
See Log Manager.
Definition: engine.cpp:49
storage::StorageOptions storage_
uint32_t data_size_
The size of the partitioner data.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Definition: storage_id.hpp:79
snapshot::SnapshotOptions snapshot_
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
SnapshotId id_
Unique ID of this snapshot.
Definition: snapshot.hpp:43
#define SPINLOCK_WHILE(x)
A macro to busy-wait (spinlock) with occasional pause.
bool is_initialized() const override
Returns whether the object has been already initialized or not.
uint16_t all_count_
mappers_count_ + reducers_count_.
uint64_t stop()
Take another current time tick.
Definition: stop_watch.cpp:35
NumaNodeMemoryRef * get_node_memory(foedus::thread::ThreadGroupId group) const
void wakeup_snapshot_children()
Fires snapshot_children_wakeup_.
soc::SharedPolling snapshot_wakeup_
Snapshot thread sleeps on this condition variable.
ErrorStack snapshot_metadata(const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers)
Sub-routine of handle_snapshot_triggered().
StorageName name_
the unique name of this storage.
Definition: metadata.hpp:107
Epoch one_more() const
Definition: epoch.hpp:127
SnapshotId id_
Equivalent to Snapshot::id_.
SnapshotId increment(SnapshotId id)
Increment SnapshotId.
Definition: snapshot_id.hpp:52
bool create_directories(const Path &p, bool sync=false)
Recursive mkdir (mkdirs).
Definition: filesystem.cpp:89
bool exists(const Path &p)
Returns if the file exists.
Definition: filesystem.hpp:128
Set of option values given to the engine at start-up.
Represents one snapshot that converts all logs from base epoch to valid_until epoch into snapshot fil...
Definition: snapshot.hpp:37
Return value of drop_volatiles().
Definition: composer.hpp:171
std::chrono::system_clock::time_point previous_snapshot_time_
When snapshot_thread_ took snapshot last time.
std::atomic< Epoch::EpochInteger > requested_snapshot_epoch_
When a caller wants to immediately invoke snapshot, it calls trigger_snapshot_immediate(), which sets this value and then wakes up snapshot_thread_.
bool dropped_all_
Whether all volatile pages under the page was dropped.
Definition: composer.hpp:202
void * get_block() const
Returns the memory block.
Set of options for snapshot manager.
soc::SharedMutex mutex_
Serialize concurrent initialization of this partitioner.
uint16_t group_count_
Number of ThreadGroup in the engine.
uint64_t file_size(const Path &p)
Returns size of the file.
Definition: filesystem.cpp:120
uint16_t SnapshotId
Unique ID of Snapshot.
Definition: snapshot_id.hpp:43
void release(uint32_t desired_release_count, PagePoolOffsetChunk *chunk)
Returns the specified number of free pages from the chunk.
Definition: page_pool.cpp:134
fs::Path get_snapshot_metadata_file_path(SnapshotId snapshot_id) const
Each snapshot has a snapshot-metadata file "snapshot_metadata_<SNAPSHOT_ID>.xml" in the primary snapshot folder (the first node's first folder, as returned by get_primary_folder_path()).
Repository of all shared memory in one FOEDUS instance.
ErrorStack execute()
Main routine of log gleaner.
Just a set of pointers within global_memory_ for ease of use.
snapshot::SnapshotManagerControlBlock * snapshot_manager_memory_
Tiny memory for snapshot manager.
bool is_initialized() const override
Returns whether the engine is currently running.
Definition: engine.cpp:63
const SnapshotId kNullSnapshotId
Definition: snapshot_id.hpp:45
#define SUMMARIZE_ERROR_BATCH(x)
This macro calls ErrorStackBatch::summarize() with automatically provided parameters.
LogReducer * local_reducer_
Reducer in this node.
xct::XctManager * get_xct_manager() const
See Transaction Manager.
Definition: engine.cpp:61
StorageId get_largest_storage_id()
Returns the largest StorageId that does or did exist.
std::vector< LogMapper * > local_mappers_
Mappers in this node.
bool is_valid() const
Definition: epoch.hpp:96
Epoch get_durable_global_epoch() const
Returns the durable epoch of the entire engine.
Definition: log_manager.cpp:36
Snapshot cur_snapshot_
The snapshot we are now taking.
thread::ThreadOptions thread_
Tiny metadata of partitioner for every storage used during log gleaning.
To reduce the overhead of grabbing/releasing pages from pool, we pack this many pointers for each gra...
Definition: page_pool.hpp:47
Epoch base_epoch_
This snapshot was taken on top of previous snapshot that is valid_until this epoch.
Definition: snapshot.hpp:49
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
ErrorStack drop_volatile_pages(const Snapshot &new_snapshot, const std::map< storage::StorageId, storage::SnapshotPagePointer > &new_root_page_pointers)
Sub-routine of handle_snapshot_triggered().
Represents one memory block aligned to actual OS/hardware pages.
ErrorStack load_from_file(const fs::Path &path)
Load the content of this object from the specified XML file.
std::atomic< bool > terminating_
Whether the engine is being terminated.
const ErrorStack kRetOk
Normal return value for no-error case.
storage::Metadata * get_metadata(storage::StorageId id)
double elapsed_sec() const
Definition: stop_watch.hpp:51
soc::SocManager * get_soc_manager() const
See SOC and IPC.
Definition: engine.cpp:59
void stop_snapshot_thread()
This is a hidden API called at the beginning of engine shutdown (namely restart manager).
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
Epoch::EpochInteger base_epoch_
Equivalent to Snapshot::base_epoch_.
soc::SharedPolling snapshot_taken_
Fired (notify_all) whenever snapshotting is completed.
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
soc::SocId get_soc_id() const
If this is a child instance, returns its SOC ID (NUMA node).
Definition: engine.cpp:73
ErrorStack glean_logs(const Snapshot &new_snapshot, std::map< storage::StorageId, storage::SnapshotPagePointer > *new_root_page_pointers)
Sub-routine of handle_snapshot_triggered().
std::atomic< Epoch::EpochInteger > snapshot_epoch_
The most recently snapshotted epoch; all logs up to this epoch are safe to delete.
void handle_snapshot_child()
Main routine for snapshot_thread_ in child engines.
std::thread snapshot_thread_
The daemon thread of snapshot manager.
0x0006 : "GENERAL: A dependent module is already uninitialized. This implies a wrong uninitialization...
Definition: error_code.hpp:110
void drop_root_volatile(const DropVolatilesArguments &args)
This is additionally called when no partitions observed any new modifications.
Definition: composer.cpp:94
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
soc::SharedPolling snapshot_children_wakeup_
Child snapshot managers (the ones in SOC engines) sleep on this condition until the master snapshot m...
A high-resolution stop watch.
Definition: stop_watch.hpp:30
StorageControlBlock * get_storage(StorageId id)
Returns the storage of given ID.
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
bool is_initialized() const
bool fsync(const Path &path, bool sync_parent_directory=false)
Makes the content and metadata of the file durable all the way up to devices.
Definition: filesystem.cpp:203
A log reducer, which receives log entries sent from mappers and applies them to construct new snapsho...
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
Definition: engine.cpp:50
void trigger_snapshot_immediate(bool wait_completion, Epoch suggested_snapshot_epoch)
const std::map< storage::StorageId, storage::SnapshotPagePointer > & get_new_root_page_pointers() const
Returns pointers to new root pages constructed at the end of gleaning.
bool timedwait(uint64_t demanded_ticket, uint64_t timeout_microsec, uint64_t polling_spins=kDefaultPollingSpins, uint64_t max_interval_us=kDefaultPollingMaxIntervalUs) const
Wait for signal up to the given timeout.
std::atomic< bool > cancelled_
Whether the log gleaner has been cancelled.
std::atomic< bool > stop_requested_
To locally shutdown snapshot_thread_.
storage::StorageId largest_storage_id_
The largest StorageId we so far observed.
void signal()
Signal it to let waiters exit.
bool is_error() const
Returns if this return code is not kErrorCodeOk.
uint16_t loggers_per_node_
Number of loggers per NUMA node.
Definition: log_options.hpp:80
EpochInteger value() const
Returns the raw integer representation.
Definition: epoch.hpp:102
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
Definition: soc_manager.cpp:38
SnapshotPagePointer root_snapshot_page_id_
Pointer to a snapshotted page this storage is rooted at.
Definition: metadata.hpp:112