libfoedus-core
FOEDUS Core Library
foedus::snapshot::LogReducerRef Class Reference

A remote view of LogReducer from all engines. More...

Detailed Description

A remote view of LogReducer from all engines.

Definition at line 40 of file log_reducer_ref.hpp.

#include <log_reducer_ref.hpp>

Collaboration diagram for foedus::snapshot::LogReducerRef:

Public Member Functions

 LogReducerRef ()
 
 LogReducerRef (Engine *engine, uint16_t node)
 
uint16_t get_id () const
 
std::string to_string () const
 
void clear ()
 
uint32_t get_total_storage_count () const
 
storage::Pageget_root_info_pages ()
 
void append_log_chunk (storage::StorageId storage_id, const char *send_buffer, uint32_t log_count, uint64_t send_buffer_size, uint32_t shortest_key_length, uint32_t longest_key_length)
 Append the log entries of one storage in the given buffer to this reducer's buffer. More...
 

Protected Member Functions

const Snapshotget_cur_snapshot () const
 
uint64_t get_buffer_size () const
 
uint32_t get_current_buffer_index_atomic () const
 
void * get_buffer (uint32_t index) const
 
bool verify_log_chunk (storage::StorageId storage_id, const char *send_buffer, uint32_t log_count, uint64_t send_buffer_size) const
 used only in debug mode More...
 

Protected Attributes

Engineengine_
 
LogReducerControlBlockcontrol_block_
 
void * buffers_ [2]
 
storage::Pageroot_info_pages_
 

Friends

std::ostream & operator<< (std::ostream &o, const LogReducerRef &v)
 

Constructor & Destructor Documentation

foedus::snapshot::LogReducerRef::LogReducerRef ( )
inline

Definition at line 42 of file log_reducer_ref.hpp.

References buffers_, control_block_, CXX11_NULLPTR, engine_, and root_info_pages_.

42  {
48  }
#define CXX11_NULLPTR
Used in public headers in place of "nullptr" of C++11.
Definition: cxx11.hpp:132
LogReducerControlBlock * control_block_
foedus::snapshot::LogReducerRef::LogReducerRef ( Engine engine,
uint16_t  node 
)

Definition at line 36 of file log_reducer_ref.cpp.

References buffers_, control_block_, engine_, foedus::soc::SharedMemoryRepo::get_node_memory_anchors(), foedus::soc::SocManager::get_shared_memory_repo(), foedus::Engine::get_soc_manager(), foedus::soc::NodeMemoryAnchors::log_reducer_buffers_, foedus::soc::NodeMemoryAnchors::log_reducer_memory_, foedus::soc::NodeMemoryAnchors::log_reducer_root_info_pages_, and root_info_pages_.

36  {
37  engine_ = engine;
38  soc::NodeMemoryAnchors* anchors
40  control_block_ = anchors->log_reducer_memory_;
41  buffers_[0] = anchors->log_reducer_buffers_[0];
42  buffers_[1] = anchors->log_reducer_buffers_[1];
43  root_info_pages_ = anchors->log_reducer_root_info_pages_;
44 }
NodeMemoryAnchors * get_node_memory_anchors(SocId node)
LogReducerControlBlock * control_block_
soc::SocManager * get_soc_manager() const
See SOC and IPC.
Definition: engine.cpp:59
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
Definition: soc_manager.cpp:38

Here is the call graph for this function:

Member Function Documentation

void foedus::snapshot::LogReducerRef::append_log_chunk ( storage::StorageId  storage_id,
const char *  send_buffer,
uint32_t  log_count,
uint64_t  send_buffer_size,
uint32_t  shortest_key_length,
uint32_t  longest_key_length 
)

Append the log entries of one storage in the given buffer to this reducer's buffer.

Parameters
[in]storage_idall log entries are of this storage
[in]send_buffercontains log entries to copy
[in]log_countnumber of log entries to copy
[in]send_buffer_sizebyte count to copy
[in]shortest_key_length[masstree/hash] shortest key length in the log entries
[in]longest_key_length[masstree/hash] longest key length in the log entries

This is the interface via which mappers send log entries to reducers. Internally, this atomically changes the status of the current reducer buffer to reserve a contiguous space and then copy without blocking other mappers. If this methods hits a situation where the current buffer becomes full, this methods wakes up the reducer and lets it switch the current buffer. All log entries are contiguously copied. One block doesn't span two buffers.

Definition at line 121 of file log_reducer_ref.cpp.

