libfoedus-core
FOEDUS Core Library
snapshot_writer_impl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <stdint.h>
21 #include <glog/logging.h>
22 
23 #include <string>
24 
25 #include "foedus/engine.hpp"
29 #include "foedus/fs/filesystem.hpp"
33 
34 namespace foedus {
35 namespace snapshot {
37  Engine* engine,
38  uint16_t numa_node,
39  SnapshotId snapshot_id,
40  memory::AlignedMemory* pool_memory,
41  memory::AlignedMemory* intermediate_memory,
42  bool append)
43  : engine_(engine),
44  numa_node_(numa_node),
45  append_(append),
46  snapshot_id_(snapshot_id),
47  pool_memory_(pool_memory),
48  intermediate_memory_(intermediate_memory),
49  snapshot_file_(nullptr),
50  next_page_id_(0) {
51 }
52 
54  if (snapshot_file_) {
55  bool success = true;
56  fs::Path path = snapshot_file_->get_path();
57  bool closed = snapshot_file_->close();
58  if (!closed) {
59  LOG(ERROR) << "Failed to close a snapshot file: " << *this;
60  success = false;
61  }
62  // also fsync the file.
63  if (!fs::fsync(path, true)) {
64  LOG(ERROR) << "Failed to fsync a snapshot file: " << *this;
65  success = false;
66  }
67 
68  delete snapshot_file_;
69  snapshot_file_ = nullptr;
70  return success;
71  } else {
72  return true;
73  }
74 }
75 
76 fs::Path SnapshotWriter::get_snapshot_file_path() const {
77  fs::Path path(engine_->get_options().snapshot_.construct_snapshot_file_path(snapshot_id_,
78  numa_node_));
79  return path;
80 }
81 
83  close();
84  fs::Path path(get_snapshot_file_path());
85  snapshot_file_ = new fs::DirectIoFile(path, engine_->get_options().snapshot_.emulation_);
86  bool create_new_file = append_ ? false : true;
87  WRAP_ERROR_CODE(snapshot_file_->open(true, true, true, create_new_file));
88  if (!append_) {
89  // write page-0. this is a dummy page which will never be read
90  char* first_page = reinterpret_cast<char*>(pool_memory_->get_block());
91  // first 8 bytes have some data, but we would use them just for sanity checks and debugging
92  storage::PageHeader* first_page_header = reinterpret_cast<storage::PageHeader*>(first_page);
93  first_page_header->page_id_ = storage::to_snapshot_page_pointer(snapshot_id_, numa_node_, 0);
94  first_page_header->storage_id_ = 0x1BF0ED05; // something unusual
95  first_page_header->checksum_ = 0x1BF0ED05; // something unusual
96  std::memset(
97  first_page + sizeof(storage::PageHeader),
98  0,
99  sizeof(storage::Page) - sizeof(storage::PageHeader));
100  std::string duh("This is the first 4kb page of a snapshot file in libfoedus."
101  " The first page is never used as data. It just has the common page header"
102  " and these useless sentences. Maybe we put our complaints on our cafeteria here.");
103  std::memcpy(first_page + sizeof(storage::PageHeader), duh.data(), duh.size());
104 
105  WRAP_ERROR_CODE(snapshot_file_->write(sizeof(storage::Page), *pool_memory_));
106  next_page_id_ = storage::to_snapshot_page_pointer(snapshot_id_, numa_node_, 1);
107  } else {
108  // if appending, nothing to do
109  uint64_t file_size = snapshot_file_->get_current_offset();
110  LOG(INFO) << to_string() << " Appending to " << file_size << "th bytes";
111  ASSERT_ND(file_size >= sizeof(storage::Page)); // have at least the dummy page
112  ASSERT_ND(file_size % sizeof(storage::Page) == 0); // must be aligned writes
113  uint64_t next_offset = file_size / sizeof(storage::Page);
114  next_page_id_ = storage::to_snapshot_page_pointer(snapshot_id_, numa_node_, next_offset);
115  }
116  return kRetOk;
117 }
118 
119 ErrorCode SnapshotWriter::dump_general(
120  memory::AlignedMemory* buffer,
121  memory::PagePoolOffset from_page,
122  uint32_t count) {
123  ASSERT_ND(buffer->get_size() / sizeof(storage::Page) >= from_page + count);
124 #ifndef NDEBUG
125  storage::Page* base = reinterpret_cast<storage::Page*>(buffer->get_block());
126  for (memory::PagePoolOffset i = 0; i < count; ++i) {
127  storage::SnapshotLocalPageId correct_local_page_id
129  storage::Page* page = base + from_page + i;
130  uint64_t page_id = page->get_header().page_id_;
131  storage::SnapshotLocalPageId local_page_id
133  uint16_t node = storage::extract_numa_node_from_snapshot_pointer(page_id);
134  uint16_t snapshot_id = storage::extract_snapshot_id_from_snapshot_pointer(page_id);
135  ASSERT_ND(local_page_id == correct_local_page_id);
136  ASSERT_ND(node == numa_node_);
137  ASSERT_ND(snapshot_id == snapshot_id_);
138  }
139 #endif // NDEBUG
140  CHECK_ERROR_CODE(snapshot_file_->write(
141  sizeof(storage::Page) * count,
143  buffer,
144  sizeof(storage::Page) * from_page,
145  sizeof(storage::Page) * count)));
146  next_page_id_ += count;
147  return kErrorCodeOk;
148 }
149 
150 ErrorCode SnapshotWriter::expand_pool_memory(uint32_t required_pages, bool retain_content) {
151  if (required_pages <= get_page_size()) {
152  return kErrorCodeOk;
153  }
154 
155  uint64_t bytes = required_pages * sizeof(storage::Page);
156  CHECK_ERROR_CODE(pool_memory_->assure_capacity(bytes, 2.0, retain_content));
157  ASSERT_ND(get_page_size() >= required_pages);
158  return kErrorCodeOk;
159 }
160 
161 
162 
163 ErrorCode SnapshotWriter::expand_intermediate_memory(uint32_t required_pages, bool retain_content) {
164  if (required_pages <= get_intermediate_size()) {
165  return kErrorCodeOk;
166  }
167 
168  uint64_t bytes = required_pages * sizeof(storage::Page);
169  CHECK_ERROR_CODE(intermediate_memory_->assure_capacity(bytes, 2.0, retain_content));
170  ASSERT_ND(get_intermediate_size() >= required_pages);
171  return kErrorCodeOk;
172 }
173 
174 std::ostream& operator<<(std::ostream& o, const SnapshotWriter& v) {
175  o << "<SnapshotWriter>"
176  << "<numa_node_>" << v.numa_node_ << "</numa_node_>"
177  << "<append_>" << v.append_ << "</append_>"
178  << "<snapshot_id_>" << v.snapshot_id_ << "</snapshot_id_>"
179  << "<pool_memory_>" << *v.pool_memory_ << "</pool_memory_>"
180  << "<intermediate_memory_>" << *v.intermediate_memory_ << "</intermediate_memory_>"
181  << "<next_page_id_>" << v.next_page_id_ << "</next_page_id_>"
182  << "</SnapshotWriter>";
183  return o;
184 }
185 
186 
187 } // namespace snapshot
188 } // namespace foedus
uint64_t SnapshotLocalPageId
Represents a local page ID in each one snapshot file in some NUMA node.
Definition: storage_id.hpp:89
bool close()
Close the file and makes sure all writes become durable (including the directory entry).
SnapshotLocalPageId extract_local_page_id_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:91
ErrorCode expand_intermediate_memory(uint32_t required_pages, bool retain_content)
Expands intermediate_memory_ in case it is too small.
ErrorCode write(uint64_t desired_bytes, const foedus::memory::AlignedMemory &buffer)
Sequentially write the given amount of contents from the current position.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
Definition: memory_id.hpp:44
foedus::fs::DeviceEmulationOptions emulation_
Settings to emulate slower data device.
StorageId storage_id_
ID of the storage this page belongs to.
Definition: page.hpp:196
SnapshotPagePointer to_snapshot_page_pointer(uint16_t snapshot_id, uint8_t node, SnapshotLocalPageId local_page_id)
Definition: storage_id.hpp:106
bool close()
Close the file if not yet closed.
memory::PagePoolOffset get_page_size() const __attribute__((always_inline))
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
ErrorCode assure_capacity(uint64_t required_size, double expand_margin=2.0, bool retain_content=false) noexcept
If the current size is smaller than the given size, automatically expands.
ErrorCode open(bool read, bool write, bool append, bool create)
Tries to open the file for the specified volume.
uint64_t get_current_offset() const
const EngineOptions & get_options() const
Definition: engine.cpp:39
ErrorStack open()
Open the file so that the writer can start writing.
0 means no-error.
Definition: error_code.hpp:87
Analogue of boost::filesystem::path.
Definition: path.hpp:37
snapshot::SnapshotOptions snapshot_
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
Just a marker to denote that a memory region represents a data page.
Definition: page.hpp:184
memory::PagePoolOffset get_intermediate_size() const __attribute__((always_inline))
Just a marker to denote that the memory region represents a data page.
Definition: page.hpp:334
uint16_t extract_snapshot_id_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:98
A slice of foedus::memory::AlignedMemory.
void * get_block() const
Returns the memory block.
uint8_t extract_numa_node_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:95
uint64_t file_size(const Path &p)
Returns size of the file.
Definition: filesystem.cpp:120
uint16_t SnapshotId
Unique ID of Snapshot.
Definition: snapshot_id.hpp:43
std::string construct_snapshot_file_path(int snapshot_id, int node) const
'folder_path'/snapshot_'snapshot-id'node'node-id'.data.
uint64_t get_size() const
Returns the byte size of the memory block.
Checksum checksum_
Checksum of the content of this page to detect corrupted pages.
Definition: page.hpp:202
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
Definition: error_code.hpp:155
Represents an I/O stream on one file without filesystem caching.
Represents one memory block aligned to actual OS/hardware pages.
const ErrorStack kRetOk
Normal return value for no-error case.
ErrorCode expand_pool_memory(uint32_t required_pages, bool retain_content)
Expands pool_memory_ in case it is too small.
PageHeader & get_header()
At least the basic header exists in all pages.
Definition: page.hpp:336
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
bool fsync(const Path &path, bool sync_parent_directory=false)
Makes the content and metadata of the file durable all the way up to devices.
Definition: filesystem.cpp:203
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
std::ostream & operator<<(std::ostream &o, const SortedBuffer &v)
Definition: log_buffer.cpp:32
uint64_t page_id_
Page ID of this page.
Definition: page.hpp:191
Writes out one snapshot file for all data pages in one reducer.