libfoedus-core
FOEDUS Core Library
mapreduce_base_impl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <chrono>
23 #include <ostream>
24 #include <sstream>
25 #include <string>
26 
27 #include "foedus/assert_nd.hpp"
28 #include "foedus/engine.hpp"
29 #include "foedus/epoch.hpp"
34 
35 namespace foedus {
36 namespace snapshot {
// Constructs the common base of a mapper/reducer worker.
//   engine: stored in engine_, also used to construct parent_ and, via
//           engine->get_soc_id(), to initialize numa_node_. It is dereferenced
//           here, so it must not be null.
//   id:     stored in id_.
// running_ starts as false; no thread is started by the constructor.
37 MapReduceBase::MapReduceBase(Engine* engine, uint16_t id)
38  : engine_(engine), parent_(engine), id_(id), numa_node_(engine->get_soc_id()), running_(false) {}
39 
41  LOG(INFO) << "Launching thread for " << to_string();
42  thread_ = std::move(std::thread(&MapReduceBase::handle, this));
43 }
44 
46  LOG(INFO) << "Waiting for the completion of thread: " << to_string();
47  if (thread_.joinable()) {
48  thread_.join();
49  }
50  LOG(INFO) << "Observed completion of thread: " << to_string();
51 }
52 
53 
55  if (parent_.is_error()) {
57  }
58  return kErrorCodeOk;
59 }
60 
// Thread body for this worker: it is the member function handed to std::thread
// where the worker thread is created in this file.
// Sequence visible here: sanity-check running_, run handle_process(), log the
// outcome, bump the parent's completed count, and wake the parent if this was
// the last worker to finish.
// NOTE(review): this listing is missing several original lines (listing lines
// 67, 70, 74, 87 and 92), so the braces below do not balance as displayed.
// Confirm against the original source before changing any code here.
61 void MapReduceBase::handle() {
 // running_ is only a sanity check against launching the same object twice.
62  if (running_) {
63  LOG(FATAL) << "Duplicate launch of " << to_string();
64  }
65  running_ = true;
66  LOG(INFO) << "Started running: " << to_string() << " NUMA node=" << static_cast<int>(numa_node_);
 // NOTE(review): listing line 67 is missing here. Presumably it declares a scope
 // object that pins this thread to numa_node_ - TODO confirm.
68  ErrorStack result = handle_process(); // calls main logic in derived class
69  if (result.is_error()) {
 // NOTE(review): listing line 70 is missing here. Presumably it is a nested `if`
 // that separates the "cancelled" case from other errors - TODO confirm.
71  LOG(WARNING) << to_string() << " cancelled";
72  } else {
73  LOG(ERROR) << to_string() << " got an error while processing:" << result;
 // NOTE(review): listing line 74 is missing here; its content is unknown.
 // Wake the parent so that it can observe the failure.
75  parent_.wakeup();
76  }
77  } else {
78  LOG(INFO) << to_string() << " successfully finished";
79  }
80 
81  // Let the gleaner know that I'm done.
82  uint16_t value_after = parent_.increment_completed_count();
83  ASSERT_ND(value_after <= parent_.get_all_count());
84  if (value_after == parent_.get_all_count()) {
85  // I was the last one to finish, which means everything is fully processed.
86  // Let the gleaner know about it.
 // NOTE(review): listing line 87 is missing here; its content is unknown.
88  LOG(INFO) << to_string() << " was the last one to finish, waking up gleaner.. ";
89  parent_.wakeup();
90  }
91 
 // NOTE(review): listing line 92 is missing here; its content is unknown.
93  LOG(INFO) << "Stopped running: " << to_string();
 // Clear the sanity-check flag now that the thread body is done.
94  running_ = false;
95 }
96 
97 } // namespace snapshot
98 } // namespace foedus
ErrorCode check_cancelled() const
Derived class's handle_process() should occasionally call this to exit if it's cancelled.
void launch_thread()
Start executing.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
std::atomic< bool > running_
only for sanity check
Pin the current thread to the given NUMA node in this object's scope.
ErrorCode get_error_code() const
Return the integer error code.
0 means no-error.
Definition: error_code.hpp:87
0x0602 : "SNAPSHT: (internal error code) Snapshot task cancelled." .
Definition: error_code.hpp:162
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
virtual ErrorStack handle_process()=0
Implements the specific logic in the derived class.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
virtual std::string to_string() const =0
Expects "LogReducer-x", "LogMapper-y" etc.
bool is_error() const
Returns whether this return code is not kErrorCodeOk.