References foedus::snapshot::ReducerBufferStatus::Components::active_writers_, ASSERT_ND, foedus::snapshot::BlockHeaderBase::block_length_, buffers_, foedus::snapshot::ReducerBufferStatus::components, control_block_, foedus::debugging::RdtscWatch::elapsed(), engine_, foedus::snapshot::ReducerBufferStatus::Components::flags_, foedus::snapshot::from_buffer_position(), get_buffer(), get_buffer_size(), foedus::snapshot::LogReducerControlBlock::get_buffer_status_address(), foedus::snapshot::LogReducerControlBlock::get_buffer_status_atomic(), get_current_buffer_index_atomic(), foedus::Engine::get_options(), foedus::snapshot::kFlagNoMoreWriters, foedus::snapshot::BlockHeaderBase::kFullBlockHeaderMagicWord, foedus::snapshot::FullBlockHeader::log_count_, foedus::snapshot::SnapshotOptions::log_reducer_buffer_mb_, foedus::snapshot::FullBlockHeader::longest_key_length_, foedus::snapshot::BlockHeaderBase::magic_word_, foedus::snapshot::FullBlockHeader::shortest_key_length_, foedus::EngineOptions::snapshot_, foedus::debugging::RdtscWatch::stop(), foedus::snapshot::FullBlockHeader::storage_id_, foedus::snapshot::ReducerBufferStatus::Components::tail_position_, foedus::snapshot::to_buffer_position(), to_string(), verify_log_chunk(), and foedus::snapshot::ReducerBufferStatus::word.

