20 #include <glog/logging.h>
44 : engine_(parent->get_engine()),
45 id_(parent->get_storage_id()),
88 data_->partitionable_ = node_count > 1U;
89 data_->total_bin_count_ = total_bin_count;
91 if (!data_->partitionable_) {
97 ASSERT_ND(!control_block->root_page_pointer_.volatile_pointer_.is_null());
103 VLOG(0) <<
"Not worth parallelization. just one level. Design on a single thread";
105 for (uint16_t i = 0; i < total_bin_count; ++i) {
112 std::vector<std::thread> threads;
113 for (uint16_t i = 0; interval * i < total_bin_count; ++i) {
116 LOG(INFO) <<
"Lanched " << threads.size() <<
" threads. joining..";
118 for (
auto& t : threads) {
121 LOG(INFO) <<
"Joined. Designing done";
129 VLOG(0) <<
"Task-" << task <<
" started.";
143 VLOG(0) <<
"Task-" << task <<
" got an empty task. null volatile pointer";
144 std::memset(data_->
bin_owners_ + (interval * task), 0, interval);
145 }
else if (root_page->
get_level() == 0) {
148 VLOG(2) <<
"Task-" << task <<
" is trivial.";
153 design_partition_task_recurse(resolver, child);
156 VLOG(0) <<
"Task-" << task <<
" ended.";
159 void HashPartitioner::design_partition_task_recurse(
168 HashBin sub_begin = begin + i * interval;
171 std::memset(data_->
bin_owners_ + sub_begin, 0, interval);
176 HashIntermediatePage* child
177 =
reinterpret_cast<HashIntermediatePage*
>(resolver.
resolve_offset(pointer));
178 design_partition_task_recurse(resolver, child);
198 HashBin bin = hash >> bin_shifts;
215 uint16_t compressed_epoch,
216 uint32_t in_epoch_ordinal,
220 =
static_cast<__uint128_t
>(bin) << 80
221 | static_cast<__uint128_t>(compressed_epoch) << 64
222 |
static_cast<__uint128_t
>(in_epoch_ordinal) << 32
223 | static_cast<__uint128_t>(position);
248 uint16_t compressed_epoch = epoch.
subtract(base_epoch);
350 reinterpret_cast<__uint128_t*>(entries),
351 reinterpret_cast<__uint128_t*>(entries + args.
logs_count_));
357 stop_watch_entire.
stop();
358 VLOG(0) <<
"Hash-" << id_ <<
" sort_batch() done in " << stop_watch_entire.
elapsed_ms()
359 <<
"ms for " << args.
logs_count_ <<
" log entries, compacted them to"
360 << result_count <<
" log entries";
365 o <<
"<HashPartitioner>";
367 o <<
"<levels_>" <<
static_cast<int>(v.data_->
levels_) <<
"</levels_>"
368 <<
"<total_bin_count_>" << v.data_->
total_bin_count_ <<
"</total_bin_count_>";
370 o <<
"Not yet designed";
372 o <<
"</HashPartitioner>";
uint32_t * written_count_
[OUT] how many logs written to output_buffer.
PartitionId * results_
[OUT] this method will set the partition of logs[i] to results[i].
const snapshot::LogBuffer & log_buffer_
Converts from positions to physical pointers.
thread::ThreadGroupId PartitionId
As partition=NUMA node, this is just a synonym of foedus::thread::ThreadGroupId.
uint32_t logs_count_
number of entries to process.
Epoch base_epoch_
All log entries in this input are assured to be after this epoch.
bool is_partitionable() const
const PartitionId * get_bucket_owners() const
void assert_type() const __attribute__((always_inline))
uint32_t compact_logs(uint8_t, const Partitioner::SortBatchArguments &args, SortEntry *entries)
subroutine of sort_batch
uint32_t subtract(const Epoch &other) const
Returns the number of epochs from the given epoch to this epoch, accounting for wrap-around.
const HashBinRange & get_bin_range() const
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
uint8_t get_levels() const
uint32_t logs_count_
number of entries to process.
void prepare_sort_entries(uint8_t bin_shifts, const Partitioner::SortBatchArguments &args, SortEntry *entries)
subroutine of sort_batch
uint32_t BufferPosition
Represents a position in some buffer.
Partitioner for a hash storage.
const GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert volatile page ID to page pointer.
double elapsed_ms() const
Represents a pointer to a volatile page with modification count for preventing ABA.
Brings error stacktrace information as return value of functions.
ErrorCode assure_capacity(uint64_t required_size, double expand_margin=2.0, bool retain_content=false) noexcept
If the current size is smaller than the given size, automatically expands.
uint32_t get_ordinal() const __attribute__((always_inline))
Declares common log types used in all packages.
HashBin total_bin_count_
Size of the entire hash.
void design_partition_thread(HashPartitioner *partitioner, uint16_t task)
snapshot::BufferPosition * output_buffer_
sorted results are written to this variable.
void design_partition_task(uint16_t task)
soc::SocId get_soc_count() const
Shorthand for get_options().thread_.group_count_.
HashPartitioner(Partitioner *parent)
uint8_t get_bin_bits() const
VolatilePagePointer volatile_pointer_
std::ostream & operator<<(std::ostream &o, const HashCombo &v)
Declares all log types used in this storage type.
bool exists() const
Returns whether this storage is already created.
A base class for HashInsertLogType/HashDeleteLogType/HashOverwriteLogType.
uint64_t stop()
Take another current time tick.
Epoch get_epoch() const __attribute__((always_inline))
void set(HashBin bin, uint16_t compressed_epoch, uint32_t in_epoch_ordinal, snapshot::BufferPosition position) __attribute__((always_inline))
const snapshot::BufferPosition * log_positions_
positions of log records.
Auto-lock scope object for SharedMutex.
Shared data of this storage type.
HashBin begin_
Inclusive beginning of the range.
void sort_batch(const Partitioner::SortBatchArguments &args) const
void * get_block() const
Returns the memory block.
Represents a key-value store based on a dense and regular hash.
HashBin get_bin() const __attribute__((always_inline))
HashBin get_bin_count() const
Represents an intermediate page in Hashtable Storage.
snapshot::BufferPosition get_position() const __attribute__((always_inline))
uint8_t get_bin_shifts() const
DualPagePointer root_page_pointer_
Points to the root page (or something equivalent).
uint64_t HashBin
Represents a bin of a hash value.
const ErrorStack kRetOk
Normal return value for no-error case.
memory::AlignedMemory * work_memory_
Working memory to be used in this method.
Partitioning and sorting logic for one storage.
uint8_t get_level() const
Arguments for sort_batch()
HashValue hash_
Hash value of the key.
void partition_batch(const Partitioner::PartitionBatchArguments &args) const
const uint8_t kHashIntermediatePageFanout
Number of pointers in an intermediate page of hash storage.
const snapshot::BufferPosition * log_positions_
positions of log records.
const uint64_t kHashMaxBins[]
kHashMaxBins[n] gives the maximum number of hash bins an n-level hash can hold.
ErrorStack design_partition(const Partitioner::DesignPartitionArguments &args)
const uint8_t kHashMaxBinBits
Maximum number allowed for bin-bits.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
uint8_t get_numa_node() const
A high-resolution stop watch.
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
#define ALWAYS_INLINE
A function suffix to hint that the function should always be inlined.
DualPagePointer & get_pointer(uint16_t index)
CONTROL_BLOCK * get_control_block() const
bool is_initialized() const
PartitionId bin_owners_[8]
partition of each hash bin.
const snapshot::LogBuffer & log_buffer_
Converts from positions to physical pointers.
bool partitionable_
if false, every record goes to node-0.
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
log::RecordLogType * resolve(BufferPosition position) const
Arguments for design_partition()
uint64_t HashValue
Represents a full 64-bit hash value calculated from a key.
uint64_t object_size() const
Arguments for partition_batch()