20 #include <glog/logging.h>
57 gleaner_resource_(gleaner_resource),
58 new_snapshot_(new_snapshot) {
61 ErrorStack LogGleaner::cancel_reducers_mappers() {
63 VLOG(0) <<
"All mappers/reducers have already exitted. " << *
this;
66 LOG(INFO) <<
"Requesting all mappers/reducers threads to stop.. " << *
this;
68 const uint32_t kTimeoutSleeps = 3000U;
71 std::this_thread::sleep_for(std::chrono::milliseconds(10));
72 if (++count > kTimeoutSleeps) {
79 void LogGleaner::clear_all() {
83 for (uint16_t node = 0; node < node_count; ++node) {
84 LogReducerRef reducer(
engine_, node);
95 ErrorStack LogGleaner::design_partitions() {
103 void LogGleaner::design_partitions_run(
106 ErrorStack* result) {
108 LOG(INFO) <<
"Determining partitions for Storage-" << from <<
" to " << (from + count - 1) <<
".";
111 memory::AlignedMemory work_memory;
116 cache::SnapshotFileSet fileset(
engine_);
117 *result = fileset.initialize();
118 if (result->is_error()) {
119 LOG(ERROR) <<
"fileset.initialize() failed!" << *result;
126 if (!stm->get_storage(
id)->exists()) {
129 storage::Partitioner partitioner(
engine_,
id);
130 storage::Partitioner::DesignPartitionArguments args = { &work_memory, &fileset };
131 ErrorStack ret = partitioner.design_partition(args);
132 if (ret.is_error()) {
133 LOG(ERROR) <<
"Error while determining partitions for storage-" <<
id <<
":" << ret;
139 *result = fileset.uninitialize();
140 if (result->is_error()) {
141 LOG(ERROR) <<
"fileset.uninitialize() failed!" << *result;
144 work_memory.release_block();
145 LOG(INFO) <<
"Determined partitions for Storage-" << from <<
" to " << (from + count - 1) <<
".";
149 LOG(INFO) <<
"Gleaner starts running: snapshot_id=" <<
get_snapshot_id();
152 LOG(INFO) <<
"Gleaner Step 1: Design partitions for all storages...";
160 LOG(INFO) <<
"Gleaner Step 1: Ended in " << watch1.
elapsed_sec() <<
"s";
162 LOG(INFO) <<
"Gleaner Step 2: Run mappers/reducers...";
167 snapshot_manager_memory_->wakeup_snapshot_children();
173 std::this_thread::sleep_for(std::chrono::milliseconds(10));
178 LOG(INFO) <<
"Gleaner Step 2: Ended in " << watch2.
elapsed_sec() <<
"s";
180 LOG(INFO) <<
"Gleaner Step 3: Combine outputs from reducers (root page info)..." << *
this;
183 LOG(ERROR) <<
"Some mapper/reducer got an error. " << *
this;
185 LOG(WARNING) <<
"gleaner stopped without completion. cancelled? " << *
this;
187 LOG(INFO) <<
"All mappers/reducers successfully done. Now on to the final phase." << *
this;
191 LOG(INFO) <<
"Gleaner Step 3: Ended in " << watch3.
elapsed_sec() <<
"s";
193 LOG(INFO) <<
"Gleaner Step 4: Uninitializing...";
196 LOG(INFO) <<
"Gleaner ends";
200 ErrorStack LogGleaner::construct_root_pages() {
201 ASSERT_ND(new_root_page_pointers_.size() == 0);
205 std::vector<const storage::Page*> tmp_array(count,
nullptr);
206 std::vector<uint16_t> cursors;
207 std::vector<uint16_t> buffer_sizes;
208 std::vector<const storage::Page*> buffers;
209 for (uint16_t i = 0; i < count; ++i) {
210 cursors.push_back(0);
212 buffer_sizes.push_back(reducer.get_total_storage_count());
213 buffers.push_back(reducer.get_root_info_pages());
222 SnapshotWriter snapshot_writer(
236 for (uint16_t i = 0; i < count; ++i) {
237 if (cursors[i] == buffer_sizes[i]) {
240 const storage::Page* root_info_page = buffers[i] + cursors[i];
243 if (min_storage_id == 0) {
244 min_storage_id = storage_id;
246 min_storage_id = std::min(min_storage_id, storage_id);
250 if (min_storage_id == 0) {
255 uint16_t input_count = 0;
256 for (uint16_t i = 0; i < count; ++i) {
257 if (cursors[i] == buffer_sizes[i]) {
260 const storage::Page* root_info_page = buffers[i] + cursors[i];
262 if (storage_id == min_storage_id) {
263 tmp_array[input_count] = root_info_page;
269 storage::Composer composer(
engine_, min_storage_id);
271 storage::Composer::ConstructRootArguments args = {
277 &new_root_page_pointer};
280 ASSERT_ND(new_root_page_pointers_.find(min_storage_id) == new_root_page_pointers_.end());
281 new_root_page_pointers_.insert(std::pair<storage::StorageId, storage::SnapshotPagePointer>(
282 min_storage_id, new_root_page_pointer));
285 prev_storage_id = min_storage_id;
286 for (uint16_t i = 0; i < count; ++i) {
287 if (cursors[i] == buffer_sizes[i]) {
290 const storage::Page* root_info_page = buffers[i] + cursors[i];
292 if (storage_id == min_storage_id) {
293 cursors[i] = cursors[i] + 1;
298 snapshot_writer.close();
302 LOG(INFO) <<
"constructed root pages for " << new_root_page_pointers_.size()
303 <<
" storages. in " << stop_watch.elapsed_ms() <<
"ms. "<< *
this;
308 std::stringstream stream;
316 <<
"<completed_mapper_count_>"
320 o <<
"</LogGleaner>";
storage::PartitionerMetadata * partitioner_metadata_
numa_alloc_onnode() and numa_free().
storage::StorageManager * get_storage_manager() const
See Storage Manager.
Automatically calls uninitialize() if it wasn't called when it gets out of scope, and just complains whe...
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
uint32_t StorageId
Unique ID for storage.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
GlobalMemoryAnchors * get_global_memory_anchors()
A log-gleaner, which constructs a new set of snapshot files during snapshotting.
Local resource for the log gleaner, which runs only in the master node.
Brings error stacktrace information as return value of functions.
Engine * engine_
Most attachable object stores an engine pointer (local engine), so we define it here.
Definitions of IDs in this package and a few related constant values.
Holds a set of read-only file objects for snapshot files.
Declares common log types used in all packages.
const EngineOptions & get_options() const
std::atomic< uint16_t > exit_count_
count of mappers/reducers that have exited.
uint32_t partitioner_data_memory_mb_
Size in MB of a shared memory buffer allocated for all partitioners during log gleaning.
storage::StorageId max_storage_id_
Largest storage ID as of starting to take the snapshot.
A remote view of LogReducer from all engines.
std::atomic< uint16_t > error_count_
count of mappers/reducers that have exited with some error.
LogGleanerControlBlock * control_block_
The shared data on shared memory that has been initialized in some SOC or master engine.
storage::StorageOptions storage_
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Calls Initializable::uninitialize() automatically when it gets out of scope.
#define SPINLOCK_WHILE(x)
A macro to busy-wait (spinlock) with occasional pause.
bool is_all_completed() const
SnapshotId get_snapshot_id() const
Database engine object that holds all resources and provides APIs.
uint64_t stop()
Take another current time tick.
std::string to_string() const
Represents one snapshot that converts all logs from base epoch to valid_until epoch into snapshot fil...
memory::AlignedMemory writer_pool_memory_
uint16_t group_count_
Number of ThreadGroup in the engine.
ErrorStack execute()
Main routine of log gleaner.
memory::AlignedMemory writer_intermediate_memory_
bool is_all_exitted() const
thread::ThreadOptions thread_
0x0603 : "SNAPSHT: Snapshot mappers/reducers take too long time to respond to exit request...
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
const ErrorStack kRetOk
Normal return value for no-error case.
double elapsed_sec() const
soc::SocManager * get_soc_manager() const
See SOC and IPC.
A remote view of LogGleaner from all engines.
std::atomic< uint16_t > completed_count_
count of mappers/reducers that have completed processing the current epoch.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
A high-resolution stop watch.
std::ostream & operator<<(std::ostream &o, const SortedBuffer &v)
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
std::atomic< uint16_t > completed_mapper_count_
We also have a separate count for mappers only to know if all mappers are done.