20 #include <glog/logging.h>
44 : engine_(parent->get_engine()),
45 id_(parent->get_storage_id()),
91 data_->bucket_owners_[0] = 0;
92 data_->partitionable_ =
false;
93 data_->bucket_size_ = data_->array_size_;
98 data_->partitionable_ =
true;
103 data_->bucket_size_ = control_block->route_finder_.get_records_in_leaf();
104 for (uint32_t level = 1; level < storage.
get_levels() - 1U; ++level) {
112 resolver.
resolve_offset(control_block->root_page_pointer_.volatile_pointer_));
116 uint16_t direct_children = storage.
get_array_size() / data_->bucket_size_ + 1U;
126 if (direct_children < total_partitions) {
127 LOG(ERROR) <<
"Warning-like error: This array doesn't have enough direct children in root"
128 " page to assign partitions. #partitions=" << total_partitions <<
", #direct children="
129 << direct_children <<
". array=" << storage;
134 std::vector<uint16_t> counts(total_partitions, 0);
135 const uint16_t excessive_count = (direct_children / total_partitions) + 1;
136 std::vector<uint16_t> excessive_children;
137 for (uint16_t child = 0; child < direct_children; ++child) {
149 if (counts[partition] >= excessive_count) {
150 excessive_children.push_back(child);
153 data_->bucket_owners_[child] = partition;
159 for (uint16_t child : excessive_children) {
161 for (
PartitionId partition = 1; partition < total_partitions; ++partition) {
162 if (counts[partition] < counts[most_needy]) {
163 most_needy = partition;
167 ++counts[most_needy];
168 data_->bucket_owners_[child] = most_needy;
207 uint16_t compressed_epoch,
208 uint32_t in_epoch_ordinal,
212 =
static_cast<__uint128_t
>(offset) << 80
213 | static_cast<__uint128_t>(compressed_epoch) << 64
214 |
static_cast<__uint128_t
>(in_epoch_ordinal) << 32
215 | static_cast<__uint128_t>(position);
238 uint16_t compressed_epoch = epoch.
subtract(base_epoch);
252 uint32_t result_count = 1;
265 if (
UNLIKELY(cur_offset == prev_offset)) {
282 if (next_begin <= prev_begin && next_end >= prev_end) {
304 prev_offset = cur_offset;
333 reinterpret_cast<__uint128_t*>(entries),
334 reinterpret_cast<__uint128_t*>(entries + args.
logs_count_));
340 stop_watch_entire.
stop();
341 VLOG(0) <<
"Array-" << id_ <<
" sort_batch() done in " << stop_watch_entire.
elapsed_ms()
342 <<
"ms for " << args.
logs_count_ <<
" log entries, compacted them to"
343 << result_count <<
" log entries";
348 o <<
"<ArrayPartitioner>";
350 o <<
"<array_size_>" << v.data_->
array_size_ <<
"</array_size_>"
351 <<
"<bucket_size_>" << v.data_->
bucket_size_ <<
"</bucket_size_>";
353 o <<
"<range bucket=\"" << i <<
"\" partition=\"" << v.data_->
bucket_owners_[i] <<
"\" />";
356 o <<
"Not yet designed";
358 o <<
"</ArrayPartitioner>";
uint32_t * written_count_
[OUT] how many logs were written to output_buffer.
ArrayOffset get_array_size() const
PartitionId * results_
[OUT] this method will set the partition of logs[i] to results[i].
const snapshot::LogBuffer & log_buffer_
Converts from positions to physical pointers.
thread::ThreadGroupId PartitionId
As partition=NUMA node, this is just a synonym of foedus::thread::ThreadGroupId.
Represents a pointer to another page (usually a child page).
uint32_t logs_count_
number of entries to process.
Epoch base_epoch_
All log entries in these inputs are assured to be after this epoch.
bool partitionable_
if false, every record goes to node-0.
Declares all log types used in this storage type.
uint32_t subtract(const Epoch &other) const
Returns the number of epochs from the given epoch to this epoch, accounting for wrap-around.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Represents a key-value store based on a dense and regular array.
void partition_batch(const Partitioner::PartitionBatchArguments &args) const
const ArrayOffset kMaxArrayOffset
The maximum value allowed for ArrayOffset.
uint32_t logs_count_
number of entries to process.
uint64_t ArrayOffset
The only key type in array storage.
bool is_partitionable() const
uint32_t BufferPosition
Represents a position in some buffer.
const GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert volatile page ID to page pointer.
double elapsed_ms() const
PartitionId bucket_owners_[kInteriorFanout]
partition of each bucket.
ArrayOffset array_size_
Size of the entire array.
Brings error stacktrace information as return value of functions.
ErrorCode assure_capacity(uint64_t required_size, double expand_margin=2.0, bool retain_content=false) noexcept
If the current size is smaller than the given size, automatically expands.
Represents one data page in Array Storage.
uint8_t get_array_levels() const
uint32_t get_ordinal() const __attribute__((always_inline))
const PartitionId * get_bucket_owners() const
ArrayOffset bucket_size_
bucket = offset / bucket_size_.
The pre-calculated p-m pair for optimized integer division by constant.
Declares common log types used in all packages.
Partitioner for an array storage.
0x0023 : foedus::storage::array::ArrayIncrementLogType .
0x0022 : foedus::storage::array::ArrayOverwriteLogType .
const EngineOptions & get_options() const
ArrayOffset get_array_size() const
Returns the size of this array.
uint32_t compact_logs(const Partitioner::SortBatchArguments &args, SortEntry *entries)
subroutine of sort_batch
A base class for ArrayOverwriteLogType/ArrayIncrementLogType.
snapshot::BufferPosition * output_buffer_
sorted results are written to this variable.
soc::SocId get_soc_count() const
Shorthand for get_options().thread_.group_count_.
void sort_batch(const Partitioner::SortBatchArguments &args) const
VolatilePagePointer volatile_pointer_
void prepare_sort_entries(const Partitioner::SortBatchArguments &args, SortEntry *entries)
subroutine of sort_batch
Log type of array-storage's overwrite operation.
const DualPagePointer & get_interior_record(uint16_t record) const __attribute__((always_inline))
bool exists() const
Returns whether this storage is already created.
void merge(const ArrayIncrementLogType &other) __attribute__((always_inline))
A special optimization for increment logs in the log gleaner.
uint64_t stop()
Take another current time tick.
SnapshotPagePointer snapshot_pointer_
Epoch get_epoch() const __attribute__((always_inline))
Shared data of this storage type.
const snapshot::BufferPosition * log_positions_
positions of log records.
Auto-lock scope object for SharedMutex.
Log type of array-storage's increment operation.
void * get_block() const
Returns the memory block.
std::ostream & operator<<(std::ostream &o, const ArrayCreateLogType &v)
uint8_t extract_numa_node_from_snapshot_pointer(SnapshotPagePointer pointer)
uint16_t group_count_
Number of ThreadGroups in the engine.
void set(ArrayOffset offset, uint16_t compressed_epoch, uint32_t in_epoch_ordinal, snapshot::BufferPosition position) __attribute__((always_inline))
uint8_t get_levels() const
Returns the number of levels.
thread::ThreadOptions thread_
const ErrorStack kRetOk
Normal return value for no-error case.
memory::AlignedMemory * work_memory_
Working memory to be used in this method.
Partitioning and sorting logic for one storage.
Arguments for sort_batch()
ArrayOffset get_offset() const __attribute__((always_inline))
ArrayPartitioner(Partitioner *parent)
const snapshot::BufferPosition * log_positions_
positions of log records.
Base class for log type of record-wise operation.
#define UNLIKELY(x)
Hints that x is highly likely false.
const uint16_t kInteriorFanout
Max number of entries in an interior page of array storage.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
uint8_t get_numa_node() const
A high-resolution stop watch.
uint64_t div64(uint64_t n) const
64-bit integer division that outputs both quotient and remainder.
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
#define ALWAYS_INLINE
A function suffix to hint that the function should always be inlined.
CONTROL_BLOCK * get_control_block() const
bool is_initialized() const
ErrorStack design_partition(const Partitioner::DesignPartitionArguments &args)
const snapshot::LogBuffer & log_buffer_
Converts from positions to physical pointers.
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
log::RecordLogType * resolve(BufferPosition position) const
Arguments for design_partition()
snapshot::BufferPosition get_position() const __attribute__((always_inline))
Arguments for partition_batch()