array_partitioner_impl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/storage/array/array_partitioner_impl.hpp"

#include <glog/logging.h>

#include <algorithm>
#include <cstring>
#include <ostream>
#include <vector>

#include "foedus/engine.hpp"
#include "foedus/engine_options.hpp"
#include "foedus/epoch.hpp"
#include "foedus/assorted/const_div.hpp"
#include "foedus/debugging/stop_watch.hpp"
#include "foedus/log/common_log_types.hpp"
#include "foedus/memory/engine_memory.hpp"
#include "foedus/snapshot/log_buffer.hpp"
#include "foedus/soc/shared_mutex.hpp"
#include "foedus/storage/partitioner.hpp"
#include "foedus/storage/array/array_log_types.hpp"
#include "foedus/storage/array/array_page_impl.hpp"
#include "foedus/storage/array/array_storage.hpp"
#include "foedus/storage/array/array_storage_pimpl.hpp"

namespace foedus {
namespace storage {
namespace array {

ArrayPartitioner::ArrayPartitioner(Partitioner* parent)
  : engine_(parent->get_engine()),
    id_(parent->get_storage_id()),
    metadata_(PartitionerMetadata::get_metadata(engine_, id_)) {
  ASSERT_ND(metadata_->mutex_.is_initialized());
  if (metadata_->valid_) {
    data_ = reinterpret_cast<ArrayPartitionerData*>(metadata_->locate_data(engine_));
  } else {
    data_ = nullptr;
  }
}
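// Note: data_ may legitimately be nullptr here. It stays null until design_partition()
// below constructs ArrayPartitionerData (exactly once, guarded by metadata_->mutex_)
// and flips metadata_->valid_.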

ArrayOffset ArrayPartitioner::get_array_size() const {
  ASSERT_ND(data_);
  return data_->array_size_;
}

uint8_t ArrayPartitioner::get_array_levels() const {
  ASSERT_ND(data_);
  return data_->array_levels_;
}
const PartitionId* ArrayPartitioner::get_bucket_owners() const {
  ASSERT_ND(data_);
  return data_->bucket_owners_;
}
bool ArrayPartitioner::is_partitionable() const {
  ASSERT_ND(data_);
  return data_->partitionable_;
}

ErrorStack ArrayPartitioner::design_partition(
  const Partitioner::DesignPartitionArguments& /*args*/) {
  ASSERT_ND(metadata_->mutex_.is_initialized());
  ASSERT_ND(data_ == nullptr);
  ArrayStorage storage(engine_, id_);
  ASSERT_ND(storage.exists());
  ArrayStorageControlBlock* control_block = storage.get_control_block();

  soc::SharedMutexScope mutex_scope(&metadata_->mutex_);
  ASSERT_ND(!metadata_->valid_);
  WRAP_ERROR_CODE(metadata_->allocate_data(engine_, &mutex_scope, sizeof(ArrayPartitionerData)));
  data_ = reinterpret_cast<ArrayPartitionerData*>(metadata_->locate_data(engine_));

  data_->array_levels_ = storage.get_levels();
  data_->array_size_ = storage.get_array_size();

  if (storage.get_levels() == 1U || engine_->get_soc_count() == 1U) {
    // No partitioning needed.
    data_->bucket_owners_[0] = 0;
    data_->partitionable_ = false;
    data_->bucket_size_ = data_->array_size_;
    metadata_->valid_ = true;
    return kRetOk;
  }

  data_->partitionable_ = true;
  ASSERT_ND(storage.get_levels() >= 2U);

  // bucket size is the interval that corresponds to a direct child of the root.
  // eg) levels==2: leaf, levels==3: leaf*kInteriorFanout, ...
  data_->bucket_size_ = control_block->route_finder_.get_records_in_leaf();
  for (uint32_t level = 1; level < storage.get_levels() - 1U; ++level) {
    data_->bucket_size_ *= kInteriorFanout;
  }
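  // For example (hypothetical numbers): with levels==3, 100 records per leaf, and
  // kInteriorFanout==250, each bucket covers 100 * 250 = 25,000 records, i.e. exactly
  // the key range of one direct child of the root page.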

  const memory::GlobalVolatilePageResolver& resolver
    = engine_->get_memory_manager()->get_global_volatile_page_resolver();
  // root page is guaranteed to have a volatile version.
  ArrayPage* root_page = reinterpret_cast<ArrayPage*>(
    resolver.resolve_offset(control_block->root_page_pointer_.volatile_pointer_));
  ASSERT_ND(!root_page->is_leaf());

  // how many direct children does this root page have?
  uint16_t direct_children = storage.get_array_size() / data_->bucket_size_ + 1U;
  if (storage.get_array_size() % data_->bucket_size_ != 0) {
    ++direct_children;
  }

  // do we have enough direct children? if not, some partition will not receive buckets.
  // Although it's not a critical error, let's log it as an error.
  uint16_t total_partitions = engine_->get_options().thread_.group_count_;
  ASSERT_ND(total_partitions > 1U);  // if not, why are we even here? it's a waste.

  if (direct_children < total_partitions) {
    LOG(ERROR) << "Warning-like error: This array doesn't have enough direct children in root"
      " page to assign partitions. #partitions=" << total_partitions << ", #direct children="
      << direct_children << ". array=" << storage;
  }

  // two passes. the first pass simply looks at each child's volatile/snapshot pointer
  // and determines its owner. the second pass addresses excessive assignments,
  // offloading them to partitions that received fewer buckets.
  std::vector<uint16_t> counts(total_partitions, 0);
  const uint16_t excessive_count = (direct_children / total_partitions) + 1;
  std::vector<uint16_t> excessive_children;
  for (uint16_t child = 0; child < direct_children; ++child) {
    const DualPagePointer &pointer = root_page->get_interior_record(child);
    PartitionId partition;
    if (!pointer.volatile_pointer_.is_null()) {
      partition = pointer.volatile_pointer_.get_numa_node();
    } else {
      // if there is no volatile page, use the snapshot page's owner node.
      partition = extract_numa_node_from_snapshot_pointer(pointer.snapshot_pointer_);
      // this ignores the case where neither a snapshot nor a volatile page exists.
      // however, as we create all pages at ArrayStorage::create(), this so far never happens.
    }
    ASSERT_ND(partition < total_partitions);
    if (counts[partition] >= excessive_count) {
      excessive_children.push_back(child);
    } else {
      ++counts[partition];
      data_->bucket_owners_[child] = partition;
    }
  }

  // just add each excessive child to the partition with the fewest assignments.
  // a stupid loop, but this part won't be a bottleneck (only ~250 elements).
  for (uint16_t child : excessive_children) {
    PartitionId most_needy = 0;
    for (PartitionId partition = 1; partition < total_partitions; ++partition) {
      if (counts[partition] < counts[most_needy]) {
        most_needy = partition;
      }
    }

    ++counts[most_needy];
    data_->bucket_owners_[child] = most_needy;
  }
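  // Example of the two passes (hypothetical): 2 partitions, 5 direct children whose
  // pages live on nodes [0, 0, 0, 1, 0]. excessive_count = 5/2 + 1 = 3. The first pass
  // assigns children 0-2 to partition 0 and child 3 to partition 1; child 4 would make
  // partition 0 excessive, so the second pass offloads it to partition 1, giving
  // owners [0, 0, 0, 1, 1] and counts {3, 2}.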

  metadata_->valid_ = true;
  return kRetOk;
}

void ArrayPartitioner::partition_batch(const Partitioner::PartitionBatchArguments& args) const {
  if (!is_partitionable()) {
    std::memset(args.results_, 0, sizeof(PartitionId) * args.logs_count_);
    return;
  }

  ASSERT_ND(data_->bucket_size_ > 0);
  assorted::ConstDiv bucket_size_div(data_->bucket_size_);
  for (uint32_t i = 0; i < args.logs_count_; ++i) {
    const ArrayCommonUpdateLogType *log = reinterpret_cast<const ArrayCommonUpdateLogType*>(
      args.log_buffer_.resolve(args.log_positions_[i]));
    ASSERT_ND(log->header_.get_type() == log::kLogCodeArrayOverwrite
      || log->header_.get_type() == log::kLogCodeArrayIncrement);
    ASSERT_ND(log->header_.storage_id_ == id_);
    ASSERT_ND(log->offset_ < get_array_size());
    uint64_t bucket = bucket_size_div.div64(log->offset_);
    ASSERT_ND(bucket < kInteriorFanout);
    args.results_[i] = get_bucket_owners()[bucket];
  }
}
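// For example (hypothetical numbers): with bucket_size_==25000, a log for offset 60000
// belongs to bucket 60000/25000 = 2, so it is routed to get_bucket_owners()[2].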
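/**
 * Sort entry for sort_batch(), packed into a single __uint128_t so that a plain
 * integer comparison sorts by (offset, epoch, in-epoch ordinal, position):
 * \li bits 80-127: ArrayOffset (48 bits, hence the kMaxArrayOffset assertion)
 * \li bits 64-79 : epoch compressed as a 16-bit distance from base_epoch
 * \li bits 32-63 : in-epoch ordinal
 * \li bits 0-31  : snapshot::BufferPosition of the log record
 */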
struct SortEntry {
  inline void set(
    ArrayOffset offset,
    uint16_t compressed_epoch,
    uint32_t in_epoch_ordinal,
    snapshot::BufferPosition position) ALWAYS_INLINE {
    ASSERT_ND(offset < kMaxArrayOffset);
    data_
      = static_cast<__uint128_t>(offset) << 80
      | static_cast<__uint128_t>(compressed_epoch) << 64
      | static_cast<__uint128_t>(in_epoch_ordinal) << 32
      | static_cast<__uint128_t>(position);
  }
  inline ArrayOffset get_offset() const ALWAYS_INLINE {
    return static_cast<ArrayOffset>(data_ >> 80);
  }
  inline snapshot::BufferPosition get_position() const ALWAYS_INLINE {
    return static_cast<snapshot::BufferPosition>(data_);
  }
  __uint128_t data_;
};

/** subroutine of sort_batch */
// __attribute__ ((noinline))  // was useful to forcibly show it on cpu profile. nothing more.
void prepare_sort_entries(const Partitioner::SortBatchArguments& args, SortEntry* entries) {
  // CPU profile of partition_array_perf: 6-10%.
  const Epoch base_epoch = args.base_epoch_;
  for (uint32_t i = 0; i < args.logs_count_; ++i) {
    const ArrayCommonUpdateLogType* log_entry = reinterpret_cast<const ArrayCommonUpdateLogType*>(
      args.log_buffer_.resolve(args.log_positions_[i]));
    ASSERT_ND(log_entry->header_.get_type() == log::kLogCodeArrayOverwrite
      || log_entry->header_.get_type() == log::kLogCodeArrayIncrement);
    Epoch epoch = log_entry->header_.xct_id_.get_epoch();
    ASSERT_ND(epoch.subtract(base_epoch) < (1U << 16));
    uint16_t compressed_epoch = epoch.subtract(base_epoch);
    entries[i].set(
      log_entry->offset_,
      compressed_epoch,
      log_entry->header_.xct_id_.get_ordinal(),
      args.log_positions_[i]);
  }
}

/** subroutine of sort_batch */
// __attribute__ ((noinline))  // was useful to forcibly show it on cpu profile. nothing more.
uint32_t compact_logs(const Partitioner::SortBatchArguments& args, SortEntry* entries) {
  // CPU profile of partition_array_perf: 30-35%.
  // Yeah, this is not cheap... but it can dramatically compact the logs.
  uint32_t result_count = 1;
  args.output_buffer_[0] = entries[0].get_position();
  ArrayOffset prev_offset = entries[0].get_offset();
  for (uint32_t i = 1; i < args.logs_count_; ++i) {
    // compact the logs if the same offset appears in a row and covers the same data region.
    // because we sorted by offset and then ordinal, a later log can overwrite an earlier one.
    ArrayOffset cur_offset = entries[i].get_offset();
    // wow, adding this UNLIKELY changed the CPU cost of this function from 35% to 13%, and
    // the throughput of the entire partition_array_perf from 5M to 8.5M, because in this
    // experiment there are few entries with the same offset. I haven't seen this much
    // difference from gcc's unlikely hint before! umm, the compiler is not that smart after all.
    // this will penalize the case where we have many compactions, but in that case
    // the following code has more cost anyway.
    if (UNLIKELY(cur_offset == prev_offset)) {
      const log::RecordLogType* prev_p = args.log_buffer_.resolve(entries[i - 1].get_position());
      log::RecordLogType* next_p = args.log_buffer_.resolve(entries[i].get_position());
      if (prev_p->header_.log_type_code_ != next_p->header_.log_type_code_) {
        // an increment log can be superseded by an overwrite log, and an overwrite log can
        // be merged with an increment log. however, these use cases are probably much less
        // frequent than the following, so we don't compact this case so far.
      } else if (prev_p->header_.get_type() == log::kLogCodeArrayOverwrite) {
        // two overwrite logs might be compacted
        const ArrayOverwriteLogType* prev = reinterpret_cast<const ArrayOverwriteLogType*>(prev_p);
        const ArrayOverwriteLogType* next = reinterpret_cast<const ArrayOverwriteLogType*>(next_p);
        // is the data region the same or superseded?
        uint16_t prev_begin = prev->payload_offset_;
        uint16_t prev_end = prev_begin + prev->payload_count_;
        uint16_t next_begin = next->payload_offset_;
        uint16_t next_end = next_begin + next->payload_count_;
        if (next_begin <= prev_begin && next_end >= prev_end) {
          --result_count;
        }

        // this logic checks the data range against only the previous entry.
        // we might have a situation where 3 or more log entries have the same array offset
        // and the data regions are like the following:
        //   Log 1: [4, 8) bytes, Log 2: [8, 12) bytes, Log 3: [4, 8) bytes
        // If we checked further, Log 3 could eliminate Log 1. However, the check is expensive..
      } else {
        // two increment logs of the same type/offset can be merged into one.
        ASSERT_ND(prev_p->header_.get_type() == log::kLogCodeArrayIncrement);
        const ArrayIncrementLogType* prev = reinterpret_cast<const ArrayIncrementLogType*>(prev_p);
        ArrayIncrementLogType* next = reinterpret_cast<ArrayIncrementLogType*>(next_p);
        if (prev->value_type_ == next->value_type_
          && prev->payload_offset_ == next->payload_offset_) {
          // add up prev's addendum to next, then delete prev.
          next->merge(*prev);
          --result_count;
        }
      }
    } else {
      prev_offset = cur_offset;
    }
    args.output_buffer_[result_count] = entries[i].get_position();
    ++result_count;
  }
  return result_count;
}
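// Example of the compaction above (hypothetical): two overwrite logs on the same offset,
// the earlier writing payload bytes [4, 8) and the later writing [0, 12). The later log
// fully covers the earlier one, so result_count is decremented and the later log's
// position overwrites the earlier one's slot in output_buffer_.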

void ArrayPartitioner::sort_batch(const Partitioner::SortBatchArguments& args) const {
  if (args.logs_count_ == 0) {
    *args.written_count_ = 0;
    return;
  }

  // we so far sort them in one pass.
  // to save memory, we could do a multi-pass merge-sort.
  // however, in reality each log has many bytes, so log_count is not that big.
  args.work_memory_->assure_capacity(sizeof(SortEntry) * args.logs_count_);

  debugging::StopWatch stop_watch_entire;

  ASSERT_ND(sizeof(SortEntry) == 16U);
  SortEntry* entries = reinterpret_cast<SortEntry*>(args.work_memory_->get_block());
  prepare_sort_entries(args, entries);

  debugging::StopWatch stop_watch;
  // Gave up non-gcc support because of aarch64 support. yes, we can also assume __uint128_t.
  // CPU profile of partition_array_perf: 50% (introsort_loop) + 7% (other inlined).
  std::sort(
    reinterpret_cast<__uint128_t*>(entries),
    reinterpret_cast<__uint128_t*>(entries + args.logs_count_));
  stop_watch.stop();
  VLOG(0) << "Sorted " << args.logs_count_ << " log entries in " << stop_watch.elapsed_ms() << "ms";

  uint32_t result_count = compact_logs(args, entries);

  stop_watch_entire.stop();
  VLOG(0) << "Array-" << id_ << " sort_batch() done in " << stop_watch_entire.elapsed_ms()
    << "ms for " << args.logs_count_ << " log entries, compacted them to "
    << result_count << " log entries";
  *args.written_count_ = result_count;
}

std::ostream& operator<<(std::ostream& o, const ArrayPartitioner& v) {
  o << "<ArrayPartitioner>";
  if (v.data_) {
    o << "<array_size_>" << v.data_->array_size_ << "</array_size_>"
      << "<bucket_size_>" << v.data_->bucket_size_ << "</bucket_size_>";
    for (uint16_t i = 0; i < kInteriorFanout; ++i) {
      o << "<range bucket=\"" << i << "\" partition=\"" << v.data_->bucket_owners_[i] << "\" />";
    }
  } else {
    o << "Not yet designed";
  }
  o << "</ArrayPartitioner>";
  return o;
}

}  // namespace array
}  // namespace storage
}  // namespace foedus