20 #include <glog/logging.h>
59 buffer_count_ = pages_to_read;
83 uint32_t buffer_size = total_size / inputs;
91 uint16_t root_child_index,
95 uint32_t* writer_buffer_pos,
96 uint32_t* writer_higher_buffer_pos,
97 bool* had_any_change) {
116 inputs_[i].
init(fileset, head_page_id, total_pages, buffer_piece_size, buffer_piece);
130 initial_bin = std::min<HashBin>(initial_bin, input->
get_cur_bin().
bin_);
135 *had_any_change =
false;
137 *had_any_change =
true;
144 writer_higher_buffer_pos));
151 uint32_t* installed_count,
159 uint32_t installed_diff = 0;
170 *next_lowest_bin = std::min<HashBin>(*next_lowest_bin, entry.
bin_);
174 uint16_t index = entry.
bin_ - cur_range.
begin_;
187 *installed_count += installed_diff;
195 uint32_t* writer_buffer_pos,
196 uint32_t* writer_higher_buffer_pos) {
208 for (valid_upto = 1U; valid_upto + 2U <
levels_; ++valid_upto) {
209 if (
cur_path_[valid_upto]->get_bin_range().contains(lowest_bin)) {
222 writer_higher_buffer_pos);
227 uint8_t fixed_upto_level,
230 uint32_t* writer_buffer_pos,
231 uint32_t* writer_higher_buffer_pos) {
246 for (uint8_t parent_level = fixed_upto_level; parent_level > 0; --parent_level) {
247 uint8_t level = parent_level - 1U;
252 const uint16_t index = route.
route[parent_level];
260 new_page_id = *writer_higher_buffer_pos;
261 new_page = higher_base + new_page_id;
262 ++(*writer_higher_buffer_pos);
267 new_page = main_base + *writer_buffer_pos;
268 ++(*writer_buffer_pos);
273 if (old_page_id == 0) {
292 uint32_t* writer_buffer_pos,
293 uint32_t writer_higher_buffer_pos) {
294 ASSERT_ND(*writer_buffer_pos <= writer->get_page_size());
295 ASSERT_ND(writer_higher_buffer_pos <= writer->get_intermediate_size());
298 *writer_buffer_pos = 0;
302 ASSERT_ND(writer_higher_buffer_pos + levels_ < writer->get_intermediate_size());
Represents an output of composer on one bin.
SnapshotPagePointer head_page_id_
Page ID of the head of HashComposedBinsPage for this sub-tree.
snapshot::SnapshotId snapshot_id_
ErrorCode assure_writer_buffer(snapshot::SnapshotWriter *writer, uint32_t *writer_buffer_pos, uint32_t writer_higher_buffer_pos)
Subroutine to flush the writer if needed to make sure it has enough room.
Represents a pointer to another page (usually a child page).
uint16_t cursor_bin_count_
Number of active bins in the current page.
ErrorCode read_page(storage::SnapshotPagePointer page_id, void *out)
ErrorCode expand_intermediate_memory(uint32_t required_pages, bool retain_content)
Expands intermediate_memory_ in case it is too small.
uint32_t StorageId
Unique ID for storage.
const HashBinRange & get_bin_range() const
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
uint32_t total_pages_
Number of HashComposedBinsPage pages, starting from head_page_id_, contiguously emitted by a composer.
uint32_t input_count_
total number of buffers in inputs_
memory::PagePoolOffset get_page_size() const __attribute__((always_inline))
Brings error stacktrace information as return value of functions.
Abstracts how we batch-read several HashComposedBinsPage emitted by individual composers.
ErrorCode assure_capacity(uint64_t required_size, double expand_margin=2.0, bool retain_content=false) noexcept
If the current size is smaller than the given size, automatically expands.
Holds a set of read-only file objects for snapshot files.
PagePtr cur_path_[kHashMaxLevels]
The pages we are now composing.
SnapshotId get_snapshot_id() const
bool contains(HashBin hash) const
uint32_t buffer_size_
How many pages buffer_ can hold.
VolatilePagePointer volatile_pointer_
void init(cache::SnapshotFileSet *fileset, SnapshotPagePointer head_page_id, uint32_t total_pages, uint32_t buffer_size, HashComposedBinsPage *buffer)
ErrorCode process_a_bin(uint32_t *installed_count, HashBin *next_lowest_bin)
Consumes inputs for the cur_path_[0] page and installs snapshot pointers there.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
ComposedBinsBuffer * inputs_
ErrorCode next_pages()
Read pages to buffer_.
SnapshotPagePointer snapshot_pointer_
memory::PagePoolOffset get_intermediate_size() const __attribute__((always_inline))
Represents a range of hash bins in a hash storage, such as what an intermediate page is responsible f...
uint16_t extract_snapshot_id_from_snapshot_pointer(SnapshotPagePointer pointer)
storage::Page * get_page_base() __attribute__((always_inline))
HashBin begin_
Inclusive beginning of the range.
uint32_t buffer_pos_
Index (0=head, total_pages_ - 1=tail) of the first page in the buffer_.
SnapshotPagePointer page_id_
HashBin end_
Exclusive end of the range.
ErrorCode read_pages(storage::SnapshotPagePointer page_id_begin, uint32_t page_count, void *out)
Read contiguous pages in one shot.
void * get_block() const
Returns the memory block.
DualPagePointer * get_pointer_address(uint16_t index)
std::unique_ptr< ComposedBinsBuffer[] > inputs_memory_
just for auto release
uint32_t buffer_count_
number of pages so far read in the buffer_.
ErrorCode next_bin() __attribute__((always_inline))
Moves on to next bin.
uint64_t get_size() const
Returns the byte size of the memory block.
Represents an intermediate page in Hashtable Storage.
void initialize_snapshot_page(StorageId storage_id, SnapshotPagePointer page_id, uint8_t level, HashBin start_bin)
storage::Page * get_intermediate_base() __attribute__((always_inline))
HashComposedBinsPage * buffer_
The buffer to read contiguous pages in one shot.
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
uint64_t HashBin
Represents a bin of a hash value.
uint16_t cursor_bin_
Cursor position for bin in the current page.
ErrorCode open_path(HashBin bin, uint8_t fixed_upto_level, cache::SnapshotFileSet *fileset, snapshot::SnapshotWriter *writer, uint32_t *writer_buffer_pos, uint32_t *writer_higher_buffer_pos)
Recursively opens pages down from fixed_upto_level.
Represents one memory block aligned to actual OS/hardware pages.
const ErrorStack kRetOk
Normal return value for no-error case.
cache::SnapshotFileSet * fileset_
file handles
ErrorCode switch_path(HashBin lowest_bin, cache::SnapshotFileSet *fileset, snapshot::SnapshotWriter *writer, uint32_t *writer_buffer_pos, uint32_t *writer_higher_buffer_pos)
Moves cur_path_ to a page that contains the specified bin.
uint8_t get_level() const
const HashBin kInvalidHashBin
This value or larger never appears as a valid HashBin.
const uint8_t kHashIntermediatePageFanout
Number of pointers in an intermediate page of hash storage.
const uint64_t kHashMaxBins[]
kHashMaxBins[n] gives the maximum number of hash bins an n-level hash can hold.
#define UNLIKELY(x)
Hints that x is highly likely false.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
ErrorCode dump_pages(memory::PagePoolOffset from_page, uint32_t count)
Write out pages that are contiguous in the main page pool.
static void assure_read_buffer_size(memory::AlignedMemory *read_buffer, uint32_t inputs)
If needed, expand the given read buffer to be used with the inputs.
ErrorStack init(const HashRootInfoPage *const *inputs, uint32_t input_count, PagePtr root_page, uint16_t root_child_index, memory::AlignedMemory *read_buffer, cache::SnapshotFileSet *fileset, snapshot::SnapshotWriter *writer, uint32_t *writer_buffer_pos, uint32_t *writer_higher_buffer_pos, bool *had_any_change)
DualPagePointer & get_pointer(uint16_t index)
storage::SnapshotPagePointer get_next_page_id() const
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
ErrorCode
Enum of error codes defined in error_code.xmacro.
uint32_t cursor_buffer_
Cursor position for page in the buffer.
const ComposedBin & get_cur_bin() const __attribute__((always_inline))
A page to pack many ComposedBin as an output of composer.
Writes out one snapshot file for all data pages in one reducer.