127  {
128  DVLOG(1) << "Appending a block of " << send_buffer_size << " bytes (" << log_count
129  << " entries) to " << to_string() << "'s buffer for storage-" << storage_id;
130  ASSERT_ND(verify_log_chunk(storage_id, send_buffer, log_count, send_buffer_size));
131 
132  debugging::RdtscWatch stop_watch;
133 
134  const uint64_t required_size = send_buffer_size + sizeof(FullBlockHeader);
135  uint32_t buffer_index = 0;
136  uint64_t begin_position = 0;
137  while (true) {
138  buffer_index = get_current_buffer_index_atomic();
139  std::atomic<uint64_t>* status_address = control_block_->get_buffer_status_address(buffer_index);
140 
141  // If even the current buffer is marked as no more writers, the reducer is getting behind.
142  // Mappers have to wait, potentially for a long time. So, let's just sleep.
143  ReducerBufferStatus cur_status = control_block_->get_buffer_status_atomic(buffer_index);
144  if (cur_status.components.flags_ & kFlagNoMoreWriters) {
145  VLOG(0) << "Both buffers full in" << to_string() << ". I'll sleep for a while..";
146  while (get_current_buffer_index_atomic() == buffer_index) {
147  std::this_thread::sleep_for(std::chrono::milliseconds(1));
148  }
149  VLOG(0) << "Buffer switched in" << to_string() << " after sleep. Let's resume.";
150  continue;
151  }
152 
153  // the buffer is now full. let's mark this buffer full and
154  // then wake up reducer to do switch.
155  uint64_t buffer_size = get_buffer_size();
156  ASSERT_ND(buffer_size
157  == engine_->get_options().snapshot_.log_reducer_buffer_mb_ * (1ULL << 19));
158  ASSERT_ND(from_buffer_position(cur_status.components.tail_position_) <= buffer_size);
159  ASSERT_ND(reinterpret_cast<char*>(buffers_[0]) + buffer_size
160  == reinterpret_cast<char*>(buffers_[1]));
161  if (from_buffer_position(cur_status.components.tail_position_) + required_size > buffer_size) {
162  ReducerBufferStatus new_status = cur_status;
163  new_status.components.flags_ |= kFlagNoMoreWriters;
164  if (!status_address->compare_exchange_strong(cur_status.word, new_status.word)) {
165  // if CAS fails, someone else might have already done it. retry
166  continue;
167  }
168 
169  // Buffer switch won't be that often, so we simply wait without waking up the reducer.
170  continue;
171  }
172 
173  // okay, "looks like" we can append our log. make it sure with atomic CAS
174  ReducerBufferStatus new_status = cur_status;
175  ++new_status.components.active_writers_;
176  new_status.components.tail_position_ += to_buffer_position(required_size);
177  ASSERT_ND(from_buffer_position(cur_status.components.tail_position_) <= buffer_size);
178  if (!status_address->compare_exchange_strong(cur_status.word, new_status.word)) {
179  // someone else did something. retry
180  continue;
181  }
182 
183  // okay, we atomically reserved the space.
184  begin_position = from_buffer_position(cur_status.components.tail_position_);
185  break;
186  }
187 
188  // now start copying. this might take a few tens of microseconds if it's 1MB and on another
189  // NUMA node.
190  void* buffer = get_buffer(buffer_index);
191  debugging::RdtscWatch copy_watch;
192  char* destination = reinterpret_cast<char*>(buffer) + begin_position;
193  ASSERT_ND(begin_position + sizeof(FullBlockHeader) + send_buffer_size
194  <= engine_->get_options().snapshot_.log_reducer_buffer_mb_ * (1ULL << 19));
195  FullBlockHeader header;
196  header.storage_id_ = storage_id;
197  header.log_count_ = log_count;
198  header.block_length_ = to_buffer_position(required_size);
199  header.magic_word_ = BlockHeaderBase::kFullBlockHeaderMagicWord;
200  header.shortest_key_length_ = shortest_key_length;
201  header.longest_key_length_ = longest_key_length;
202  std::memcpy(destination, &header, sizeof(FullBlockHeader));
203  std::memcpy(destination + sizeof(FullBlockHeader), send_buffer, send_buffer_size);
204  copy_watch.stop();
205  DVLOG(1) << "memcpy of " << send_buffer_size << " bytes took "
206  << copy_watch.elapsed() << " cycles";
207 
208  // done, let's decrement the active_writers_ to declare we are done.
209  while (true) {
210  std::atomic<uint64_t>* status_address = control_block_->get_buffer_status_address(buffer_index);
211  ReducerBufferStatus cur_status = control_block_->get_buffer_status_atomic(buffer_index);
212  ReducerBufferStatus new_status = cur_status;
213  ASSERT_ND(new_status.components.active_writers_ > 0);
214  --new_status.components.active_writers_;
215  if (!status_address->compare_exchange_strong(cur_status.word, new_status.word)) {
216  // if CAS fails, someone else might have already done it. retry
217  continue;
218  }
219 
220  // okay, decremented. let's exit.
221 
222  // Disabled. for now the reducer does spin. so no need for wakeup
223  // if (new_status.components.active_writers_ == 0
224  // && (new_status.components.flags_ & kFlagNoMoreWriters)) {
225  // // if this was the last writer and the buffer was already closed for new writers,
226  // // the reducer might be waiting for us. let's wake her up
227  // thread_.wakeup();
228  // }
229  break;
230  }
231 
232  stop_watch.stop();
233  DVLOG(1) << "Completed appending a block of " << send_buffer_size << " bytes to " << to_string()
234  << "'s buffer for storage-" << storage_id << " in " << stop_watch.elapsed() << " cycles";
235  ASSERT_ND(reinterpret_cast<FullBlockHeader*>(destination)->is_full_block());
236  ASSERT_ND(reinterpret_cast<FullBlockHeader*>(destination)->storage_id_ == storage_id);
237  ASSERT_ND(reinterpret_cast<FullBlockHeader*>(destination)->block_length_
238  == to_buffer_position(required_size));
239 }
std::atomic< uint64_t > * get_buffer_status_address(uint32_t index)
BufferPosition to_buffer_position(uint64_t byte_position)
Definition: snapshot_id.hpp:74
A bit-wise flag in ReducerBufferStatus's flags_.
const EngineOptions & get_options() const
Definition: engine.cpp:39
ReducerBufferStatus get_buffer_status_atomic(uint32_t index) const
uint64_t from_buffer_position(BufferPosition buffer_position)
Definition: snapshot_id.hpp:78
uint32_t get_current_buffer_index_atomic() const
snapshot::SnapshotOptions snapshot_
LogReducerControlBlock * control_block_
uint32_t log_reducer_buffer_mb_
The size in MB of a buffer to store log entries in reducer (partition).
void * get_buffer(uint32_t index) const
bool verify_log_chunk(storage::StorageId storage_id, const char *send_buffer, uint32_t log_count, uint64_t send_buffer_size) const
used only in debug mode
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72

Here is the call graph for this function:

void foedus::snapshot::LogReducerRef::clear ( )

Definition at line 53 of file log_reducer_ref.cpp.

References foedus::snapshot::LogReducerControlBlock::clear(), and control_block_.

Here is the call graph for this function:

void * foedus::snapshot::LogReducerRef::get_buffer ( uint32_t  index) const
protected

