libfoedus-core
FOEDUS Core Library
direct_io_file.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <fcntl.h>
21 #include <glog/logging.h>
22 
23 #include <ostream>
24 #include <sstream>
25 #include <string>
26 
27 #include "foedus/assert_nd.hpp"
31 #include "foedus/fs/filesystem.hpp"
33 
34 namespace foedus {
35 namespace fs {
/** Alignment unit, in bytes, that the checks below test against (0x1000 == 4096). */
const uint64_t kOdirectAlignment = 0x1000;

/** Returns true iff the given integer is an exact multiple of kOdirectAlignment. */
inline bool is_odirect_aligned(uint64_t value) {
  const uint64_t remainder = value % kOdirectAlignment;
  return remainder == 0;
}

/** Returns true iff the address held by ptr is an exact multiple of kOdirectAlignment. */
inline bool is_odirect_aligned(const void* ptr) {
  const uintptr_t address = reinterpret_cast<uintptr_t>(ptr);
  const uintptr_t remainder = address % kOdirectAlignment;
  return remainder == 0;
}
43 
45  const Path &path,
46  const DeviceEmulationOptions &emulation)
47  : path_(path), emulation_(emulation),
48  descriptor_(kInvalidDescriptor), read_(false), write_(false), current_offset_(0) {
49 }
50 
52  close();
53 }
54 
55 ErrorCode DirectIoFile::open(bool read, bool write, bool append, bool create) {
56  if (descriptor_ != kInvalidDescriptor) {
57  LOG(ERROR) << "DirectIoFile::open(): already opened. this=" << *this;
59  }
60  Path folder(path_.parent_path());
61  if (!exists(folder)) {
62  if (!create_directories(folder, true)) {
63  if (exists(folder)) {
64  LOG(INFO) << "Interesting. other thread has created the folder:" << folder;
65  } else {
66  LOG(ERROR) << "DirectIoFile::open(): failed to create parent folder: "
67  << folder << ". err=" << assorted::os_error();
69  }
70  }
71  }
72 
73  LOG(INFO) << "DirectIoFile::open(): opening: " << path_ << ".. read =" << read << " write="
74  << write << ", append=" << append << ", create=" << create;
75  int oflags = O_LARGEFILE;
76  if (!emulation_.disable_direct_io_) {
77  oflags |= O_DIRECT;
78  }
79  if (read) {
80  if (write) {
81  oflags |= O_RDWR;
82  } else {
83  oflags |= O_RDONLY;
84  }
85  } else if (write) {
86  // oflags |= O_WRONLY;
87  oflags |= O_RDWR;
88  }
89  if (append) {
90  oflags |= O_APPEND;
91  }
92  if (create) {
93  oflags |= O_CREAT;
94  }
95  mode_t permissions = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
96  descriptor_ = ::open(path_.c_str(), oflags, permissions);
97 
98  // tmpfs (such as /tmp, /dev/shm) refuses to receive O_DIRECT, returning EINVAL (22).
99  // In that case, let's retry without O_DIRECT flag. MySQL does similar thing, too.
100  if (descriptor_ == kInvalidDescriptor && (oflags & O_DIRECT) == O_DIRECT && errno == EINVAL) {
101  descriptor_ = ::open(path_.c_str(), oflags ^ O_DIRECT, permissions);
102  if (descriptor_ != kInvalidDescriptor) {
103  // Okay, O_DIRECT was the cause. Just complain. go on.
104  LOG(WARNING) << "DirectIoFile::open(): O_DIRECT flag for " << path_
105  << " was rejected and automatically removed. This usually means you specified"
106  << " tmpfs, such as /tmp, /dev/shm. Such non-durable devices should be used only"
107  << " for testing and performance experiments."
108  << " Related URL: http://www.gossamer-threads.com/lists/linux/kernel/720702";
109  }
110  // else the normal error flow below.
111  }
112 
113  if (descriptor_ == kInvalidDescriptor) {
114  LOG(ERROR) << "DirectIoFile::open(): failed to open: " << path_
115  << ". err=" << assorted::os_error();
117  } else {
118  read_ = read;
119  write_ = write;
120  current_offset_ = 0;
121  if (append) {
122  current_offset_ = file_size(path_);
123  }
124  LOG(INFO) << "DirectIoFile::open(): successfully opened. " << *this;
125  return kErrorCodeOk;
126  }
127 }
128 
130  if (descriptor_ != kInvalidDescriptor) {
131  int ret = ::close(descriptor_);
132  LOG(INFO) << "DirectIoFile::close(): closed. " << *this;
133  if (ret != 0) {
134  // Error at file close is nasty, we can't do much. We just report it in log.
135  LOG(ERROR) << "DirectIoFile::close(): error:" << foedus::assorted::os_error()
136  << " file=" << *this << ".";
137  ASSERT_ND(false); // crash only in debug mode
138  }
139  descriptor_ = kInvalidDescriptor;
140  return ret == 0;
141  }
142  return false;
143 }
144 ErrorCode DirectIoFile::read(uint64_t desired_bytes, memory::AlignedMemory* buffer) {
145  return read(desired_bytes, memory::AlignedMemorySlice(buffer));
146 }
147 ErrorCode DirectIoFile::read(uint64_t desired_bytes, const memory::AlignedMemorySlice& buffer) {
148  ASSERT_ND(!emulation_.null_device_);
149  if (desired_bytes > buffer.count_) {
150  LOG(ERROR) << "DirectIoFile::read(): too small buffer is given. desired_bytes="
151  << desired_bytes << ", buffer=" << buffer;
153  } else if (!is_odirect_aligned(buffer.memory_->get_alignment())
154  || !is_odirect_aligned(buffer.get_block())
155  || !is_odirect_aligned(desired_bytes)) {
156  LOG(ERROR) << "DirectIoFile::read(): non-aligned input is given. buffer=" << buffer
157  << ", desired_bytes=" << desired_bytes;
159  }
160  return read_raw(desired_bytes, buffer.get_block());
161 }
162 ErrorCode DirectIoFile::read_raw(uint64_t desired_bytes, void* buffer) {
163  ASSERT_ND(!emulation_.null_device_);
164  if (!is_opened()) {
165  LOG(ERROR) << "File not opened yet, or closed. this=" << *this;
166  return kErrorCodeFsNotOpened;
167  } else if (desired_bytes == 0) {
168  return kErrorCodeOk;
169  }
170 
171  // underlying POSIX filesystem might split the read for severel reasons. so, while loop.
172  uint64_t total_read = 0;
173  uint64_t remaining = desired_bytes;
174  while (remaining > 0) {
175  char* position = reinterpret_cast<char*>(buffer) + total_read;
176  ASSERT_ND(is_odirect_aligned(position));
177  ssize_t read_bytes = ::read(descriptor_, position, remaining);
178  if (read_bytes <= 0) {
179  // zero means end of file (unexpected). negative value means error.
180  LOG(ERROR) << "DirectIoFile::read(): error. this=" << *this
181  << ", total_read=" << total_read << ", desired_bytes=" << desired_bytes
182  << ", remaining=" << remaining << ", read_bytes=" << read_bytes
183  << ", err=" << assorted::os_error();
185  }
186 
187  if (static_cast<uint64_t>(read_bytes) > remaining) {
188  LOG(ERROR) << "DirectIoFile::read(): wtf? this=" << *this
189  << ", total_read=" << total_read << ", desired_bytes=" << desired_bytes
190  << ", remaining=" << remaining << ", read_bytes=" << read_bytes
191  << ", err=" << assorted::os_error();
192  return kErrorCodeFsExcessRead;
193  } else if (!emulation_.disable_direct_io_ && !is_odirect_aligned(read_bytes)) {
194  LOG(FATAL) << "DirectIoFile::read(): wtf2? this=" << *this
195  << ", total_read=" << total_read << ", desired_bytes=" << desired_bytes
196  << ", remaining=" << remaining << ", read_bytes=" << read_bytes
197  << ", err=" << assorted::os_error();
199  }
200 
201  total_read += read_bytes;
202  remaining -= read_bytes;
203  current_offset_ += read_bytes;
204  if (remaining > 0) {
205  LOG(INFO) << "Interesting. POSIX read() didn't complete the reads in one call."
206  << " total_read=" << total_read << ", desired_bytes=" << desired_bytes
207  << ", remaining=" << remaining;
208  }
209  }
210  if (emulation_.emulated_read_kb_cycles_ > 0) {
211  debugging::wait_rdtsc_cycles(emulation_.emulated_read_kb_cycles_ * (desired_bytes >> 10));
212  }
213  return kErrorCodeOk;
214 }
215 
216 ErrorCode DirectIoFile::write(uint64_t desired_bytes, const memory::AlignedMemory& buffer) {
217  return write(desired_bytes, memory::AlignedMemorySlice(
218  const_cast<memory::AlignedMemory*>(&buffer)));
219 }
220 
221 ErrorCode DirectIoFile::write(uint64_t desired_bytes, const memory::AlignedMemorySlice& buffer) {
222  ASSERT_ND(buffer.is_valid());
223  if (desired_bytes > buffer.count_) {
224  LOG(ERROR) << "DirectIoFile::write(): too small buffer is given. desired_bytes="
225  << desired_bytes << ", buffer=" << buffer;
227  } else if (!is_odirect_aligned(buffer.memory_->get_alignment())
228  || !is_odirect_aligned(buffer.get_block())
229  || !is_odirect_aligned(desired_bytes)) {
230  LOG(ERROR) << "DirectIoFile::write(): non-aligned input is given. buffer=" << buffer
231  << ", desired_bytes=" << desired_bytes;
233  }
234  return write_raw(desired_bytes, buffer.get_block());
235 }
236 ErrorCode DirectIoFile::write_raw(uint64_t desired_bytes, const void* buffer) {
237  if (!is_opened()) {
238  LOG(ERROR) << "File not opened yet, or closed. this=" << *this;
239  return kErrorCodeFsNotOpened;
240  } else if (desired_bytes == 0) {
241  return kErrorCodeOk;
242  }
243 
244  if (emulation_.null_device_) {
245  return kErrorCodeOk;
246  }
247 
248  // underlying POSIX filesystem might split the write for severel reasons. so, while loop.
249  VLOG(1) << "DirectIoFile::write(). desired_bytes=" << desired_bytes << ", buffer=" << buffer;
250  uint64_t total_written = 0;
251  uint64_t remaining = desired_bytes;
252  while (remaining > 0) {
253  const void* position = reinterpret_cast<const char*>(buffer) + total_written;
254  VLOG(1) << "DirectIoFile::write(). position=" << position;
255  ASSERT_ND(is_odirect_aligned(position));
256  ssize_t written_bytes = ::write(descriptor_, position, remaining);
257  if (written_bytes < 0) {
258  // negative value means error.
259  LOG(ERROR) << "DirectIoFile::write(): error. this=" << *this
260  << ", total_written=" << total_written << ", desired_bytes=" << desired_bytes
261  << ", remaining=" << remaining << ", written_bytes=" << written_bytes
262  << ", err=" << assorted::os_error();
263  // TASK(Hideaki) more error codes depending on errno. but mostly it should be disk-full
264  return kErrorCodeFsWriteFail;
265  }
266 
267  if (static_cast<uint64_t>(written_bytes) > remaining) {
268  LOG(ERROR) << "DirectIoFile::write(): wtf? this=" << *this
269  << ", total_written=" << total_written << ", desired_bytes=" << desired_bytes
270  << ", remaining=" << remaining << ", written_bytes=" << written_bytes
271  << ", err=" << assorted::os_error();
273  } else if (!emulation_.disable_direct_io_ && !is_odirect_aligned(written_bytes)) {
274  LOG(FATAL) << "DirectIoFile::write(): wtf2? this=" << *this
275  << ", total_written=" << total_written << ", desired_bytes=" << desired_bytes
276  << ", remaining=" << remaining << ", written_bytes=" << written_bytes
277  << ", err=" << assorted::os_error();
279  }
280 
281  total_written += written_bytes;
282  remaining -= written_bytes;
283  current_offset_ += written_bytes;
284  if (remaining > 0) {
285  LOG(INFO) << "Interesting. POSIX write() didn't complete the writes in one call."
286  << " total_written=" << total_written << ", desired_bytes=" << desired_bytes
287  << ", remaining=" << remaining;
288  }
289  }
290  if (emulation_.emulated_write_kb_cycles_ > 0) {
291  debugging::wait_rdtsc_cycles(emulation_.emulated_write_kb_cycles_ * (desired_bytes >> 10));
292  }
293  return kErrorCodeOk;
294 }
295 
296 ErrorCode DirectIoFile::truncate(uint64_t new_length, bool sync) {
297  if (!is_odirect_aligned(new_length)) {
298  LOG(ERROR) << "DirectIoFile::truncate(): non-aligned input is given. "
299  << " new_length=" << new_length;
301  }
302  LOG(INFO) << "DirectIoFile::truncate(): truncating " << *this << " to " << new_length
303  << " bytes..";
304  if (!is_opened()) {
305  return kErrorCodeFsNotOpened;
306  }
307 
308  if (emulation_.null_device_) {
309  current_offset_ = new_length;
310  return kErrorCodeOk;
311  }
312 
313  if (::ftruncate(descriptor_, new_length) != 0) {
314  LOG(ERROR) << "DirectIoFile::truncate(): failed. this=" << *this
315  << " err=" << assorted::os_error();
317  }
318  current_offset_ = new_length;
319  if (sync) {
320  LOG(INFO) << "DirectIoFile::truncate(): also fsync..";
321  foedus::fs::fsync(path_, true);
322  }
323  return kErrorCodeOk;
324 }
325 
326 ErrorCode DirectIoFile::seek(uint64_t offset, SeekType seek_type) {
327  if (!is_odirect_aligned(offset)) {
328  LOG(ERROR) << "DirectIoFile::seek(): non-aligned input is given. offset=" << offset;
330  }
331  if (emulation_.null_device_) {
332  return kErrorCodeOk;
333  }
334  __off_t ret;
335  switch (seek_type) {
336  case kDirectIoSeekSet:
337  ret = ::lseek(descriptor_, offset, SEEK_SET);
338  break;
339  case kDirectIoSeekCur:
340  ret = ::lseek(descriptor_, offset, SEEK_CUR);
341  break;
342  case kDirectIoSeekEnd:
343  ret = ::lseek(descriptor_, offset, SEEK_END);
344  break;
345  default:
346  LOG(ERROR) << "DirectIoFile::seek(): wtf?? seek_type=" << seek_type;
348  }
349  if (ret < 0) {
350  LOG(ERROR) << "DirectIoFile::seek(): failed. this=" << *this << ",err=" << assorted::os_error();
351  return kErrorCodeFsSeekFailed;
352  }
353  current_offset_ = ret;
354  if (emulation_.emulated_seek_latency_cycles_ > 0) {
356  }
357  return kErrorCodeOk;
358 }
359 
361  if (!is_opened()) {
362  LOG(ERROR) << "File not opened yet, or closed. this=" << *this;
363  return kErrorCodeFsNotOpened;
364  }
365  if (!is_write()) {
367  }
368 
369  if (emulation_.null_device_) {
370  return kErrorCodeOk;
371  }
372 
373  int ret = ::fsync(descriptor_);
374  if (ret != 0) {
375  LOG(ERROR) << "DirectIoFile::sync(): fsync failed. this=" << *this
376  << ", err=" << assorted::os_error();
377  return kErrorCodeFsSyncFailed;
378  }
379 
380  return kErrorCodeOk;
381 }
382 
383 std::string DirectIoFile::to_string() const {
384  std::stringstream stream;
385  stream << *this;
386  return stream.str();
387 }
388 std::ostream& operator<<(std::ostream& o, const DirectIoFile& v) {
389  o << "<DirectIoFile>"
390  << "<path>" << v.get_path() << "</path>"
391  << "<descriptor>" << v.get_descriptor() << "</descriptor>"
392  << "<read>" << v.is_read() << "</read>"
393  << "<write>" << v.is_write() << "</write>"
394  << "<current_offset>" << v.get_current_offset() << "</current_offset>"
395  << "</DirectIoFile>";
396  return o;
397 }
398 
399 } // namespace fs
400 } // namespace foedus
0x020D : "FILESYS: Failed to create a directory" .
Definition: error_code.hpp:138
The offset is set to its current location plus offset bytes.
ErrorCode truncate(uint64_t new_length, bool sync=false)
Discard the content of the file after the given offset.
std::ostream & operator<<(std::ostream &o, const DirectIoFile &v)
0x020F : "FILESYS: Direct I/O operation resulted in non-aligned count of bytes. Filesystem bug...
Definition: error_code.hpp:140
0x0002 : "GENERAL: Invalid parameter given" .
Definition: error_code.hpp:106
uint32_t emulated_seek_latency_cycles_
[Experiments] additional CPU cycles to busy-wait for each seek.
uint64_t get_alignment() const
Returns the alignment of the memory block.
uint64_t count_
Byte count of this slice in memory_.
0x0202 : "FILESYS: Failed to open a file" .
Definition: error_code.hpp:127
ErrorCode write(uint64_t desired_bytes, const foedus::memory::AlignedMemory &buffer)
Sequentially write the given amount of contents from the current position.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
AlignedMemory * memory_
The wrapped memory.
bool close()
Close the file if not yet closed.
ErrorCode open(bool read, bool write, bool append, bool create)
Tries to open the file for the specified volume.
void wait_rdtsc_cycles(uint64_t cycles)
Wait until the given CPU cycles elapse.
Definition: rdtsc.hpp:60
uint64_t get_current_offset() const
~DirectIoFile()
Automatically closes the file if it is opened.
0x0203 : "FILESYS: Invalid arguments for seek()" .
Definition: error_code.hpp:128
ErrorCode read_raw(uint64_t desired_bytes, void *buffer)
A version that receives a raw pointer that has to be aligned (be careful to use this ver)...
0x0201 : "FILESYS: Already opened" .
Definition: error_code.hpp:126
0 means no-error.
Definition: error_code.hpp:87
Analogue of boost::filesystem::path.
Definition: path.hpp:37
0x0204 : "FILESYS: file seek failed" .
Definition: error_code.hpp:129
0x0209 : "FILESYS: Disk write failed." .
Definition: error_code.hpp:134
The offset is set to the size of the file plus offset bytes.
0x020E : "FILESYS: File truncation failed" .
Definition: error_code.hpp:139
bool is_opened() const
Whether the file is already and successfully opened.
SeekType
Analogue of SEEK_SET/SEEK_CUR/SEEK_END in POSIX.
ErrorCode sync()
Analogue of POSIX fsync().
bool is_odirect_aligned(uint64_t value)
ErrorCode seek(uint64_t offset, SeekType seek_type)
Sets the position of the next byte to be written/extracted from/to the stream.
bool create_directories(const Path &p, bool sync=false)
Recursive mkdir (mkdirs).
Definition: filesystem.cpp:89
bool exists(const Path &p)
Returns if the file exists.
Definition: filesystem.hpp:128
ErrorCode read(uint64_t desired_bytes, foedus::memory::AlignedMemory *buffer)
Sequentially read the given amount of contents from the current position.
uint32_t emulated_write_kb_cycles_
[Experiments] additional CPU cycles to busy-wait for each 1KB write.
Path parent_path() const
Definition: path.cpp:55
A slice of foedus::memory::AlignedMemory.
0x020C : "FILESYS: fsync() failed." .
Definition: error_code.hpp:137
uint64_t file_size(const Path &p)
Returns size of the file.
Definition: filesystem.cpp:120
const char * c_str() const
Definition: path.hpp:64
ErrorCode write_raw(uint64_t desired_bytes, const void *buffer)
A version that receives a raw pointer that has to be aligned (be careful to use this ver)...
file_descriptor get_descriptor() const
Set of configurations to emulate slower devices for some experiments.
0x0207 : "FILESYS: reached end of file before completing reads" .
Definition: error_code.hpp:132
std::string os_error()
Thread-safe strerror(errno).
Represents an I/O stream on one file without filesystem caching.
0x020A : "FILESYS: wrote more than expected" .
Definition: error_code.hpp:135
Represents one memory block aligned to actual OS/hardware pages.
POSIX open() semantics says -1 is invalid or not-yet-opened.
0x0208 : "FILESYS: read more than expected" .
Definition: error_code.hpp:133
0x020B : "FILESYS: File not opened yet or failed to open." .
Definition: error_code.hpp:136
Implements an RDTSC (Real-time time stamp counter) wait to emulate latency on slower devices...
0x0206 : "FILESYS: file buffer is not aligned" .
Definition: error_code.hpp:131
bool null_device_
[Experiments] as if we write out to /dev/null.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
bool fsync(const Path &path, bool sync_parent_directory=false)
Makes the content and metadata of the file durable all the way up to devices.
Definition: filesystem.cpp:203
0x0205 : "FILESYS: file buffer is too small" .
Definition: error_code.hpp:130
The offset is set to offset bytes.
const uint64_t kOdirectAlignment
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
uint32_t emulated_read_kb_cycles_
[Experiments] additional CPU cycles to busy-wait for each 1KB read.
bool disable_direct_io_
[Experiments] Whether to disable Direct I/O and use non-direct I/O instead.
std::string to_string() const