libfoedus-core
FOEDUS Core Library
foedus::storage::hash::HashPartitioner Class Reference final

Partitioner for a hash storage.

Detailed Description

Partitioner for a hash storage.

Partitioning for hash storage is much easier than for B-trees, but somewhat trickier than for arrays. Unlike arrays, there is not, and should not be, any locality across ranges of hash bins. If there is, the hash function is bad!

Footprint vs Accuracy
The main challenge here is the tradeoff between the footprint and the accuracy of the partitioning information. One extreme, which is our current implementation, is to determine the node to assign for every single hash bin. Assuming there are few records in each hash bin, this gives good locality, so mappers can avoid sending too many logs to remote reducers. However, it needs sizeof(PartitionId) * hashbins bytes of partitioning information. The other extreme is to assign partitions at a very coarse granularity, such as root page pointers. Then the footprint is fixed and tiny, but the accuracy is basically zero: the assignment is effectively random. So far, we have picked the former extreme. We thus issue an error when a user specifies bin-bits so large that 2^bits bytes >= storage_options.partitioner_data_memory_mb_ MB.
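To make the footprint concrete, the size check described above can be sketched as follows. This is a minimal illustration, not FOEDUS code: PartitionIdSketch, owner_table_bytes, and exceeds_budget are hypothetical names, and the one-byte partition ID is an assumption taken from the "2^bits bytes" estimate in the text.

```cpp
#include <cassert>
#include <cstdint>

// Assumption: a partition ID occupies one byte, matching the "2^bits bytes"
// estimate above. This alias is a stand-in, not the FOEDUS PartitionId type.
using PartitionIdSketch = std::uint8_t;

// Bytes needed to store one owner per hash bin for a table with 2^bin_bits bins.
inline std::uint64_t owner_table_bytes(std::uint8_t bin_bits) {
  assert(bin_bits < 64);  // keep the shift well defined
  return sizeof(PartitionIdSketch) * (std::uint64_t{1} << bin_bits);
}

// True when the owner table reaches or exceeds the budget given in MB,
// which is the case the text says is reported as an error.
inline bool exceeds_budget(std::uint8_t bin_bits, std::uint64_t budget_mb) {
  return owner_table_bytes(bin_bits) >= (budget_mb << 20);
}
```

Under these assumptions, 20 bin bits need exactly 1 MB of owner data, so a 1 MB budget would already be rejected, while 19 bin bits would fit.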
Current policy and its limitations
Based on the above simple design, we just pick the owner of the current volatile data page as the partition. So far, we do not even balance the assignment. In sum, the current policy has the following limitations:
  • Large footprints, which might prohibit large hash-tables and cause lots of L1 misses while partitioning.
  • Users must make sure the initial inserts are well balanced between nodes.
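Because the partitioner does not rebalance, it can help to check how evenly the bins ended up being spread. The sketch below counts bins per node from a copy of the owner array. count_bins_per_node is a hypothetical helper, and the one-byte owner type is an assumption; neither is part of the class documented here.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Counts how many hash bins each node owns.
// owners[i] is the node assigned to bin i; node_count is the number of nodes.
inline std::vector<std::uint64_t> count_bins_per_node(
    const std::vector<std::uint8_t>& owners, std::uint16_t node_count) {
  std::vector<std::uint64_t> counts(node_count, 0);
  for (std::uint8_t owner : owners) {
    assert(owner < node_count);  // every owner must be a valid node index
    ++counts[owner];
  }
  return counts;
}
```

A heavily skewed result would suggest that the initial inserts were not well balanced between nodes.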
Note
This is a private implementation detail of Hashtable Storage, thus the file name ends with _impl. Do not include this header from a client program. There is no case in which a client program needs to access this internal class.

Definition at line 67 of file hash_partitioner_impl.hpp.

#include <hash_partitioner_impl.hpp>

Public Member Functions

 HashPartitioner (Partitioner *parent)
 
ErrorStack design_partition (const Partitioner::DesignPartitionArguments &args)
 
void design_partition_task (uint16_t task)
 
bool is_partitionable () const
 
void partition_batch (const Partitioner::PartitionBatchArguments &args) const
 
void sort_batch (const Partitioner::SortBatchArguments &args) const
 
const PartitionId * get_bucket_owners () const
 

Friends

std::ostream & operator<< (std::ostream &o, const HashPartitioner &v)
 

Constructor & Destructor Documentation

foedus::storage::hash::HashPartitioner::HashPartitioner ( Partitioner * parent)
explicit

Definition at line 43 of file hash_partitioner_impl.cpp.

References ASSERT_ND, foedus::soc::SharedMutex::is_initialized(), foedus::storage::PartitionerMetadata::locate_data(), foedus::storage::PartitionerMetadata::mutex_, and foedus::storage::PartitionerMetadata::valid_.

44    : engine_(parent->get_engine()),
45      id_(parent->get_storage_id()),
46      metadata_(PartitionerMetadata::get_metadata(engine_, id_)) {
47    ASSERT_ND(metadata_->mutex_.is_initialized());
48    if (metadata_->valid_) {
49      data_ = reinterpret_cast<HashPartitionerData*>(metadata_->locate_data(engine_));
50    } else {
51      data_ = nullptr;
52    }
53  }
bool valid_
Whether this partitioner information (metadata+data) has been constructed.
soc::SharedMutex mutex_
Serialize concurrent initialization of this partitioner.
void * locate_data(Engine *engine)
Returns the partitioner data pointed from this metadata.
Definition: partitioner.cpp:51
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
bool is_initialized() const
static PartitionerMetadata * get_metadata(Engine *engine, StorageId id)
Returns the shared memory for the given storage ID.
Definition: partitioner.cpp:38

Member Function Documentation

ErrorStack foedus::storage::hash::HashPartitioner::design_partition ( const Partitioner::DesignPartitionArguments & args)

Definition at line 68 of file hash_partitioner_impl.cpp.

References foedus::storage::PartitionerMetadata::allocate_data(), ASSERT_ND, design_partition_task(), foedus::storage::hash::design_partition_thread(), foedus::storage::Storage< CONTROL_BLOCK >::exists(), foedus::storage::hash::HashStorage::get_bin_bits(), foedus::storage::hash::HashStorage::get_bin_count(), foedus::storage::hash::HashStorage::get_bin_shifts(), foedus::Attachable< CONTROL_BLOCK >::get_control_block(), foedus::storage::hash::HashStorage::get_levels(), foedus::Engine::get_soc_count(), foedus::soc::SharedMutex::is_initialized(), foedus::storage::hash::kHashIntermediatePageFanout, foedus::storage::hash::kHashMaxBins, foedus::kRetOk, foedus::storage::hash::HashPartitionerData::levels_, foedus::storage::PartitionerMetadata::locate_data(), foedus::storage::PartitionerMetadata::mutex_, foedus::storage::hash::HashPartitionerData::object_size(), foedus::storage::PartitionerMetadata::valid_, and WRAP_ERROR_CODE.

Referenced by foedus::storage::Partitioner::design_partition().

69   {
70     ASSERT_ND(metadata_->mutex_.is_initialized());
71     ASSERT_ND(data_ == nullptr);
72     HashStorage storage(engine_, id_);
73     ASSERT_ND(storage.exists());
74     HashStorageControlBlock* control_block = storage.get_control_block();
75
76     soc::SharedMutexScope mutex_scope(&metadata_->mutex_);
77     ASSERT_ND(!metadata_->valid_);
78     HashBin total_bin_count = storage.get_bin_count();
79     uint16_t node_count = engine_->get_soc_count();
80     uint64_t bytes = HashPartitionerData::object_size(node_count, total_bin_count);
81     WRAP_ERROR_CODE(metadata_->allocate_data(engine_, &mutex_scope, bytes));
82     data_ = reinterpret_cast<HashPartitionerData*>(metadata_->locate_data(engine_));
83
84     data_->levels_ = storage.get_levels();
85     ASSERT_ND(storage.get_levels() >= 1U);
86     data_->bin_bits_ = storage.get_bin_bits();
87     data_->bin_shifts_ = storage.get_bin_shifts();
88     data_->partitionable_ = node_count > 1U;
89     data_->total_bin_count_ = total_bin_count;
90
91     if (!data_->partitionable_) {
92       // No partitioning needed. We don't even allocate memory for bin_owners_ in this case
93       metadata_->valid_ = true;
94       return kRetOk;
95     }
96
97     ASSERT_ND(!control_block->root_page_pointer_.volatile_pointer_.is_null());
98
99     // simply checks the owner of volatile pointers in last-level intermediate pages.
100    // though this is an in-memory task, parallelize to make it even faster.
101    // each sub-task is a pointer from the root intermediate page.
102    if (storage.get_levels() == 1U) {
103      VLOG(0) << "Not worth parallelization. just one level. Design on a single thread";
104      // Note, this is just about partition-design. Mapper/Reducer still runs on multi-nodes.
105      for (uint16_t i = 0; i < total_bin_count; ++i) {
106        design_partition_task(i);
107      }
108    } else {
109      HashBin interval = kHashMaxBins[storage.get_levels() - 1U];
110      ASSERT_ND(interval < total_bin_count);
111      ASSERT_ND(interval * kHashIntermediatePageFanout >= total_bin_count);
112      std::vector<std::thread> threads;
113      for (uint16_t i = 0; interval * i < total_bin_count; ++i) {
114        threads.emplace_back(design_partition_thread, this, i);
115      }
116      LOG(INFO) << "Launched " << threads.size() << " threads. joining..";
117
118      for (auto& t : threads) {
119        t.join();
120      }
121      LOG(INFO) << "Joined. Designing done";
122    }
123
124    metadata_->valid_ = true;
125    return kRetOk;
126  }
void design_partition_thread(HashPartitioner *partitioner, uint16_t task)
soc::SocId get_soc_count() const
Shorthand for get_options().thread_.group_count_.
Definition: engine.cpp:74
ErrorCode allocate_data(Engine *engine, soc::SharedMutexScope *locked, uint32_t data_size)
Allocates partitioner data of the given size in shared memory.
Definition: partitioner.cpp:61
uint64_t HashBin
Represents a bin of a hash value.
Definition: hash_id.hpp:142
const ErrorStack kRetOk
Normal return value for no-error case.
const uint8_t kHashIntermediatePageFanout
Number of pointers in an intermediate page of hash storage.
Definition: hash_id.hpp:49
const uint64_t kHashMaxBins[]
kHashMaxBins[n] gives the maximum number of hash bins an n-level hash can hold.
Definition: hash_id.hpp:74
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
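The multi-level branch of design_partition() starts one task per pointer in the root page, each covering interval consecutive bins. The following sketch shows only that range arithmetic. BinRangeSketch and split_into_tasks are hypothetical names, and clipping the last range to the total bin count is a simplification made here for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Half-open range of hash bins [begin, end) handled by one task.
struct BinRangeSketch {
  std::uint64_t begin;
  std::uint64_t end;
};

// Splits [0, total_bin_count) into consecutive ranges of `interval` bins,
// using the same loop condition as the listing: interval * i < total_bin_count.
inline std::vector<BinRangeSketch> split_into_tasks(
    std::uint64_t interval, std::uint64_t total_bin_count) {
  assert(interval > 0);
  std::vector<BinRangeSketch> ranges;
  for (std::uint64_t i = 0; interval * i < total_bin_count; ++i) {
    const std::uint64_t begin = interval * i;
    // Assumption for this sketch: the last range stops at the total count.
    const std::uint64_t end = std::min(begin + interval, total_bin_count);
    ranges.push_back(BinRangeSketch{begin, end});
  }
  return ranges;
}
```

For example, an interval of 4 over 10 bins yields three tasks, the last of which covers only bins 8 and 9.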

void foedus::storage::hash::HashPartitioner::design_partition_task ( uint16_t  task)

Definition at line 128 of file hash_partitioner_impl.cpp.

References ASSERT_ND, foedus::storage::hash::HashPartitionerData::bin_owners_, foedus::Attachable< CONTROL_BLOCK >::get_control_block(), foedus::memory::EngineMemory::get_global_volatile_page_resolver(), foedus::storage::hash::HashIntermediatePage::get_level(), foedus::storage::hash::HashStorage::get_levels(), foedus::Engine::get_memory_manager(), foedus::storage::VolatilePagePointer::get_numa_node(), foedus::storage::hash::HashIntermediatePage::get_pointer(), foedus::storage::VolatilePagePointer::is_null(), foedus::storage::hash::kHashMaxBins, foedus::storage::hash::HashPartitionerData::partitionable_, foedus::memory::GlobalVolatilePageResolver::resolve_offset(), foedus::storage::hash::HashStorageControlBlock::root_page_pointer_, and foedus::storage::DualPagePointer::volatile_pointer_.

Referenced by design_partition(), and foedus::storage::hash::design_partition_thread().

128  {
129    VLOG(0) << "Task-" << task << " started.";
130    ASSERT_ND(data_->partitionable_);
131    HashStorage storage(engine_, id_);
132    HashStorageControlBlock* control_block = storage.get_control_block();
133    const memory::GlobalVolatilePageResolver& resolver
134      = engine_->get_memory_manager()->get_global_volatile_page_resolver();
135
136    // root page is guaranteed to have volatile version.
137    HashIntermediatePage* root_page = reinterpret_cast<HashIntermediatePage*>(
138      resolver.resolve_offset(control_block->root_page_pointer_.volatile_pointer_));
139
140    HashBin interval = kHashMaxBins[root_page->get_level()];
141    VolatilePagePointer pointer = root_page->get_pointer(task).volatile_pointer_;
142    if (pointer.is_null()) {
143      VLOG(0) << "Task-" << task << " got an empty task. null volatile pointer";
144      std::memset(data_->bin_owners_ + (interval * task), 0, interval);
145    } else if (root_page->get_level() == 0) {
146      ASSERT_ND(interval == 1ULL);
147      ASSERT_ND(storage.get_levels() == 1U);
148      VLOG(2) << "Task-" << task << " is trivial.";
149      data_->bin_owners_[task] = pointer.get_numa_node();
150    } else {
151      HashIntermediatePage* child
152        = reinterpret_cast<HashIntermediatePage*>(resolver.resolve_offset(pointer));
153      design_partition_task_recurse(resolver, child);
154    }
155
156    VLOG(0) << "Task-" << task << " ended.";
157  }
const GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert volatile page ID to page pointer.
PartitionId bin_owners_[8]
partition of each hash bin.
bool partitionable_
if false, every record goes to node-0.
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
Definition: engine.cpp:50
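For the single-level case, the task logic above reduces to a simple rule: a bin whose pointer is null goes to node 0, and any other bin goes to the NUMA node that owns the pointed-to volatile page. A minimal sketch of that rule follows. The names and the use of -1 to stand for a null pointer are assumptions made for illustration, and the recursive multi-level case is not covered.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical marker for "this root-page pointer is null".
constexpr int kNullPointerSketch = -1;

// child_nodes[task] is the NUMA node owning the page behind the task-th
// pointer, or kNullPointerSketch when that pointer is null.
inline std::vector<std::uint8_t> assign_owners_single_level(
    const std::vector<int>& child_nodes) {
  std::vector<std::uint8_t> owners(child_nodes.size(), 0);
  for (std::size_t task = 0; task < child_nodes.size(); ++task) {
    const int node = child_nodes[task];
    if (node == kNullPointerSketch) {
      owners[task] = 0;  // empty bin: mirrors the memset-to-zero branch
    } else {
      assert(node >= 0 && node <= 255);
      owners[task] = static_cast<std::uint8_t>(node);
    }
  }
  return owners;
}
```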


const PartitionId * foedus::storage::hash::HashPartitioner::get_bucket_owners ( ) const

Definition at line 55 of file hash_partitioner_impl.cpp.

References ASSERT_ND, and foedus::storage::hash::HashPartitionerData::bin_owners_.

55  {
56    ASSERT_ND(data_);
57    return data_->bin_owners_;
58  }
bool foedus::storage::hash::HashPartitioner::is_partitionable ( ) const

Definition at line 59 of file hash_partitioner_impl.cpp.

References ASSERT_ND, and foedus::storage::hash::HashPartitionerData::partitionable_.

Referenced by foedus::storage::Partitioner::is_partitionable(), and partition_batch().

59  {
60    ASSERT_ND(data_);
61    return data_->partitionable_;
62  }

void foedus::storage::hash::HashPartitioner::partition_batch ( const Partitioner::PartitionBatchArguments & args) const

Definition at line 184 of file hash_partitioner_impl.cpp.

References ASSERT_ND, foedus::storage::hash::HashCommonLogType::assert_type(), foedus::storage::hash::HashPartitionerData::bin_owners_, foedus::storage::hash::HashStorage::get_bin_count(), foedus::storage::hash::HashStorage::get_bin_shifts(), foedus::storage::hash::HashCommonLogType::hash_, foedus::log::BaseLogType::header_, is_partitionable(), foedus::storage::Partitioner::PartitionBatchArguments::log_buffer_, foedus::storage::Partitioner::PartitionBatchArguments::log_positions_, foedus::storage::Partitioner::PartitionBatchArguments::logs_count_, foedus::snapshot::LogBuffer::resolve(), foedus::storage::Partitioner::PartitionBatchArguments::results_, and foedus::log::LogHeader::storage_id_.

Referenced by foedus::storage::Partitioner::partition_batch().

184  {
185    if (!is_partitionable()) {
186      std::memset(args.results_, 0, sizeof(PartitionId) * args.logs_count_);
187      return;
188    }
189
190    HashStorage storage(engine_, id_);
191    uint8_t bin_shifts = storage.get_bin_shifts();
192    for (uint32_t i = 0; i < args.logs_count_; ++i) {
193      const HashCommonLogType *log = reinterpret_cast<const HashCommonLogType*>(
194        args.log_buffer_.resolve(args.log_positions_[i]));
195      log->assert_type();
196      ASSERT_ND(log->header_.storage_id_ == id_);
197      HashValue hash = log->hash_;
198      HashBin bin = hash >> bin_shifts;
199      ASSERT_ND(bin < storage.get_bin_count());
200      args.results_[i] = data_->bin_owners_[bin];
201    }
202  }
thread::ThreadGroupId PartitionId
As partition=NUMA node, this is just a synonym of foedus::thread::ThreadGroupId.
Definition: storage_id.hpp:65
uint64_t HashValue
Represents a full 64-bit hash value calculated from a key.
Definition: hash_id.hpp:129
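The core of partition_batch() is a shift followed by a table lookup: the high bits of the 64-bit hash select a bin, and the bin selects its owner node. A self-contained sketch of just that step is shown below. partition_hashes is a hypothetical name, and plain vectors stand in for the log buffer and the shared-memory owner array.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Maps each 64-bit hash to the node that owns its bin.
// bin_shifts is the number of low bits to drop so that only the bin remains.
inline std::vector<std::uint8_t> partition_hashes(
    const std::vector<std::uint64_t>& hashes,
    std::uint8_t bin_shifts,
    const std::vector<std::uint8_t>& bin_owners) {
  assert(bin_shifts < 64);  // keep the shift well defined
  std::vector<std::uint8_t> results;
  results.reserve(hashes.size());
  for (std::uint64_t hash : hashes) {
    const std::uint64_t bin = hash >> bin_shifts;
    assert(bin < bin_owners.size());
    results.push_back(bin_owners[bin]);
  }
  return results;
}
```

With a shift of 62, the top two bits of each hash pick one of four bins, so the lookup costs one shift and one array read per log.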


void foedus::storage::hash::HashPartitioner::sort_batch ( const Partitioner::SortBatchArguments & args) const

Definition at line 329 of file hash_partitioner_impl.cpp.

References ASSERT_ND, foedus::memory::AlignedMemory::assure_capacity(), foedus::storage::hash::HashPartitionerData::bin_shifts_, foedus::storage::hash::compact_logs(), foedus::debugging::StopWatch::elapsed_ms(), foedus::memory::AlignedMemory::get_block(), foedus::storage::Partitioner::SortBatchArguments::logs_count_, foedus::storage::hash::prepare_sort_entries(), foedus::debugging::StopWatch::stop(), foedus::storage::Partitioner::SortBatchArguments::work_memory_, and foedus::storage::Partitioner::SortBatchArguments::written_count_.

Referenced by foedus::storage::Partitioner::sort_batch().

329  {
330    if (args.logs_count_ == 0) {
331      *args.written_count_ = 0;
332      return;
333    }
334
335    // we so far sort them in one pass.
336    // to save memory, we could do multi-pass merge-sort.
337    // however, in reality each log has many bytes, so log_count is not that big.
338    args.work_memory_->assure_capacity(sizeof(SortEntry) * args.logs_count_);
339
340    debugging::StopWatch stop_watch_entire;
341
342    ASSERT_ND(sizeof(SortEntry) == 16U);
343    SortEntry* entries = reinterpret_cast<SortEntry*>(args.work_memory_->get_block());
344    prepare_sort_entries(data_->bin_shifts_, args, entries);
345
346    debugging::StopWatch stop_watch;
347    // Gave up non-gcc support because of aarch64 support. yes, we can also assume __uint128_t.
348    // CPU profile of partition_hash_perf: ??% (introsort_loop) + ??% (other inlined).
349    std::sort(
350      reinterpret_cast<__uint128_t*>(entries),
351      reinterpret_cast<__uint128_t*>(entries + args.logs_count_));
352    stop_watch.stop();
353    VLOG(0) << "Sorted " << args.logs_count_ << " log entries in " << stop_watch.elapsed_ms() << "ms";
354
355    uint32_t result_count = compact_logs(data_->bin_shifts_, args, entries);
356
357    stop_watch_entire.stop();
358    VLOG(0) << "Hash-" << id_ << " sort_batch() done in " << stop_watch_entire.elapsed_ms()
359      << "ms for " << args.logs_count_ << " log entries, compacted them to "
360      << result_count << " log entries";
361    *args.written_count_ = result_count;
362  }
uint32_t compact_logs(uint8_t, const Partitioner::SortBatchArguments &args, SortEntry *entries)
subroutine of sort_batch
void prepare_sort_entries(uint8_t bin_shifts, const Partitioner::SortBatchArguments &args, SortEntry *entries)
subroutine of sort_batch
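The listing sorts 16-byte entries by treating each one as a single 128-bit integer. The sketch below shows the same ordering idea in portable form, using a two-field comparison instead of the __uint128_t cast. SortEntrySketch and its field layout are assumptions, since the real SortEntry layout is not shown on this page.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical 16-byte entry: `bin` plays the role of the most significant
// half of the sort key and `position` the least significant half.
struct SortEntrySketch {
  std::uint64_t bin;
  std::uint64_t position;
};
static_assert(sizeof(SortEntrySketch) == 16, "entry must stay 16 bytes");

// Sorts by (bin, position), which gives the same order as comparing a
// 128-bit integer whose high half is `bin` and low half is `position`.
inline void sort_entries(std::vector<SortEntrySketch>* entries) {
  assert(entries != nullptr);
  std::sort(entries->begin(), entries->end(),
            [](const SortEntrySketch& a, const SortEntrySketch& b) {
              return a.bin != b.bin ? a.bin < b.bin : a.position < b.position;
            });
}
```

Sorting on one wide key keeps all entries of the same bin adjacent, which is what a later compaction step would rely on.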

Friends And Related Function Documentation

std::ostream& operator<< ( std::ostream &  o,
const HashPartitioner & v
)
friend

Definition at line 364 of file hash_partitioner_impl.cpp.

364  {
365    o << "<HashPartitioner>";
366    if (v.data_) {
367      o << "<levels_>" << static_cast<int>(v.data_->levels_) << "</levels_>"
368        << "<total_bin_count_>" << v.data_->total_bin_count_ << "</total_bin_count_>";
369    } else {
370      o << "Not yet designed";
371    }
372    o << "</HashPartitioner>";
373    return o;
374  }

The documentation for this class was generated from the following files: