21 #include <glog/logging.h> 
   38   return (value % kOdirectAlignment) == 0;
 
   41   return (reinterpret_cast<uintptr_t>(ptr) % kOdirectAlignment) == 0;
 
   47   : path_(path), emulation_(emulation),
 
   48   descriptor_(kInvalidDescriptor), read_(false), write_(false), current_offset_(0) {
 
   57       LOG(ERROR) << 
"DirectIoFile::open(): already opened. this=" << *
this;
 
   64         LOG(INFO) << 
"Interesting. other thread has created the folder:" << folder;
 
   66         LOG(ERROR) << 
"DirectIoFile::open(): failed to create parent folder: " 
   73   LOG(INFO) << 
"DirectIoFile::open(): opening: " << path_ << 
"..  read =" << read << 
" write=" 
   74     << write << 
", append=" << append << 
", create=" << create;
 
   75   int oflags = O_LARGEFILE;
 
   95   mode_t permissions = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
 
   96   descriptor_ = 
::open(path_.
c_str(), oflags, permissions);
 
  100   if (descriptor_ == 
kInvalidDescriptor && (oflags & O_DIRECT) == O_DIRECT && errno == EINVAL) {
 
  101     descriptor_ = 
::open(path_.
c_str(), oflags ^ O_DIRECT, permissions);
 
  104       LOG(WARNING) << 
"DirectIoFile::open(): O_DIRECT flag for " << path_
 
  105         << 
" was rejected and automatically removed. This usually means you specified" 
  106         << 
" tmpfs, such as /tmp, /dev/shm. Such non-durable devices should be used only" 
  107         << 
" for testing and performance experiments." 
  108         << 
" Related URL: http://www.gossamer-threads.com/lists/linux/kernel/720702";
 
  114     LOG(ERROR) << 
"DirectIoFile::open(): failed to open: " << path_
 
  124     LOG(INFO) << 
"DirectIoFile::open(): successfully opened. " << *
this;
 
  131     int ret = 
::close(descriptor_);
 
  132     LOG(INFO) << 
"DirectIoFile::close(): closed. " << *
this;
 
  136         << 
" file=" << *
this << 
".";
 
  149   if (desired_bytes > buffer.
count_) {
 
  150     LOG(ERROR) << 
"DirectIoFile::read(): too small buffer is given. desired_bytes=" 
  151       << desired_bytes << 
", buffer=" << buffer;
 
  156     LOG(ERROR) << 
"DirectIoFile::read(): non-aligned input is given. buffer=" << buffer
 
  157       << 
", desired_bytes=" << desired_bytes;
 
  165     LOG(ERROR) << 
"File not opened yet, or closed. this=" << *
this;
 
  167   } 
else if (desired_bytes == 0) {
 
  172   uint64_t total_read = 0;
 
  173   uint64_t remaining = desired_bytes;
 
  174   while (remaining > 0) {
 
  175     char* position = 
reinterpret_cast<char*
>(buffer) + total_read;
 
  177     ssize_t read_bytes = 
::read(descriptor_, position, remaining);
 
  178     if (read_bytes <= 0) {
 
  180       LOG(ERROR) << 
"DirectIoFile::read(): error. this=" << *
this 
  181         << 
", total_read=" << total_read << 
", desired_bytes=" << desired_bytes
 
  182         << 
", remaining=" << remaining << 
", read_bytes=" << read_bytes
 
  187     if (static_cast<uint64_t>(read_bytes) > remaining) {
 
  188       LOG(ERROR) << 
"DirectIoFile::read(): wtf? this=" << *
this 
  189         << 
", total_read=" << total_read << 
", desired_bytes=" << desired_bytes
 
  190         << 
", remaining=" << remaining << 
", read_bytes=" << read_bytes
 
  194       LOG(FATAL) << 
"DirectIoFile::read(): wtf2? this=" << *
this 
  195         << 
", total_read=" << total_read << 
", desired_bytes=" << desired_bytes
 
  196         << 
", remaining=" << remaining << 
", read_bytes=" << read_bytes
 
  201     total_read += read_bytes;
 
  202     remaining -= read_bytes;
 
  203     current_offset_ += read_bytes;
 
  205       LOG(INFO) << 
"Interesting. POSIX read() didn't complete the reads in one call." 
  206         << 
" total_read=" << total_read << 
", desired_bytes=" << desired_bytes
 
  207         << 
", remaining=" << remaining;
 
  218     const_cast<memory::AlignedMemory*>(&buffer)));
 
  223   if (desired_bytes > buffer.
count_) {
 
  224     LOG(ERROR) << 
"DirectIoFile::write(): too small buffer is given. desired_bytes=" 
  225       << desired_bytes << 
", buffer=" << buffer;
 
  230     LOG(ERROR) << 
"DirectIoFile::write(): non-aligned input is given. buffer=" << buffer
 
  231       << 
", desired_bytes=" << desired_bytes;
 
  238     LOG(ERROR) << 
"File not opened yet, or closed. this=" << *
this;
 
  240   } 
else if (desired_bytes == 0) {
 
  249   VLOG(1) << 
"DirectIoFile::write(). desired_bytes=" << desired_bytes << 
", buffer=" << buffer;
 
  250   uint64_t total_written = 0;
 
  251   uint64_t remaining = desired_bytes;
 
  252   while (remaining > 0) {
 
  253     const void* position = 
reinterpret_cast<const char*
>(buffer) + total_written;
 
  254     VLOG(1) << 
"DirectIoFile::write(). position=" << position;
 
  256     ssize_t written_bytes = 
::write(descriptor_, position, remaining);
 
  257     if (written_bytes < 0) {
 
  259       LOG(ERROR) << 
"DirectIoFile::write(): error. this=" << *
this 
  260         << 
", total_written=" << total_written << 
", desired_bytes=" << desired_bytes
 
  261         << 
", remaining=" << remaining << 
", written_bytes=" << written_bytes
 
  267     if (static_cast<uint64_t>(written_bytes) > remaining) {
 
  268       LOG(ERROR) << 
"DirectIoFile::write(): wtf? this=" << *
this 
  269         << 
", total_written=" << total_written << 
", desired_bytes=" << desired_bytes
 
  270         << 
", remaining=" << remaining << 
", written_bytes=" << written_bytes
 
  274       LOG(FATAL) << 
"DirectIoFile::write(): wtf2? this=" << *
this 
  275         << 
", total_written=" << total_written << 
", desired_bytes=" << desired_bytes
 
  276         << 
", remaining=" << remaining << 
", written_bytes=" << written_bytes
 
  281     total_written += written_bytes;
 
  282     remaining -= written_bytes;
 
  283     current_offset_ += written_bytes;
 
  285       LOG(INFO) << 
"Interesting. POSIX write() didn't complete the writes in one call." 
  286         << 
" total_written=" << total_written << 
", desired_bytes=" << desired_bytes
 
  287         << 
", remaining=" << remaining;
 
  298     LOG(ERROR) << 
"DirectIoFile::truncate(): non-aligned input is given. " 
  299       << 
" new_length=" << new_length;
 
  302   LOG(INFO) << 
"DirectIoFile::truncate(): truncating " << *
this << 
" to " << new_length
 
  309     current_offset_ = new_length;
 
  313   if (::ftruncate(descriptor_, new_length) != 0) {
 
  314     LOG(ERROR) << 
"DirectIoFile::truncate(): failed. this=" << *
this 
  318   current_offset_ = new_length;
 
  320     LOG(INFO) << 
"DirectIoFile::truncate(): also fsync..";
 
  328     LOG(ERROR) << 
"DirectIoFile::seek(): non-aligned input is given. offset=" << offset;
 
  337       ret = ::lseek(descriptor_, offset, SEEK_SET);
 
  340       ret = ::lseek(descriptor_, offset, SEEK_CUR);
 
  343       ret = ::lseek(descriptor_, offset, SEEK_END);
 
  346       LOG(ERROR) << 
"DirectIoFile::seek(): wtf?? seek_type=" << seek_type;
 
  350     LOG(ERROR) << 
"DirectIoFile::seek(): failed. this=" << *
this << 
",err=" << 
assorted::os_error();
 
  353   current_offset_ = ret;
 
  362     LOG(ERROR) << 
"File not opened yet, or closed. this=" << *
this;
 
  373   int ret = 
::fsync(descriptor_);
 
  375     LOG(ERROR) << 
"DirectIoFile::sync(): fsync failed. this=" << *
this 
  384   std::stringstream stream;
 
  389   o << 
"<DirectIoFile>" 
  390     << 
"<path>" << v.
get_path() << 
"</path>" 
  392     << 
"<read>" << v.
is_read() << 
"</read>" 
  393     << 
"<write>" << v.
is_write() << 
"</write>" 
  395     << 
"</DirectIoFile>";
 
0x020D : "FILESYS: Failed to create a directory" . 
The offset is set to its current location plus offset bytes. 
ErrorCode truncate(uint64_t new_length, bool sync=false)
Discard the content of the file after the given offset. 
std::ostream & operator<<(std::ostream &o, const DirectIoFile &v)
0x020F : "FILESYS: Direct I/O operation resulted in non-aligned count of bytes. Filesyste bug...
0x0002 : "GENERAL: Invalid parameter given" . 
uint32_t emulated_seek_latency_cycles_
[Experiments] additional CPU cycles to busy-wait for each seek. 
uint64_t get_alignment() const 
Returns the alignment of the memory block. 
uint64_t count_
Byte count of this slice in memory_. 
0x0202 : "FILESYS: Failed to open a file" . 
ErrorCode write(uint64_t desired_bytes, const foedus::memory::AlignedMemory &buffer)
Sequentially write the given amount of contents from the current position. 
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services). 
AlignedMemory * memory_
The wrapped memory. 
bool close()
Close the file if not yet closed. 
ErrorCode open(bool read, bool write, bool append, bool create)
Tries to open the file for the specified volume. 
void wait_rdtsc_cycles(uint64_t cycles)
Wait until the given CPU cycles elapse. 
uint64_t get_current_offset() const 
~DirectIoFile()
Automatically closes the file if it is opened. 
0x0203 : "FILESYS: Invalid arguments for seek()" . 
ErrorCode read_raw(uint64_t desired_bytes, void *buffer)
A version that receives a raw pointer that has to be aligned (be careful to use this ver)...
0x0201 : "FILESYS: Already opened" . 
Analogue of boost::filesystem::path. 
0x0204 : "FILESYS: file seek failed" . 
0x0209 : "FILESYS: Disk write failed." . 
The offset is set to the size of the file plus offset bytes. 
0x020E : "FILESYS: File truncation failed" . 
bool is_opened() const 
Whether the file is already and successfully opened. 
SeekType
Analogue of SEEK_SET/SEEK_CUR/SEEK_END in POSIX. 
ErrorCode sync()
Analogues of POSIX fsync(). 
bool is_odirect_aligned(uint64_t value)
ErrorCode seek(uint64_t offset, SeekType seek_type)
Sets the position of the next byte to be written/extracted from/to the stream. 
bool create_directories(const Path &p, bool sync=false)
Recursive mkdir (mkdirs). 
bool exists(const Path &p)
Returns if the file exists. 
ErrorCode read(uint64_t desired_bytes, foedus::memory::AlignedMemory *buffer)
Sequentially read the given amount of contents from the current position. 
uint32_t emulated_write_kb_cycles_
[Experiments] additional CPU cycles to busy-wait for each 1KB write. 
A slice of foedus::memory::AlignedMemory. 
0x020C : "FILESYS: fsync() failed." . 
uint64_t file_size(const Path &p)
Returns size of the file. 
const char * c_str() const 
ErrorCode write_raw(uint64_t desired_bytes, const void *buffer)
A version that receives a raw pointer that has to be aligned (be careful to use this ver)...
file_descriptor get_descriptor() const 
Set of configurations to emulate slower devices for some experiments. 
0x0207 : "FILESYS: reached end of file before completing reads" . 
std::string os_error()
Thread-safe strerror(errno). 
Represents an I/O stream on one file without filesystem caching. 
0x020A : "FILESYS: wrote more than expected" . 
Represents one memory block aligned to actual OS/hardware pages. 
POSIX open() semantics says -1 is invalid or not-yet-opened. 
0x0208 : "FILESYS: read more than expected" . 
0x020B : "FILESYS: File not opened yet or failed to open." . 
Implements an RDTSC (Real-time time stamp counter) wait to emulate latency on slower devices...
0x0206 : "FILESYS: file buffer is not aligned" . 
bool null_device_
[Experiments] as if we write out to /dev/null. 
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
bool fsync(const Path &path, bool sync_parent_directory=false)
Makes the content and metadata of the file durable all the way up to devices. 
0x0205 : "FILESYS: file buffer is too small" . 
The offset is set to offset bytes. 
const uint64_t kOdirectAlignment
ErrorCode
Enum of error codes defined in error_code.xmacro. 
uint32_t emulated_read_kb_cycles_
[Experiments] additional CPU cycles to busy-wait for each 1KB read. 
bool disable_direct_io_
[Experiments] Whether to disable Direct I/O and use non-direct I/O instead. 
std::string to_string() const