Definition at line 71 of file log_reducer_ref.cpp.

References engine_, get_id(), foedus::soc::SharedMemoryRepo::get_node_memory_anchors(), foedus::soc::SocManager::get_shared_memory_repo(), foedus::Engine::get_soc_manager(), and foedus::soc::NodeMemoryAnchors::log_reducer_buffers_.

Referenced by append_log_chunk().

71  {
72  soc::NodeMemoryAnchors* anchors
74  return anchors->log_reducer_buffers_[index % 2];
75 }
NodeMemoryAnchors * get_node_memory_anchors(SocId node)
void * log_reducer_buffers_[2]
Actual buffers for LogReducer.
soc::SocManager * get_soc_manager() const
See SOC and IPC.
Definition: engine.cpp:59
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
Definition: soc_manager.cpp:38

Here is the call graph for this function:

Here is the caller graph for this function:

uint64_t foedus::snapshot::LogReducerRef::get_buffer_size ( ) const
protected

Definition at line 65 of file log_reducer_ref.cpp.

References engine_, foedus::Engine::get_options(), foedus::snapshot::SnapshotOptions::log_reducer_buffer_mb_, and foedus::EngineOptions::snapshot_.

Referenced by append_log_chunk().

65  {
66  // the value is in total of two buffers. (<< 20) / 2 == << 19
67  return static_cast<uint64_t>(engine_->get_options().snapshot_.log_reducer_buffer_mb_) << 19;
68 }
const EngineOptions & get_options() const
Definition: engine.cpp:39
snapshot::SnapshotOptions snapshot_
uint32_t log_reducer_buffer_mb_
The size in MB of a buffer to store log entries in reducer (partition).

Here is the call graph for this function:

Here is the caller graph for this function:

const Snapshot & foedus::snapshot::LogReducerRef::get_cur_snapshot ( ) const
protected

Definition at line 77 of file log_reducer_ref.cpp.

References engine_, foedus::soc::SharedMemoryRepo::get_global_memory_anchors(), foedus::soc::SocManager::get_shared_memory_repo(), and foedus::Engine::get_soc_manager().

Referenced by verify_log_chunk().

77  {
79  snapshot_manager_memory_->gleaner_.cur_snapshot_;
80 }
GlobalMemoryAnchors * get_global_memory_anchors()
soc::SocManager * get_soc_manager() const
See SOC and IPC.
Definition: engine.cpp:59
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
Definition: soc_manager.cpp:38

Here is the call graph for this function:

Here is the caller graph for this function:

uint32_t foedus::snapshot::LogReducerRef::get_current_buffer_index_atomic ( ) const
protected

Definition at line 61 of file log_reducer_ref.cpp.

References control_block_, and foedus::snapshot::LogReducerControlBlock::current_buffer_.

Referenced by append_log_chunk().

61  {
63 }
std::atomic< uint32_t > current_buffer_
buffers_[current_buffer_ % 2] is the buffer mappers should append to.
LogReducerControlBlock * control_block_

Here is the caller graph for this function:

uint16_t foedus::snapshot::LogReducerRef::get_id ( ) const

Definition at line 46 of file log_reducer_ref.cpp.

References control_block_, and foedus::snapshot::LogReducerControlBlock::id_.

Referenced by get_buffer(), and to_string().

46  {
47  return control_block_->id_;
48 }
LogReducerControlBlock * control_block_
uint16_t id_
ID of this reducer (or numa node ID).

Here is the caller graph for this function:

storage::Page* foedus::snapshot::LogReducerRef::get_root_info_pages ( )
inline

Definition at line 55 of file log_reducer_ref.hpp.

References root_info_pages_.

55 { return root_info_pages_; }
uint32_t foedus::snapshot::LogReducerRef::get_total_storage_count ( ) const

Definition at line 57 of file log_reducer_ref.cpp.

References control_block_, and foedus::snapshot::LogReducerControlBlock::total_storage_count_.

57  {
59 }
LogReducerControlBlock * control_block_
std::atomic< uint32_t > total_storage_count_
Set at the end of merge_sort().
std::string foedus::snapshot::LogReducerRef::to_string ( ) const

Definition at line 49 of file log_reducer_ref.cpp.

References get_id().

Referenced by append_log_chunk().

