libfoedus-core
FOEDUS Core Library
log_reducer_ref.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <cstring>
23 #include <string>
24 
32 
33 namespace foedus {
34 namespace snapshot {
35 
36 LogReducerRef::LogReducerRef(Engine* engine, uint16_t node) {
37  engine_ = engine;
38  soc::NodeMemoryAnchors* anchors
41  buffers_[0] = anchors->log_reducer_buffers_[0];
42  buffers_[1] = anchors->log_reducer_buffers_[1];
44 }
45 
46 uint16_t LogReducerRef::get_id() const {
47  return control_block_->id_;
48 }
49 std::string LogReducerRef::to_string() const {
50  return std::string("LogReducer-") + std::to_string(get_id());
51 }
52 
55 }
56 
59 }
60 
63 }
64 
66  // the value is in total of two buffers. (<< 20) / 2 == << 19
67  return static_cast<uint64_t>(engine_->get_options().snapshot_.log_reducer_buffer_mb_) << 19;
68 }
69 
70 
71 void* LogReducerRef::get_buffer(uint32_t index) const {
72  soc::NodeMemoryAnchors* anchors
74  return anchors->log_reducer_buffers_[index % 2];
75 }
76 
79  snapshot_manager_memory_->gleaner_.cur_snapshot_;
80 }
81 
83  storage::StorageId storage_id,
84  const char* send_buffer,
85  uint32_t log_count,
86  uint64_t send_buffer_size) const {
87  uint32_t real_log_count = 0;
88  uint64_t cur = 0;
89  const Snapshot& cur_snapshot = get_cur_snapshot();
90  ASSERT_ND(cur_snapshot.id_ != kNullSnapshotId);
91  ASSERT_ND(cur_snapshot.valid_until_epoch_.is_valid());
92  ASSERT_ND(cur_snapshot.max_storage_id_ > 0);
93  ASSERT_ND(cur_snapshot.max_storage_id_ >= storage_id);
94  while (cur < send_buffer_size) {
95  const log::BaseLogType* entry = reinterpret_cast<const log::BaseLogType*>(send_buffer + cur);
96  log::LogCode type = entry->header_.get_type();
97  log::LogCodeKind kind = entry->header_.get_kind();
99  ASSERT_ND(type != log::kLogCodeEpochMarker); // should have been skipped in mapper.
100  ASSERT_ND(entry->header_.log_length_ > 0);
101  ASSERT_ND(entry->header_.log_length_ % 8 == 0);
102  cur += entry->header_.log_length_;
103  ++real_log_count;
104  if (type == log::kLogCodeFiller) {
105  continue;
106  } else {
107  ASSERT_ND(entry->header_.storage_id_ == storage_id);
108  ASSERT_ND(entry->header_.xct_id_.is_valid());
109  ASSERT_ND(entry->header_.xct_id_.get_ordinal() > 0);
110  ASSERT_ND(kind == log::kRecordLogs);
111  Epoch epoch = entry->header_.xct_id_.get_epoch();
112  ASSERT_ND(!cur_snapshot.base_epoch_.is_valid() || epoch > cur_snapshot.base_epoch_);
113  ASSERT_ND(epoch <= cur_snapshot.valid_until_epoch_);
114  }
115  }
116  ASSERT_ND(real_log_count == log_count);
117  ASSERT_ND(cur == send_buffer_size);
118  return true;
119 }
120 
// Appends one storage's chunk of logs to this reducer's current buffer:
// phase 1 reserves space with an atomic CAS on the buffer status word, phase 2
// memcpy's a FullBlockHeader plus the chunk into the reserved region, phase 3
// CAS-decrements active_writers_ to declare the copy finished.
// NOTE(review): this is an extracted rendering of the source. The opening
// signature line ("void LogReducerRef::append_log_chunk(" — see the member
// index) and original line 199 were lost in extraction; given the
// is_full_block() ASSERT at the end, line 199 presumably set
// header.magic_word_ to the full-block magic word. The code below is kept
// byte-identical to the listing.
 122  storage::StorageId storage_id,
 123  const char* send_buffer,
 124  uint32_t log_count,
 125  uint64_t send_buffer_size,
 126  uint32_t shortest_key_length,
 127  uint32_t longest_key_length) {
 128  DVLOG(1) << "Appending a block of " << send_buffer_size << " bytes (" << log_count
 129  << " entries) to " << to_string() << "'s buffer for storage-" << storage_id;
// Debug-only full walk of the chunk; compiled away in release builds.
 130  ASSERT_ND(verify_log_chunk(storage_id, send_buffer, log_count, send_buffer_size));
 131 
 132  debugging::RdtscWatch stop_watch;
 133 
// The written block is the chunk prefixed by a FullBlockHeader.
 134  const uint64_t required_size = send_buffer_size + sizeof(FullBlockHeader);
 135  uint32_t buffer_index = 0;
 136  uint64_t begin_position = 0;
// Phase 1: reserve required_size bytes in the current buffer via CAS; loop
// retries on any concurrent change to the status word.
 137  while (true) {
 138  buffer_index = get_current_buffer_index_atomic();
 139  std::atomic<uint64_t>* status_address = control_block_->get_buffer_status_address(buffer_index);
 140 
 141  // If even the current buffer is marked as no more writers, the reducer is getting behind.
 142  // Mappers have to wait, potentially for a long time. So, let's just sleep.
 143  ReducerBufferStatus cur_status = control_block_->get_buffer_status_atomic(buffer_index);
 144  if (cur_status.components.flags_ & kFlagNoMoreWriters) {
// NOTE(review): both VLOG messages below lack a space before to_string()
// ("full in" / "switched in"); cosmetic log defect — fix in the repository.
 145  VLOG(0) << "Both buffers full in" << to_string() << ". I'll sleep for a while..";
// Spin-sleep until the reducer switches to the other buffer.
 146  while (get_current_buffer_index_atomic() == buffer_index) {
 147  std::this_thread::sleep_for(std::chrono::milliseconds(1));
 148  }
 149  VLOG(0) << "Buffer switched in" << to_string() << " after sleep. Let's resume.";
 150  continue;
 151  }
 152 
 153  // the buffer is now full. let's mark this buffer full and
 154  // then wake up reducer to do switch.
 155  uint64_t buffer_size = get_buffer_size();
 156  ASSERT_ND(buffer_size
 157  == engine_->get_options().snapshot_.log_reducer_buffer_mb_ * (1ULL << 19));
 158  ASSERT_ND(from_buffer_position(cur_status.components.tail_position_) <= buffer_size);
// The two buffers are contiguous: buffer 1 starts right where buffer 0 ends.
 159  ASSERT_ND(reinterpret_cast<char*>(buffers_[0]) + buffer_size
 160  == reinterpret_cast<char*>(buffers_[1]));
// Not enough room left: close the buffer to new writers and retry (the
// reducer will eventually switch buffers).
 161  if (from_buffer_position(cur_status.components.tail_position_) + required_size > buffer_size) {
 162  ReducerBufferStatus new_status = cur_status;
 163  new_status.components.flags_ |= kFlagNoMoreWriters;
 164  if (!status_address->compare_exchange_strong(cur_status.word, new_status.word)) {
 165  // if CAS fails, someone else might have already done it. retry
 166  continue;
 167  }
 168 
 169  // Buffer switch won't be that often, so we simply wait without waking up the reducer.
 170  continue;
 171  }
 172 
 173  // okay, "looks like" we can append our log. make it sure with atomic CAS
// Reserve: bump the writer count and advance the tail in one status-word CAS.
 174  ReducerBufferStatus new_status = cur_status;
 175  ++new_status.components.active_writers_;
 176  new_status.components.tail_position_ += to_buffer_position(required_size);
 177  ASSERT_ND(from_buffer_position(cur_status.components.tail_position_) <= buffer_size);
 178  if (!status_address->compare_exchange_strong(cur_status.word, new_status.word)) {
 179  // someone else did something. retry
 180  continue;
 181  }
 182 
 183  // okay, we atomically reserved the space.
// Our block starts at the tail position we observed before the CAS.
 184  begin_position = from_buffer_position(cur_status.components.tail_position_);
 185  break;
 186  }
 187 
 188  // now start copying. this might take a few tens of microseconds if it's 1MB and on another
 189  // NUMA node.
// Phase 2: build the header and copy header + payload into the reserved region.
 190  void* buffer = get_buffer(buffer_index);
 191  debugging::RdtscWatch copy_watch;
 192  char* destination = reinterpret_cast<char*>(buffer) + begin_position;
 193  ASSERT_ND(begin_position + sizeof(FullBlockHeader) + send_buffer_size
 194  <= engine_->get_options().snapshot_.log_reducer_buffer_mb_ * (1ULL << 19));
 195  FullBlockHeader header;
 196  header.storage_id_ = storage_id;
 197  header.log_count_ = log_count;
// block_length_ is in 8-byte units and includes the header itself.
 198  header.block_length_ = to_buffer_position(required_size);
// NOTE(review): original line 199 (header.magic_word_ assignment, required by
// the is_full_block() ASSERT below) was lost in extraction.
 200  header.shortest_key_length_ = shortest_key_length;
 201  header.longest_key_length_ = longest_key_length;
 202  std::memcpy(destination, &header, sizeof(FullBlockHeader));
 203  std::memcpy(destination + sizeof(FullBlockHeader), send_buffer, send_buffer_size);
 204  copy_watch.stop();
 205  DVLOG(1) << "memcpy of " << send_buffer_size << " bytes took "
 206  << copy_watch.elapsed() << " cycles";
 207 
 208  // done, let's decrement the active_writers_ to declare we are done.
// Phase 3: CAS-decrement active_writers_; retry on any concurrent change.
 209  while (true) {
 210  std::atomic<uint64_t>* status_address = control_block_->get_buffer_status_address(buffer_index);
 211  ReducerBufferStatus cur_status = control_block_->get_buffer_status_atomic(buffer_index);
 212  ReducerBufferStatus new_status = cur_status;
 213  ASSERT_ND(new_status.components.active_writers_ > 0);
 214  --new_status.components.active_writers_;
 215  if (!status_address->compare_exchange_strong(cur_status.word, new_status.word)) {
 216  // if CAS fails, someone else might have already done it. retry
 217  continue;
 218  }
 219 
 220  // okay, decremented. let's exit.
 221 
 222  // Disabled. for now the reducer does spin. so no need for wakeup
 223  // if (new_status.components.active_writers_ == 0
 224  // && (new_status.components.flags_ & kFlagNoMoreWriters)) {
 225  // // if this was the last writer and the buffer was already closed for new writers,
 226  // // the reducer might be waiting for us. let's wake her up
 227  // thread_.wakeup();
 228  // }
 229  break;
 230  }
 231 
 232  stop_watch.stop();
 233  DVLOG(1) << "Completed appending a block of " << send_buffer_size << " bytes to " << to_string()
 234  << "'s buffer for storage-" << storage_id << " in " << stop_watch.elapsed() << " cycles";
// Post-conditions: the header we just wrote reads back as a full block of the
// expected storage and length.
 235  ASSERT_ND(reinterpret_cast<FullBlockHeader*>(destination)->is_full_block());
 236  ASSERT_ND(reinterpret_cast<FullBlockHeader*>(destination)->storage_id_ == storage_id);
 237  ASSERT_ND(reinterpret_cast<FullBlockHeader*>(destination)->block_length_
 238  == to_buffer_position(required_size));
 239 }
