libfoedus-core (FOEDUS Core Library)
hash_partitioner_impl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/storage/hash/hash_partitioner_impl.hpp"

#include <glog/logging.h>

#include <algorithm>
#include <cstring>
#include <ostream>
#include <thread>
#include <vector>

#include "foedus/engine.hpp"
#include "foedus/epoch.hpp"
#include "foedus/debugging/stop_watch.hpp"
#include "foedus/memory/engine_memory.hpp"
#include "foedus/memory/page_resolver.hpp"
#include "foedus/snapshot/log_buffer.hpp"
#include "foedus/storage/hash/hash_log_types.hpp"
#include "foedus/storage/hash/hash_page_impl.hpp"
#include "foedus/storage/hash/hash_storage.hpp"

namespace foedus {
namespace storage {
namespace hash {

HashPartitioner::HashPartitioner(Partitioner* parent)
  : engine_(parent->get_engine()),
    id_(parent->get_storage_id()),
    metadata_(PartitionerMetadata::get_metadata(engine_, id_)) {
  ASSERT_ND(metadata_->mutex_.is_initialized());
  if (metadata_->valid_) {
    data_ = reinterpret_cast<HashPartitionerData*>(metadata_->locate_data(engine_));
  } else {
    data_ = nullptr;
  }
}

const PartitionId* HashPartitioner::get_bucket_owners() const {
  ASSERT_ND(data_);
  return data_->bin_owners_;
}

bool HashPartitioner::is_partitionable() const {
  ASSERT_ND(data_);
  return data_->partitionable_;
}

void design_partition_thread(HashPartitioner* partitioner, uint16_t task) {
  partitioner->design_partition_task(task);
}

ErrorStack HashPartitioner::design_partition(
  const Partitioner::DesignPartitionArguments& /*args*/) {
  ASSERT_ND(metadata_->mutex_.is_initialized());
  ASSERT_ND(data_ == nullptr);
  HashStorage storage(engine_, id_);
  ASSERT_ND(storage.exists());
  HashStorageControlBlock* control_block = storage.get_control_block();

  soc::SharedMutexScope mutex_scope(&metadata_->mutex_);
  ASSERT_ND(!metadata_->valid_);
  HashBin total_bin_count = storage.get_bin_count();
  uint16_t node_count = engine_->get_soc_count();
  uint64_t bytes = HashPartitionerData::object_size(node_count, total_bin_count);
  WRAP_ERROR_CODE(metadata_->allocate_data(engine_, &mutex_scope, bytes));
  data_ = reinterpret_cast<HashPartitionerData*>(metadata_->locate_data(engine_));

  data_->levels_ = storage.get_levels();
  ASSERT_ND(storage.get_levels() >= 1U);
  data_->bin_bits_ = storage.get_bin_bits();
  data_->bin_shifts_ = storage.get_bin_shifts();
  data_->partitionable_ = node_count > 1U;
  data_->total_bin_count_ = total_bin_count;

  if (!data_->partitionable_) {
    // No partitioning needed. We don't even allocate memory for bin_owners_ in this case.
    metadata_->valid_ = true;
    return kRetOk;
  }

  ASSERT_ND(!control_block->root_page_pointer_.volatile_pointer_.is_null());

  // Simply checks the owner of volatile pointers in last-level intermediate pages.
  // Though this is an in-memory task, we parallelize it to make it even faster.
  // Each sub-task handles one pointer from the root intermediate page.
  if (storage.get_levels() == 1U) {
    VLOG(0) << "Not worth parallelizing; just one level. Designing on a single thread";
    // Note: this is only about partition design. Mappers/reducers still run on multiple nodes.
    for (uint16_t i = 0; i < total_bin_count; ++i) {
      design_partition_task(i);
    }
  } else {
    HashBin interval = kHashMaxBins[storage.get_levels() - 1U];
    ASSERT_ND(interval < total_bin_count);
    ASSERT_ND(interval * kHashIntermediatePageFanout >= total_bin_count);
    std::vector<std::thread> threads;
    for (uint16_t i = 0; interval * i < total_bin_count; ++i) {
      threads.emplace_back(design_partition_thread, this, i);
    }
    LOG(INFO) << "Launched " << threads.size() << " threads. Joining...";

    for (auto& t : threads) {
      t.join();
    }
    LOG(INFO) << "Joined. Designing done";
  }

  metadata_->valid_ = true;
  return kRetOk;
}
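
// Illustrative sketch, not part of the original file: how the design sub-tasks
// above tile the bin space. Task i covers bins [i * interval, (i + 1) * interval),
// truncated at the total bin count. task_end_bin() is a hypothetical helper that
// mirrors the loop bound in design_partition(); the sample numbers are made up.
inline HashBin task_end_bin(HashBin interval, HashBin total_bin_count, uint16_t task) {
  HashBin end = interval * (task + 1U);
  return end < total_bin_count ? end : total_bin_count;  // the last task is truncated
}
// Example: with interval = 512 and total_bin_count = 2000, task 0 covers
// bins [0, 512) and task 3 covers bins [1536, 2000).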

void HashPartitioner::design_partition_task(uint16_t task) {
  VLOG(0) << "Task-" << task << " started.";
  ASSERT_ND(data_->partitionable_);
  HashStorage storage(engine_, id_);
  HashStorageControlBlock* control_block = storage.get_control_block();
  const memory::GlobalVolatilePageResolver& resolver
    = engine_->get_memory_manager()->get_global_volatile_page_resolver();

  // The root page is guaranteed to have a volatile version.
  HashIntermediatePage* root_page = reinterpret_cast<HashIntermediatePage*>(
    resolver.resolve_offset(control_block->root_page_pointer_.volatile_pointer_));

  HashBin interval = kHashMaxBins[root_page->get_level()];
  VolatilePagePointer pointer = root_page->get_pointer(task).volatile_pointer_;
  if (pointer.is_null()) {
    VLOG(0) << "Task-" << task << " got an empty task: null volatile pointer";
    std::memset(data_->bin_owners_ + (interval * task), 0, interval);
  } else if (root_page->get_level() == 0) {
    ASSERT_ND(interval == 1ULL);
    ASSERT_ND(storage.get_levels() == 1U);
    VLOG(2) << "Task-" << task << " is trivial.";
    data_->bin_owners_[task] = pointer.get_numa_node();
  } else {
    HashIntermediatePage* child
      = reinterpret_cast<HashIntermediatePage*>(resolver.resolve_offset(pointer));
    design_partition_task_recurse(resolver, child);
  }

  VLOG(0) << "Task-" << task << " ended.";
}

void HashPartitioner::design_partition_task_recurse(
  const memory::GlobalVolatilePageResolver& resolver,
  const HashIntermediatePage* page) {
  ASSERT_ND(page->header().get_page_type() == kHashIntermediatePageType);
  HashBin interval = kHashMaxBins[page->get_level()];
  HashBin begin = page->get_bin_range().begin_;
  for (uint16_t i = 0;
       begin + i * interval < data_->total_bin_count_ && i < kHashIntermediatePageFanout;
       ++i) {
    HashBin sub_begin = begin + i * interval;
    VolatilePagePointer pointer = page->get_pointer(i).volatile_pointer_;
    if (pointer.is_null()) {
      std::memset(data_->bin_owners_ + sub_begin, 0, interval);
    } else if (page->get_level() == 0) {
      ASSERT_ND(interval == 1ULL);
      data_->bin_owners_[sub_begin] = pointer.get_numa_node();
    } else {
      HashIntermediatePage* child
        = reinterpret_cast<HashIntermediatePage*>(resolver.resolve_offset(pointer));
      design_partition_task_recurse(resolver, child);
    }
  }
}


void HashPartitioner::partition_batch(const Partitioner::PartitionBatchArguments& args) const {
  if (!is_partitionable()) {
    std::memset(args.results_, 0, sizeof(PartitionId) * args.logs_count_);
    return;
  }

  HashStorage storage(engine_, id_);
  uint8_t bin_shifts = storage.get_bin_shifts();
  for (uint32_t i = 0; i < args.logs_count_; ++i) {
    const HashCommonLogType* log = reinterpret_cast<const HashCommonLogType*>(
      args.log_buffer_.resolve(args.log_positions_[i]));
    log->assert_type();
    ASSERT_ND(log->header_.storage_id_ == id_);
    HashValue hash = log->hash_;
    HashBin bin = hash >> bin_shifts;
    ASSERT_ND(bin < storage.get_bin_count());
    args.results_[i] = data_->bin_owners_[bin];
  }
}
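
// Minimal sketch, not part of the original file: the hash-to-partition mapping
// partition_batch() applies to each log. The top bits of the 64-bit hash select
// the bin (assuming bin_shifts = 64 - bin_bits), and the bin_owners_ array built
// by design_partition() maps the bin to its owner node. partition_of() is a
// hypothetical helper, not a FOEDUS API.
inline PartitionId partition_of(
  HashValue hash, uint8_t bin_shifts, const PartitionId* bin_owners) {
  HashBin bin = hash >> bin_shifts;  // the top bits of the hash pick the bin
  return bin_owners[bin];            // the owner NUMA node chosen at design time
}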

/**
 * Used in sort_batch().
 * The sort key packs, from most significant to least significant: the hash bin,
 * the epoch compressed as a difference from base_epoch_, the in-epoch ordinal,
 * and the buffer position of the log. Sorting the raw 128-bit integer therefore
 * orders logs by bin, then epoch, then ordinal at once.
 */
struct SortEntry {
  inline void set(
    HashBin bin,
    uint16_t compressed_epoch,
    uint32_t in_epoch_ordinal,
    snapshot::BufferPosition position) ALWAYS_INLINE {
    ASSERT_ND(bin < (1ULL << kHashMaxBinBits));
    data_
      = static_cast<__uint128_t>(bin) << 80
      | static_cast<__uint128_t>(compressed_epoch) << 64
      | static_cast<__uint128_t>(in_epoch_ordinal) << 32
      | static_cast<__uint128_t>(position);
  }
  inline HashBin get_bin() const ALWAYS_INLINE {
    return static_cast<HashBin>(data_ >> 80);
  }
  inline snapshot::BufferPosition get_position() const ALWAYS_INLINE {
    return static_cast<snapshot::BufferPosition>(data_);
  }
  __uint128_t data_;
};
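
// Minimal sketch, not part of the original file: the ordering property the
// packing above guarantees. Because the bin occupies the most significant bits,
// comparing raw data_ values orders entries by bin first, then compressed epoch,
// then in-epoch ordinal. This is what lets sort_batch() sort SortEntry as plain
// __uint128_t integers. The field values below are made up.
inline bool sort_entry_order_example() {
  SortEntry a;
  SortEntry b;
  a.set(5, /*compressed_epoch=*/9, /*in_epoch_ordinal=*/9, /*position=*/100);
  b.set(6, /*compressed_epoch=*/1, /*in_epoch_ordinal=*/1, /*position=*/200);
  return a.data_ < b.data_;  // true: bin 5 < bin 6 dominates the larger epoch
}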

/** subroutine of sort_batch */
// __attribute__ ((noinline))  // was useful to forcibly show it on cpu profile. nothing more.
void prepare_sort_entries(
  uint8_t bin_shifts,
  const Partitioner::SortBatchArguments& args,
  SortEntry* entries) {
  // CPU profile of partition_hash_perf: ??%.
  const Epoch base_epoch = args.base_epoch_;
  for (uint32_t i = 0; i < args.logs_count_; ++i) {
    const HashCommonLogType* log_entry = reinterpret_cast<const HashCommonLogType*>(
      args.log_buffer_.resolve(args.log_positions_[i]));
    log_entry->assert_type();
    Epoch epoch = log_entry->header_.xct_id_.get_epoch();
    ASSERT_ND(epoch.subtract(base_epoch) < (1U << 16));
    uint16_t compressed_epoch = epoch.subtract(base_epoch);
    // This is expensive... we should keep the hash in log entries.
    HashValue hash = log_entry->hash_;
    entries[i].set(
      hash >> bin_shifts,
      compressed_epoch,
      log_entry->header_.xct_id_.get_ordinal(),
      args.log_positions_[i]);
  }
}

/** subroutine of sort_batch */
// __attribute__ ((noinline))  // was useful to forcibly show it on cpu profile. nothing more.
uint32_t compact_logs(
  uint8_t /*bin_shifts*/,
  const Partitioner::SortBatchArguments& args,
  SortEntry* entries) {
  // TASK(Hideaki) mapper-side compaction.
  // Unlike array, we have to consider all combinations of insert/delete/overwrite.
  // It also needs to compare keys exactly. We probably need to store hashes in the log
  // to make it worthwhile.
  for (uint32_t i = 0; i < args.logs_count_; ++i) {
    args.output_buffer_[i] = entries[i].get_position();
  }
  return args.logs_count_;
/*
  // CPU profile of partition_hash_perf: ??%.
  uint32_t result_count = 1;
  args.output_buffer_[0] = entries[0].get_position();
  HashBin prev_bin = entries[0].get_bin();
  for (uint32_t i = 1; i < args.logs_count_; ++i) {
    // Compact the logs if the same bin appears in a row and the later log covers the
    // same data region. Because we sorted by bin and then ordinal, later logs can
    // overwrite earlier ones.
    HashBin cur_bin = entries[i].get_bin();
    if (UNLIKELY(cur_bin == prev_bin)) {
      const log::RecordLogType* prev_p = args.log_buffer_.resolve(entries[i - 1].get_position());
      log::RecordLogType* next_p = args.log_buffer_.resolve(entries[i].get_position());
      if (prev_p->header_.log_type_code_ != next_p->header_.log_type_code_) {
        // An increment log can be superseded by an overwrite log, and an overwrite log
        // can be merged with an increment log. However, these use cases are probably
        // much less frequent than the following, so we don't compact this case so far.
      } else if (prev_p->header_.get_type() == log::kLogCodeHashOverwrite) {
        // Two overwrite logs might be compacted.
        const HashOverwriteLogType* prev = reinterpret_cast<const HashOverwriteLogType*>(prev_p);
        const HashOverwriteLogType* next = reinterpret_cast<const HashOverwriteLogType*>(next_p);
        // Is the data region the same or superseded?
        uint16_t prev_begin = prev->payload_offset_;
        uint16_t prev_end = prev_begin + prev->payload_count_;
        uint16_t next_begin = next->payload_offset_;
        uint16_t next_end = next_begin + next->payload_count_;
        if (next_begin <= prev_begin && next_end >= prev_end) {
          --result_count;
        }

        // This logic checks the data range against only the previous entry.
        // We might have a situation where 3 or more log entries have the same hash offset
        // with data regions like the following:
        //   Log 1: [4, 8) bytes, Log 2: [8, 12) bytes, Log 3: [4, 8) bytes
        // If we checked further, Log 3 could eliminate Log 1. However, the check is expensive.
      } else {
        // Two increment logs of the same type/offset can be merged into one.
        ASSERT_ND(prev_p->header_.get_type() == log::kLogCodeHashIncrement);
        const HashIncrementLogType* prev = reinterpret_cast<const HashIncrementLogType*>(prev_p);
        HashIncrementLogType* next = reinterpret_cast<HashIncrementLogType*>(next_p);
        if (prev->value_type_ == next->value_type_
          && prev->payload_offset_ == next->payload_offset_) {
          // Add up prev's addendum to next, then delete prev.
          next->merge(*prev);
          --result_count;
        }
      }
    } else {
      prev_bin = cur_bin;
    }
    args.output_buffer_[result_count] = entries[i].get_position();
    ++result_count;
  }
  return result_count;
*/
}
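
// Minimal sketch, not part of the original file: the region-superseding test in
// the commented-out overwrite-log compaction above. A later overwrite of
// [next_begin, next_end) makes an earlier overwrite of [prev_begin, prev_end)
// redundant exactly when it covers the whole earlier region. supersedes() is a
// hypothetical helper.
inline bool supersedes(
  uint16_t prev_begin, uint16_t prev_end,
  uint16_t next_begin, uint16_t next_end) {
  return next_begin <= prev_begin && next_end >= prev_end;
}
// e.g., prev = [4, 8) and next = [0, 12) -> true; prev = [4, 8) and next = [8, 12) -> false.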

void HashPartitioner::sort_batch(const Partitioner::SortBatchArguments& args) const {
  if (args.logs_count_ == 0) {
    *args.written_count_ = 0;
    return;
  }

  // So far we sort them in one pass. To save memory, we could do a multi-pass
  // merge-sort. In reality, however, each log has many bytes, so logs_count_ is
  // not that big.
  args.work_memory_->assure_capacity(sizeof(SortEntry) * args.logs_count_);

  debugging::StopWatch stop_watch_entire;

  ASSERT_ND(sizeof(SortEntry) == 16U);
  SortEntry* entries = reinterpret_cast<SortEntry*>(args.work_memory_->get_block());
  prepare_sort_entries(data_->bin_shifts_, args, entries);

  debugging::StopWatch stop_watch;
  // We gave up non-gcc support because of aarch64; hence we can also assume __uint128_t.
  // CPU profile of partition_hash_perf: ??% (introsort_loop) + ??% (other inlined).
  std::sort(
    reinterpret_cast<__uint128_t*>(entries),
    reinterpret_cast<__uint128_t*>(entries + args.logs_count_));
  stop_watch.stop();
  VLOG(0) << "Sorted " << args.logs_count_ << " log entries in " << stop_watch.elapsed_ms() << "ms";

  uint32_t result_count = compact_logs(data_->bin_shifts_, args, entries);

  stop_watch_entire.stop();
  VLOG(0) << "Hash-" << id_ << " sort_batch() done in " << stop_watch_entire.elapsed_ms()
    << "ms for " << args.logs_count_ << " log entries, compacted them to "
    << result_count << " log entries";
  *args.written_count_ = result_count;
}
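
// Self-contained sketch, not part of the original file: the sorting trick used
// above. The 16-byte packed keys, reinterpreted as __uint128_t, let std::sort
// compare them as plain integers (a gcc/clang extension, matching the non-gcc
// trade-off noted in the comment). The bin and position values are made up.
inline snapshot::BufferPosition uint128_sort_example() {
  __uint128_t keys[2];
  keys[0] = (static_cast<__uint128_t>(7) << 80) | 42;  // bin 7, position 42
  keys[1] = (static_cast<__uint128_t>(3) << 80) | 99;  // bin 3, position 99
  std::sort(keys, keys + 2);
  return static_cast<snapshot::BufferPosition>(keys[0]);  // 99: the bin-3 entry sorts first
}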

std::ostream& operator<<(std::ostream& o, const HashPartitioner& v) {
  o << "<HashPartitioner>";
  if (v.data_) {
    o << "<levels_>" << static_cast<int>(v.data_->levels_) << "</levels_>"
      << "<total_bin_count_>" << v.data_->total_bin_count_ << "</total_bin_count_>";
  } else {
    o << "Not yet designed";
  }
  o << "</HashPartitioner>";
  return o;
}

}  // namespace hash
}  // namespace storage
}  // namespace foedus