49  {
50  return std::string("LogReducer-") + std::to_string(get_id());
51 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool foedus::snapshot::LogReducerRef::verify_log_chunk ( storage::StorageId  storage_id,
const char *  send_buffer,
uint32_t  log_count,
uint64_t  send_buffer_size 
) const
protected

used only in debug mode

Definition at line 82 of file log_reducer_ref.cpp.

References ASSERT_ND, foedus::snapshot::Snapshot::base_epoch_, get_cur_snapshot(), foedus::xct::XctId::get_epoch(), foedus::log::LogHeader::get_kind(), foedus::xct::XctId::get_ordinal(), foedus::log::LogHeader::get_type(), foedus::log::BaseLogType::header_, foedus::snapshot::Snapshot::id_, foedus::Epoch::is_valid(), foedus::xct::XctId::is_valid(), foedus::log::kLogCodeEpochMarker, foedus::log::kLogCodeFiller, foedus::log::kLogCodeInvalid, foedus::snapshot::kNullSnapshotId, foedus::log::kRecordLogs, foedus::log::LogHeader::log_length_, foedus::snapshot::Snapshot::max_storage_id_, foedus::log::LogHeader::storage_id_, foedus::snapshot::Snapshot::valid_until_epoch_, and foedus::log::LogHeader::xct_id_.

Referenced by append_log_chunk().

86  {
87  uint32_t real_log_count = 0;
88  uint64_t cur = 0;
89  const Snapshot& cur_snapshot = get_cur_snapshot();
90  ASSERT_ND(cur_snapshot.id_ != kNullSnapshotId);
91  ASSERT_ND(cur_snapshot.valid_until_epoch_.is_valid());
92  ASSERT_ND(cur_snapshot.max_storage_id_ > 0);
93  ASSERT_ND(cur_snapshot.max_storage_id_ >= storage_id);
94  while (cur < send_buffer_size) {
95  const log::BaseLogType* entry = reinterpret_cast<const log::BaseLogType*>(send_buffer + cur);
96  log::LogCode type = entry->header_.get_type();
97  log::LogCodeKind kind = entry->header_.get_kind();
99  ASSERT_ND(type != log::kLogCodeEpochMarker); // should have been skipped in mapper.
100  ASSERT_ND(entry->header_.log_length_ > 0);
101  ASSERT_ND(entry->header_.log_length_ % 8 == 0);
102  cur += entry->header_.log_length_;
103  ++real_log_count;
104  if (type == log::kLogCodeFiller) {
105  continue;
106  } else {
107  ASSERT_ND(entry->header_.storage_id_ == storage_id);
108  ASSERT_ND(entry->header_.xct_id_.is_valid());
109  ASSERT_ND(entry->header_.xct_id_.get_ordinal() > 0);
110  ASSERT_ND(kind == log::kRecordLogs);
111  Epoch epoch = entry->header_.xct_id_.get_epoch();
112  ASSERT_ND(!cur_snapshot.base_epoch_.is_valid() || epoch > cur_snapshot.base_epoch_);
113  ASSERT_ND(epoch <= cur_snapshot.valid_until_epoch_);
114  }
115  }
116  ASSERT_ND(real_log_count == log_count);
117  ASSERT_ND(cur == send_buffer_size);
118  return true;
119 }
LogCode
A unique identifier of all log types.
Definition: log_type.hpp:87
LogCodeKind
Represents the kind of log types.
Definition: log_type.hpp:101
record targetted logs
Definition: log_type.hpp:103
const Snapshot & get_cur_snapshot() const
const SnapshotId kNullSnapshotId
Definition: snapshot_id.hpp:45
0 is reserved as a non-existing log type.
Definition: log_type.hpp:89
0x3001 : foedus::log::FillerLogType .
Definition: log_type.hpp:111
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
0x3002 : foedus::log::EpochMarkerLogType .
Definition: log_type.hpp:112

Here is the call graph for this function:

Here is the caller graph for this function:

Friends And Related Function Documentation

std::ostream& operator<< ( std::ostream &  o,
const LogReducerRef v 
)
friend

Member Data Documentation

void* foedus::snapshot::LogReducerRef::buffers_[2]
protected

Definition at line 96 of file log_reducer_ref.hpp.

Referenced by append_log_chunk(), and LogReducerRef().

LogReducerControlBlock* foedus::snapshot::LogReducerRef::control_block_
protected
Engine* foedus::snapshot::LogReducerRef::engine_
protected
storage::Page* foedus::snapshot::LogReducerRef::root_info_pages_
protected

Definition at line 97 of file log_reducer_ref.hpp.

Referenced by get_root_info_pages(), and LogReducerRef().


The documentation for this class was generated from the following files: