libfoedus-core
FOEDUS Core Library
log_gleaner_impl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <algorithm>
23 #include <chrono>
24 #include <map>
25 #include <ostream>
26 #include <sstream>
27 #include <string>
28 #include <utility>
29 #include <vector>
30 
31 #include "foedus/engine.hpp"
48 
49 namespace foedus {
50 namespace snapshot {
51 
// Presumably the LogGleaner constructor; the opening line of its signature is not part of
// this listing -- confirm against the header. The visible part forwards engine to the
// LogGleanerRef base and initializes gleaner_resource_ and new_snapshot_ from the arguments
// of the same name. The body itself is empty.
53  Engine* engine,
54  LogGleanerResource* gleaner_resource,
55  const Snapshot& new_snapshot)
56  : LogGleanerRef(engine),
57  gleaner_resource_(gleaner_resource),
58  new_snapshot_(new_snapshot) {
59 }
60 
// Requests all mappers/reducers to stop and waits for them.
// Returns kRetOk right away when is_all_exitted() is already true. Otherwise it sets
// control_block_->cancelled_ and polls, sleeping 10 ms per round, until is_all_exitted()
// or is_error() becomes true.
61 ErrorStack LogGleaner::cancel_reducers_mappers() {
62  if (is_all_exitted()) {
63  VLOG(0) << "All mappers/reducers have already exitted. " << *this;
64  return kRetOk;
65  }
66  LOG(INFO) << "Requesting all mappers/reducers threads to stop.. " << *this;
67  control_block_->cancelled_ = true;
// Upper bound on the number of 10 ms polling rounds (3000 rounds, i.e. at least ~30 seconds).
68  const uint32_t kTimeoutSleeps = 3000U;
69  uint32_t count = 0;
70  while (!is_all_exitted() && !is_error()) {
71  std::this_thread::sleep_for(std::chrono::milliseconds(10));
// NOTE(review): the listing jumps from 72 to 74, so the statement executed once the
// bound is exceeded is not visible here. As shown, the branch is empty and the loop
// simply keeps polling -- confirm against the real source.
72  if (++count > kTimeoutSleeps) {
74  }
75  }
76  return kRetOk;
77 }
78 
// Resets the gleaner's shared state before a run: clears the counters in the control block,
// records new_snapshot_ as the control block's cur_snapshot_, and calls clear() on the
// LogReducerRef of every node (node count = thread_.group_count_ from the engine options).
79 void LogGleaner::clear_all() {
80  control_block_->clear_counts();
81  control_block_->cur_snapshot_ = new_snapshot_;
82  uint16_t node_count = engine_->get_options().thread_.group_count_;
83  for (uint16_t node = 0; node < node_count; ++node) {
84  LogReducerRef reducer(engine_, node);
85  reducer.clear();
86  }
// NOTE(review): listing lines 87, 89 and 91 are not visible here. The ASSERT_ND below is
// therefore cut off mid-expression and the per-storage loop (IDs 1..max_storage_id_) has
// no visible body; presumably they deal with partitioner_metadata_ -- confirm.
88  ASSERT_ND(partitioner_metadata_[0].data_size_
90  for (storage::StorageId i = 1; i <= new_snapshot_.max_storage_id_; ++i) {
92  }
93 }
94 
95 ErrorStack LogGleaner::design_partitions() {
96  // so far single threaded to debug easily.
97  // but, let's prepare for parallelization so that we can switch later.
98  ErrorStack result;
99  design_partitions_run(1U, new_snapshot_.max_storage_id_, &result);
100  return result;
101 }
102 
103 void LogGleaner::design_partitions_run(
104  storage::StorageId from,
105  storage::StorageId count,
106  ErrorStack* result) {
107  *result = kRetOk;
108  LOG(INFO) << "Determining partitions for Storage-" << from << " to " << (from + count - 1) << ".";
109 
110  // working memory while designing. we auto-extend if the partitioner requests so.
111  memory::AlignedMemory work_memory;
112  work_memory.alloc(1U << 21, 1U << 12, memory::AlignedMemory::kNumaAllocOnnode, 0);
113 
114  // To read from snapshot pages while designing. This must be thread-private. So we instantiate
115  // for each design_partitions_run()
116  cache::SnapshotFileSet fileset(engine_);
117  *result = fileset.initialize();
118  if (result->is_error()) {
119  LOG(ERROR) << "fileset.initialize() failed!" << *result;
120  return;
121  }
122  UninitializeGuard fileset_guard(&fileset, UninitializeGuard::kWarnIfUninitializeError);
123 
124  storage::StorageManager* stm = engine_->get_storage_manager();
125  for (storage::StorageId id = from; id < from + count; ++id) {
126  if (!stm->get_storage(id)->exists()) {
127  continue;
128  }
129  storage::Partitioner partitioner(engine_, id);
130  storage::Partitioner::DesignPartitionArguments args = { &work_memory, &fileset };
131  ErrorStack ret = partitioner.design_partition(args);
132  if (ret.is_error()) {
133  LOG(ERROR) << "Error while determining partitions for storage-" << id << ":" << ret;
134  *result = ret;
135  break;
136  }
137  }
138 
139  *result = fileset.uninitialize();
140  if (result->is_error()) {
141  LOG(ERROR) << "fileset.uninitialize() failed!" << *result;
142  return;
143  }
144  work_memory.release_block();
145  LOG(INFO) << "Determined partitions for Storage-" << from << " to " << (from + count - 1) << ".";
146 }
147 
// NOTE(review): the line that opens this function (listing line 148) is not visible here;
// presumably this is the body of LogGleaner::execute() -- confirm against the real source.
// Visible flow: clear_all(), then Step 1 design_partitions(), Step 2 run mappers/reducers,
// Step 3 construct_root_pages() (only when everything completed), Step 4
// cancel_reducers_mappers(). Each step's elapsed time is logged.
149  LOG(INFO) << "Gleaner starts running: snapshot_id=" << get_snapshot_id();
150  clear_all();
151 
152  LOG(INFO) << "Gleaner Step 1: Design partitions for all storages...";
153  debugging::StopWatch watch1;
154  // Another approach is to delay this step until some mapper really needs it so that we can
155  // skip partition-designing for storages that weren't modified.
156  // However, it requires synchronization in mapper/reducer and this step is anyway fast enough.
157  // So, we so far simply design partitions for all of them.
158  CHECK_ERROR(design_partitions());
159  watch1.stop();
160  LOG(INFO) << "Gleaner Step 1: Ended in " << watch1.elapsed_sec() << "s";
161 
162  LOG(INFO) << "Gleaner Step 2: Run mappers/reducers...";
163  debugging::StopWatch watch2;
164  // Request each node's snapshot manager to launch mappers/reducers threads
165  control_block_->gleaning_ = true;
// NOTE(review): listing line 166 is not visible here.
167  snapshot_manager_memory_->wakeup_snapshot_children();
168 
169  // then, wait until all mappers/reducers are done
// NOTE(review): listing line 170 is not visible here; presumably it is the header of the
// wait loop that the closing brace at 174 belongs to -- confirm.
171  // snapshot is an infrequent operation, doesn't have to wake up immediately.
172  // just sleep for a while
173  std::this_thread::sleep_for(std::chrono::milliseconds(10));
174  }
175 
176  control_block_->gleaning_ = false;
177  watch2.stop();
178  LOG(INFO) << "Gleaner Step 2: Ended in " << watch2.elapsed_sec() << "s";
179 
180  LOG(INFO) << "Gleaner Step 3: Combine outputs from reducers (root page info)..." << *this;
181  debugging::StopWatch watch3;
// Root pages are constructed only when there was no error and all mappers/reducers
// completed. The error and incomplete cases are only logged here, and as far as this
// listing shows, they still continue to Step 4 below.
182  if (is_error()) {
183  LOG(ERROR) << "Some mapper/reducer got an error. " << *this;
184  } else if (!is_all_completed()) {
185  LOG(WARNING) << "gleaner stopped without completion. cancelled? " << *this;
186  } else {
187  LOG(INFO) << "All mappers/reducers successfully done. Now on to the final phase." << *this;
188  CHECK_ERROR(construct_root_pages());
189  }
190  watch3.stop();
191  LOG(INFO) << "Gleaner Step 3: Ended in " << watch3.elapsed_sec() << "s";
192 
193  LOG(INFO) << "Gleaner Step 4: Uninitializing...";
194  CHECK_ERROR(cancel_reducers_mappers());
// NOTE(review): listing line 195 is not visible here.
196  LOG(INFO) << "Gleaner ends";
197  return kRetOk;
198 }
199 
// Builds the new root page of every storage from the root-info pages reported by reducers.
// Each reducer's root-info pages are walked with a per-reducer cursor. In every round the
// smallest storage_id still pending across all reducers is picked, every reducer's page
// for that storage is handed to storage::Composer::construct_root(), and the resulting
// pointer is recorded in new_root_page_pointers_ (which must be empty on entry).
// Returns kRetOk at the end; the CHECK_ERROR calls presumably return earlier on failure.
200 ErrorStack LogGleaner::construct_root_pages() {
201  ASSERT_ND(new_root_page_pointers_.size() == 0);
202  debugging::StopWatch stop_watch;
203 
// Per-reducer state, indexed by reducer: current cursor, number of root-info pages, and
// the start of that reducer's root-info page array. tmp_array is reused in every round
// to collect the inputs for one storage.
204  const uint16_t count = control_block_->reducers_count_;
205  std::vector<const storage::Page*> tmp_array(count, nullptr);
206  std::vector<uint16_t> cursors;
207  std::vector<uint16_t> buffer_sizes;
208  std::vector<const storage::Page*> buffers;
209  for (uint16_t i = 0; i < count; ++i) {
210  cursors.push_back(0);
211  LogReducerRef reducer(engine_, i);
212  buffer_sizes.push_back(reducer.get_total_storage_count());
213  buffers.push_back(reducer.get_root_info_pages());
214  }
215 
216  // composers read snapshot files.
// NOTE(review): listing lines 217 and 219 are not visible here; `fileset`, used below,
// is presumably declared on 217 -- confirm against the real source.
218  CHECK_ERROR(fileset.initialize());
220 
221  // composers need SnapshotWriter to write out to.
222  SnapshotWriter snapshot_writer(
223  engine_,
224  0,
225  get_snapshot_id(),
226  &gleaner_resource_->writer_pool_memory_,
227  &gleaner_resource_->writer_intermediate_memory_,
228  true); // we append to the node-0 snapshot file.
229  CHECK_ERROR(snapshot_writer.open());
230 
231  storage::StorageId prev_storage_id = 0;
232  // each reducer's root-info-page must be sorted by storage_id, so we do kind of merge-sort here.
233  while (true) {
234  // determine which storage to process by finding the smallest storage_id
// 0 doubles as "nothing found yet"; the loops in this file start storage IDs at 1.
235  storage::StorageId min_storage_id = 0;
236  for (uint16_t i = 0; i < count; ++i) {
237  if (cursors[i] == buffer_sizes[i]) {
238  continue;
239  }
240  const storage::Page* root_info_page = buffers[i] + cursors[i];
241  storage::StorageId storage_id = root_info_page->get_header().storage_id_;
242  ASSERT_ND(storage_id > prev_storage_id);
243  if (min_storage_id == 0) {
244  min_storage_id = storage_id;
245  } else {
246  min_storage_id = std::min(min_storage_id, storage_id);
247  }
248  }
249 
250  if (min_storage_id == 0) {
251  break; // all reducers' all root info pages processed
252  }
253 
254  // fill tmp_array
255  uint16_t input_count = 0;
256  for (uint16_t i = 0; i < count; ++i) {
257  if (cursors[i] == buffer_sizes[i]) {
258  continue;
259  }
260  const storage::Page* root_info_page = buffers[i] + cursors[i];
261  storage::StorageId storage_id = root_info_page->get_header().storage_id_;
262  if (storage_id == min_storage_id) {
263  tmp_array[input_count] = root_info_page;
264  ++input_count;
265  }
266  }
267  ASSERT_ND(input_count > 0);
268 
// Hand the collected pages to this storage's composer; it reports the new root page
// through new_root_page_pointer.
269  storage::Composer composer(engine_, min_storage_id);
270  storage::SnapshotPagePointer new_root_page_pointer;
271  storage::Composer::ConstructRootArguments args = {
272  &snapshot_writer,
273  &fileset,
274  &tmp_array[0],
275  input_count,
276  gleaner_resource_,
277  &new_root_page_pointer};
278  CHECK_ERROR(composer.construct_root(args));
279  ASSERT_ND(new_root_page_pointer > 0);
280  ASSERT_ND(new_root_page_pointers_.find(min_storage_id) == new_root_page_pointers_.end());
281  new_root_page_pointers_.insert(std::pair<storage::StorageId, storage::SnapshotPagePointer>(
282  min_storage_id, new_root_page_pointer));
283 
284  // done for this storage. advance cursors
285  prev_storage_id = min_storage_id;
286  for (uint16_t i = 0; i < count; ++i) {
287  if (cursors[i] == buffer_sizes[i]) {
288  continue;
289  }
290  const storage::Page* root_info_page = buffers[i] + cursors[i];
291  storage::StorageId storage_id = root_info_page->get_header().storage_id_;
292  if (storage_id == min_storage_id) {
293  cursors[i] = cursors[i] + 1;
294  }
295  }
296  }
297 
298  snapshot_writer.close();
299  CHECK_ERROR(fileset.uninitialize());
300 
301  stop_watch.stop();
302  LOG(INFO) << "constructed root pages for " << new_root_page_pointers_.size()
303  << " storages. in " << stop_watch.elapsed_ms() << "ms. "<< *this;
304  return kRetOk;
305 }
306 
307 std::string LogGleaner::to_string() const {
308  std::stringstream stream;
309  stream << *this;
310  return stream.str();
311 }
312 std::ostream& operator<<(std::ostream& o, const LogGleaner& v) {
313  o << "<LogGleaner>"
314  << v.new_snapshot_
315  << "<completed_count_>" << v.control_block_->completed_count_ << "</completed_count_>"
316  << "<completed_mapper_count_>"
317  << v.control_block_->completed_mapper_count_ << "</completed_mapper_count_>"
318  << "<error_count_>" << v.control_block_->error_count_ << "</error_count_>"
319  << "<exit_count_>" << v.control_block_->exit_count_ << "</exit_count_>";
320  o << "</LogGleaner>";
321  return o;
322 }
323 
324 
325 } // namespace snapshot
326 } // namespace foedus
storage::PartitionerMetadata * partitioner_metadata_
numa_alloc_onnode() and numa_free().
storage::StorageManager * get_storage_manager() const
See Storage Manager.
Definition: engine.cpp:60
Automatically calls if uninitialize() wasn't called when it gets out of scope, and just complains whe...
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
uint32_t StorageId
Unique ID for storage.
Definition: storage_id.hpp:55
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
GlobalMemoryAnchors * get_global_memory_anchors()
A log-gleaner, which constructs a new set of snapshot files during snapshotting.
Local resource for the log gleaner, which runs only in the master node.
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
Engine * engine_
Most attachable object stores an engine pointer (local engine), so we define it here.
Definition: attachable.hpp:107
Definitions of IDs in this package and a few related constant values.
Holds a set of read-only file objects for snapshot files.
Declares common log types used in all packages.
const EngineOptions & get_options() const
Definition: engine.cpp:39
std::atomic< uint16_t > exit_count_
Count of mappers/reducers that have exited.
uint32_t partitioner_data_memory_mb_
Size in MB of a shared memory buffer allocated for all partitioners during log gleaning.
storage::StorageId max_storage_id_
Largest storage ID as of starting to take the snapshot.
Definition: snapshot.hpp:58
A remote view of LogReducer from all engines.
std::atomic< uint16_t > error_count_
Count of mappers/reducers that have exited with some error.
LogGleanerControlBlock * control_block_
The shared data on shared memory that has been initialized in some SOC or master engine.
Definition: attachable.hpp:111
storage::StorageOptions storage_
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Definition: storage_id.hpp:79
Calls Initializable::uninitialize() automatically when it gets out of scope.
#define SPINLOCK_WHILE(x)
A macro to busy-wait (spinlock) with occasional pause.
SnapshotId get_snapshot_id() const
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
uint64_t stop()
Take another current time tick.
Definition: stop_watch.cpp:35
std::string to_string() const
Represents one snapshot that converts all logs from base epoch to valid_until epoch into snapshot fil...
Definition: snapshot.hpp:37
uint16_t group_count_
Number of ThreadGroup in the engine.
uint32_t data_offset_
Relative offset from the beginning of partitioner data block that points to variable-sized partitione...
ErrorStack execute()
Main routine of log gleaner.
thread::ThreadOptions thread_
0x0603 : "SNAPSHT: Snapshot mappers/reducers take too long time to respond to exit request...
Definition: error_code.hpp:163
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
const ErrorStack kRetOk
Normal return value for no-error case.
double elapsed_sec() const
Definition: stop_watch.hpp:51
soc::SocManager * get_soc_manager() const
See SOC and IPC.
Definition: engine.cpp:59
A remote view of LogGleaner from all engines.
std::atomic< uint16_t > completed_count_
count of mappers/reducers that have completed processing the current epoch.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
A high-resolution stop watch.
Definition: stop_watch.hpp:30
std::ostream & operator<<(std::ostream &o, const SortedBuffer &v)
Definition: log_buffer.cpp:32
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
Definition: soc_manager.cpp:38
std::atomic< uint16_t > completed_mapper_count_
We also have a separate count for mappers only to know if all mappers are done.