240 
241 } // namespace snapshot
242 } // namespace foedus
LogCodeKind get_kind() const __attribute__((always_inline))
Convenience method to get LogCodeKind.
LogCode
A unique identifier of all log types.
Definition: log_type.hpp:87
std::atomic< uint64_t > * get_buffer_status_address(uint32_t index)
BufferPosition to_buffer_position(uint64_t byte_position)
Definition: snapshot_id.hpp:74
LogCodeKind
Represents the kind of log types.
Definition: log_type.hpp:101
Compactly represents important status informations of a reducer buffer.
uint32_t StorageId
Unique ID for storage.
Definition: storage_id.hpp:55
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
GlobalMemoryAnchors * get_global_memory_anchors()
Epoch valid_until_epoch_
This snapshot contains all the logs until this epoch.
Definition: snapshot.hpp:55
record targetted logs
Definition: log_type.hpp:103
std::atomic< uint32_t > current_buffer_
buffers_[current_buffer_ % 2] is the buffer mappers should append to.
Represents a time epoch.
Definition: epoch.hpp:61
uint32_t get_ordinal() const __attribute__((always_inline))
Definition: xct_id.hpp:976
struct foedus::snapshot::ReducerBufferStatus::Components components
Same as GlobalMemoryAnchors except this is for node_memories_.
A bit-wise flag in ReducerBufferStatus's flags_.
NodeMemoryAnchors * get_node_memory_anchors(SocId node)
Declares common log types used in all packages.
bool is_valid() const __attribute__((always_inline))
Definition: xct_id.hpp:973
const EngineOptions & get_options() const
Definition: engine.cpp:39
ReducerBufferStatus get_buffer_status_atomic(uint32_t index) const
const Snapshot & get_cur_snapshot() const
Base class for log type.
void append_log_chunk(storage::StorageId storage_id, const char *send_buffer, uint32_t log_count, uint64_t send_buffer_size, uint32_t shortest_key_length, uint32_t longest_key_length)
Append the log entries of one storage in the given buffer to this reducer's buffer.
uint64_t from_buffer_position(BufferPosition buffer_position)
Definition: snapshot_id.hpp:78
storage::StorageId max_storage_id_
Largest storage ID as of starting to take the snapshot.
Definition: snapshot.hpp:58
uint64_t elapsed() const __attribute__((always_inline))
Definition: rdtsc_watch.hpp:52
uint32_t get_current_buffer_index_atomic() const
All blocks that have content start with this header.
snapshot::SnapshotOptions snapshot_
LogReducerControlBlock * control_block_
SnapshotId id_
Unique ID of this snapshot.
Definition: snapshot.hpp:43
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
BufferPosition block_length_
Length (in 8-bytes) of this block including the header.
uint32_t magic_word_
This is used to identify the storage block is a dummy (filler) one or a full one. ...
Epoch get_epoch() const __attribute__((always_inline))
Definition: xct_id.hpp:964
xct::XctId xct_id_
Epoch and in-epoch ordinal of this log.
snapshot::LogReducerControlBlock * log_reducer_memory_
Tiny control memory for LogReducer in this node.
Represents one snapshot that converts all logs from base epoch to valid_until epoch into snapshot fil...
Definition: snapshot.hpp:37
void * log_reducer_buffers_[2]
Actual buffers for LogReducer.
uint16_t log_length_
Byte size of this log entry including this header itself and everything.
const SnapshotId kNullSnapshotId
Definition: snapshot_id.hpp:45
uint32_t log_reducer_buffer_mb_
The size in MB of a buffer to store log entries in reducer (partition).
storage::Page * log_reducer_root_info_pages_
This is the 'output' of the reducer in this node.
std::atomic< uint32_t > total_storage_count_
Set at the end of merge_sort().
bool is_valid() const
Definition: epoch.hpp:96
Epoch base_epoch_
This snapshot was taken on top of previous snapshot that is valid_until this epoch.
Definition: snapshot.hpp:49
void * get_buffer(uint32_t index) const
0 is reserved as a non-existing log type.
Definition: log_type.hpp:89
A RDTSC-based low-overhead stop watch.
Definition: rdtsc_watch.hpp:37
soc::SocManager * get_soc_manager() const
See SOC and IPC.
Definition: engine.cpp:59
storage::StorageId storage_id_
The storage this loggable operation mainly affects.
bool verify_log_chunk(storage::StorageId storage_id, const char *send_buffer, uint32_t log_count, uint64_t send_buffer_size) const
used only in debug mode
0x3001 : foedus::log::FillerLogType .
Definition: log_type.hpp:111
LogCode get_type() const __attribute__((always_inline))
Convenience method to cast into LogCode.
uint64_t stop() __attribute__((always_inline))
Take another current time tick.
Definition: rdtsc_watch.hpp:47
uint32_t longest_key_length_
additional statistics for masstree/hash
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
uint32_t get_total_storage_count() const
uint16_t id_
ID of this reducer (or numa node ID).
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
Definition: soc_manager.cpp:38
0x3002 : foedus::log::EpochMarkerLogType .
Definition: log_type.hpp:112
uint32_t shortest_key_length_
additional statistics for masstree/hash