libfoedus-core
FOEDUS Core Library
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
hash_composed_bins_impl.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
20 #include <glog/logging.h>
21 
22 #include <algorithm>
23 #include <cstring>
24 
27 
28 namespace foedus {
29 namespace storage {
30 namespace hash {
31 
32 
34  cache::SnapshotFileSet* fileset,
35  SnapshotPagePointer head_page_id,
36  uint32_t total_pages,
37  uint32_t buffer_size,
38  HashComposedBinsPage* buffer) {
39  fileset_ = fileset;
40  head_page_id_ = head_page_id;
41  total_pages_ = total_pages;
42  buffer_size_ = buffer_size;
43  buffer_pos_ = 0;
44  buffer_count_ = 0;
45  cursor_buffer_ = 0;
46  cursor_bin_ = 0;
48  buffer_ = buffer;
49 }
50 
54  uint32_t previous_end = buffer_pos_ + buffer_count_;
55  uint32_t pages_to_read = std::min<uint32_t>(buffer_size_, total_pages_ - previous_end);
56  SnapshotPagePointer read_from = head_page_id_ + previous_end;
57  CHECK_ERROR_CODE(fileset_->read_pages(read_from, pages_to_read, buffer_));
58  buffer_pos_ = previous_end;
59  buffer_count_ = pages_to_read;
60  cursor_buffer_ = 0;
61  cursor_bin_ = 0;
64  } else {
66  buffer_count_ = 0;
67  cursor_buffer_ = 0;
68  cursor_bin_ = 0;
70  }
71  return kErrorCodeOk;
72 }
73 
75  memory::AlignedMemory* read_buffer,
76  uint32_t inputs) {
77  // split read_buffer_ to each input
78  read_buffer->assure_capacity(
79  kPageSize * kMinBufferSize * inputs,
80  2.0,
81  false);
82  uint32_t total_size = read_buffer->get_size() / kPageSize;
83  uint32_t buffer_size = total_size / inputs;
84  ASSERT_ND(buffer_size >= kMinBufferSize);
85 }
86 
88  const HashRootInfoPage* const* inputs,
89  uint32_t input_count,
90  PagePtr root_page,
91  uint16_t root_child_index,
92  memory::AlignedMemory* read_buffer,
93  cache::SnapshotFileSet* fileset,
95  uint32_t* writer_buffer_pos,
96  uint32_t* writer_higher_buffer_pos,
97  bool* had_any_change) {
98  ASSERT_ND(root_page->get_level() >= 1U);
99  snapshot_id_ = writer->get_snapshot_id();
100  std::memset(cur_path_, 0, sizeof(cur_path_));
101 
102  inputs_memory_.reset(new ComposedBinsBuffer[input_count]);
103  inputs_ = inputs_memory_.get();
104  input_count_ = input_count;
105 
106  ComposedBinsBuffer::assure_read_buffer_size(read_buffer, input_count);
107  uint64_t buffer_piece_size = read_buffer->get_size() / kPageSize / input_count;
108  for (uint32_t i = 0; i < input_count_; ++i) {
109  const DualPagePointer& root_child = inputs[i]->get_pointer(root_child_index);
110  SnapshotPagePointer head_page_id = root_child.snapshot_pointer_;
111  uint32_t total_pages = root_child.volatile_pointer_.word;
112  ASSERT_ND(total_pages > 0);
113  HashComposedBinsPage* read_buffer_casted
114  = reinterpret_cast<HashComposedBinsPage*>(read_buffer->get_block());
115  HashComposedBinsPage* buffer_piece = read_buffer_casted + buffer_piece_size * i;
116  inputs_[i].init(fileset, head_page_id, total_pages, buffer_piece_size, buffer_piece);
117  if (inputs_[i].has_more()) {
118  WRAP_ERROR_CODE(inputs_[i].next_pages());
119  }
120  }
121 
122  levels_ = root_page->get_level() + 1U;
123  cur_path_[root_page->get_level()] = root_page;
124 
125  // what's the bin we initially seek to?
126  HashBin initial_bin = kInvalidHashBin;
127  for (uint32_t i = 0; i < input_count_; ++i) {
128  ComposedBinsBuffer* input = inputs_ + i;
129  if (input->has_more()) {
130  initial_bin = std::min<HashBin>(initial_bin, input->get_cur_bin().bin_);
131  }
132  }
133 
134  if (initial_bin == kInvalidHashBin) {
135  *had_any_change = false;
136  } else {
137  *had_any_change = true;
139  initial_bin,
140  root_page->get_level(),
141  fileset,
142  writer,
143  writer_buffer_pos,
144  writer_higher_buffer_pos));
145  }
146 
147  return kRetOk;
148 }
149 
151  uint32_t* installed_count,
152  HashBin* next_lowest_bin) {
153  HashIntermediatePage* const page = cur_path_[0];
154  ASSERT_ND(page);
155  const HashBinRange& cur_range = page->get_bin_range();
156  ASSERT_ND(page->get_level() == 0);
157  ASSERT_ND(page->header().snapshot_);
158  *next_lowest_bin = kInvalidHashBin;
159  uint32_t installed_diff = 0;
160 
161  for (uint32_t i = 0; i < input_count_; ++i) {
162  ComposedBinsBuffer* input = inputs_ + i;
163  while (input->has_more()) {
164  const ComposedBin& entry = input->get_cur_bin();
165  ASSERT_ND(cur_range.begin_ <= entry.bin_);
166  ASSERT_ND(entry.page_id_ != 0);
168  if (entry.bin_ >= cur_range.end_) {
169  // no more bins for this page from this input
170  *next_lowest_bin = std::min<HashBin>(*next_lowest_bin, entry.bin_);
171  break;
172  }
173 
174  uint16_t index = entry.bin_ - cur_range.begin_;
176  DualPagePointer* target = page->get_pointer_address(index);
179  != snapshot_id_);
180  target->snapshot_pointer_ = entry.page_id_;
181  ++installed_diff;
182 
183  CHECK_ERROR_CODE(input->next_bin());
184  }
185  }
186 
187  *installed_count += installed_diff;
188  return kErrorCodeOk;
189 }
190 
192  HashBin lowest_bin,
193  cache::SnapshotFileSet* fileset,
194  snapshot::SnapshotWriter* writer,
195  uint32_t* writer_buffer_pos,
196  uint32_t* writer_higher_buffer_pos) {
197  ASSERT_ND(lowest_bin != kInvalidHashBin);
198  // because a path in this sub-tree must be switched, this is at least 3-levels.
199  // levels_==1 : single-level hash, separately processed in construct_root
200  // levels_==2 : then each sub-tree has only one level-0 page. switch_path is never called.
201  ASSERT_ND(levels_ >= 3U);
202  ASSERT_ND(!cur_path_[0]->get_bin_range().contains(lowest_bin));
203  ASSERT_ND(!cur_path_[levels_ - 1U]->get_bin_range().contains(lowest_bin)); // root page
204  ASSERT_ND(!cur_path_[levels_ - 2U]->get_bin_range().contains(lowest_bin)); // root_child page
205 
206  // where do we have to switch?
207  uint8_t valid_upto;
208  for (valid_upto = 1U; valid_upto + 2U < levels_; ++valid_upto) {
209  if (cur_path_[valid_upto]->get_bin_range().contains(lowest_bin)) {
210  break;
211  }
212  }
213  ASSERT_ND(cur_path_[valid_upto]->get_bin_range().contains(lowest_bin));
214  ASSERT_ND(!cur_path_[valid_upto - 1U]->get_bin_range().contains(lowest_bin));
215 
216  return open_path(
217  lowest_bin,
218  valid_upto,
219  fileset,
220  writer,
221  writer_buffer_pos,
222  writer_higher_buffer_pos);
223 }
224 
226  HashBin bin,
227  uint8_t fixed_upto_level,
228  cache::SnapshotFileSet* fileset,
229  snapshot::SnapshotWriter* writer,
230  uint32_t* writer_buffer_pos,
231  uint32_t* writer_higher_buffer_pos) {
232  ASSERT_ND(bin != kInvalidHashBin);
233  ASSERT_ND(cur_path_[fixed_upto_level]->get_bin_range().contains(bin));
234  ASSERT_ND(fixed_upto_level < levels_);
235 
236  // we need a room for one-page in main buffer, and levels pages in higher-level buffer.
237  CHECK_ERROR_CODE(assure_writer_buffer(writer, writer_buffer_pos, *writer_higher_buffer_pos));
238  PagePtr higher_base = reinterpret_cast<PagePtr>(writer->get_intermediate_base());
239  PagePtr main_base = reinterpret_cast<PagePtr>(writer->get_page_base());
240 
242  ASSERT_ND(route.route[levels_] == 0);
243  const StorageId storage_id = cur_path_[levels_ - 1U]->header().storage_id_;
244 
245  // let's open the page that contains the bin. higher levels first.
246  for (uint8_t parent_level = fixed_upto_level; parent_level > 0; --parent_level) {
247  uint8_t level = parent_level - 1U;
248  PagePtr parent = cur_path_[parent_level];
249  ASSERT_ND(parent->get_level() == parent_level);
250  ASSERT_ND(parent->get_bin_range().contains(bin));
251 
252  const uint16_t index = route.route[parent_level];
253  HashBin range_begin = parent->get_bin_range().begin_ + index * kHashMaxBins[parent_level];
254  ASSERT_ND(parent->get_bin_range().length() == kHashMaxBins[parent_level + 1U]);
255 
256  SnapshotPagePointer old_page_id = parent->get_pointer(index).snapshot_pointer_;
257  SnapshotPagePointer new_page_id;
258  PagePtr new_page;
259  if (level > 0) {
260  new_page_id = *writer_higher_buffer_pos;
261  new_page = higher_base + new_page_id;
262  ++(*writer_higher_buffer_pos);
263  ASSERT_ND((*writer_higher_buffer_pos) <= writer->get_intermediate_size());
264  } else {
265  // Unlike higher-levels, we can finalize the page ID for level-0 pages.
266  new_page_id = writer->get_next_page_id() + (*writer_buffer_pos);
267  new_page = main_base + *writer_buffer_pos;
268  ++(*writer_buffer_pos);
269  ASSERT_ND((*writer_buffer_pos) <= writer->get_page_size());
270  }
271  cur_path_[level] = new_page;
272 
273  if (old_page_id == 0) {
274  // the page didn't exist before, create a new one.
275  new_page->initialize_snapshot_page(storage_id, new_page_id, level, range_begin);
276  } else {
277  // otherwise, start from the previous page image.
279  CHECK_ERROR_CODE(fileset->read_page(old_page_id, new_page));
280  ASSERT_ND(new_page->get_bin_range().begin_ == range_begin);
281  ASSERT_ND(new_page->get_level() == level);
282  new_page->header().page_id_ = new_page_id;
283  }
284  parent->get_pointer(index).snapshot_pointer_ = new_page_id;
285  }
286 
287  return kErrorCodeOk;
288 }
289 
291  snapshot::SnapshotWriter* writer,
292  uint32_t* writer_buffer_pos,
293  uint32_t writer_higher_buffer_pos) {
294  ASSERT_ND(*writer_buffer_pos <= writer->get_page_size());
295  ASSERT_ND(writer_higher_buffer_pos <= writer->get_intermediate_size());
296  if (UNLIKELY(*writer_buffer_pos == writer->get_page_size())) {
297  CHECK_ERROR_CODE(writer->dump_pages(0, *writer_buffer_pos));
298  *writer_buffer_pos = 0;
299  }
300  if (UNLIKELY(writer_higher_buffer_pos + levels_ >= writer->get_intermediate_size())) {
301  CHECK_ERROR_CODE(writer->expand_intermediate_memory(writer_higher_buffer_pos + levels_, true));
302  ASSERT_ND(writer_higher_buffer_pos + levels_ < writer->get_intermediate_size());
303  }
304  return kErrorCodeOk;
305 }
306 
307 } // namespace hash
308 } // namespace storage
309 } // namespace foedus
Represents an output of composer on one bin.
SnapshotPagePointer head_page_id_
Page ID of the head of HashComposedBinsPage for this sub-tree.
ErrorCode assure_writer_buffer(snapshot::SnapshotWriter *writer, uint32_t *writer_buffer_pos, uint32_t writer_higher_buffer_pos)
Subroutine to flush the writer if needed to make sure it has enough room.
Represents a pointer to another page (usually a child page).
Definition: storage_id.hpp:271
uint16_t cursor_bin_count_
Number of active bins in the current page.
ErrorCode read_page(storage::SnapshotPagePointer page_id, void *out)
ErrorCode expand_intermediate_memory(uint32_t required_pages, bool retain_content)
Expands intermediate_memory_ in case it is too small.
uint32_t StorageId
Unique ID for storage.
Definition: storage_id.hpp:55
const HashBinRange & get_bin_range() const
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
uint32_t total_pages_
Number of HashComposedBinsPage pages, starting at head_page_id_, contiguously emitted by a composer.
uint32_t input_count_
total number of buffers in inputs_
StorageId storage_id_
ID of the storage this page belongs to.
Definition: page.hpp:196
memory::PagePoolOffset get_page_size() const __attribute__((always_inline))
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
Abstracts how we batch-read several HashComposedBinsPage pages emitted by individual composers.
ErrorCode assure_capacity(uint64_t required_size, double expand_margin=2.0, bool retain_content=false) noexcept
If the current size is smaller than the given size, automatically expands.
Holds a set of read-only file objects for snapshot files.
PagePtr cur_path_[kHashMaxLevels]
The pages we are now composing.
bool contains(HashBin hash) const
Definition: hash_id.hpp:176
Compactly represents the route of intermediate pages to reach the given hash bin. ...
uint32_t buffer_size_
How many pages buffer_ can hold.
VolatilePagePointer volatile_pointer_
Definition: storage_id.hpp:308
0 means no-error.
Definition: error_code.hpp:87
void init(cache::SnapshotFileSet *fileset, SnapshotPagePointer head_page_id, uint32_t total_pages, uint32_t buffer_size, HashComposedBinsPage *buffer)
ErrorCode process_a_bin(uint32_t *installed_count, HashBin *next_lowest_bin)
Consumes inputs for the cur_path_[0] page and install snapshot pointers there.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Definition: storage_id.hpp:79
ErrorCode next_pages()
Reads pages into buffer_.
SnapshotPagePointer snapshot_pointer_
Definition: storage_id.hpp:307
memory::PagePoolOffset get_intermediate_size() const __attribute__((always_inline))
Represents a range of hash bins in a hash storage, such as what an intermediate page is responsible f...
Definition: hash_id.hpp:172
uint16_t extract_snapshot_id_from_snapshot_pointer(SnapshotPagePointer pointer)
Definition: storage_id.hpp:98
uint8_t route[8]
[0] means ordinal in level-0 intermediate page, [1] in its parent page, [2]...
storage::Page * get_page_base() __attribute__((always_inline))
HashBin begin_
Inclusive beginning of the range.
Definition: hash_id.hpp:191
uint32_t buffer_pos_
Index (0 = head, total_pages_ - 1 = tail) of the first page in buffer_.
HashBin end_
Exclusive end of the range.
Definition: hash_id.hpp:193
ErrorCode read_pages(storage::SnapshotPagePointer page_id_begin, uint32_t page_count, void *out)
Read contiguous pages in one shot.
void * get_block() const
Returns the memory block.
DualPagePointer * get_pointer_address(uint16_t index)
std::unique_ptr< ComposedBinsBuffer[] > inputs_memory_
just for auto release
uint32_t buffer_count_
Number of pages read into buffer_ so far.
ErrorCode next_bin() __attribute__((always_inline))
Moves on to the next bin.
uint64_t get_size() const
Returns the byte size of the memory block.
Represents an intermediate page in Hashtable Storage.
void initialize_snapshot_page(StorageId storage_id, SnapshotPagePointer page_id, uint8_t level, HashBin start_bin)
storage::Page * get_intermediate_base() __attribute__((always_inline))
HashComposedBinsPage * buffer_
The buffer to read contiguous pages in one shot.
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
Definition: error_code.hpp:155
uint64_t HashBin
Represents a bin of a hash value.
Definition: hash_id.hpp:142
uint16_t cursor_bin_
Cursor position for bin in the current page.
ErrorCode open_path(HashBin bin, uint8_t fixed_upto_level, cache::SnapshotFileSet *fileset, snapshot::SnapshotWriter *writer, uint32_t *writer_buffer_pos, uint32_t *writer_higher_buffer_pos)
Recursively opens pages down from fixed_upto_level.
Represents one memory block aligned to actual OS/hardware pages.
const ErrorStack kRetOk
Normal return value for no-error case.
cache::SnapshotFileSet * fileset_
file handles
ErrorCode switch_path(HashBin lowest_bin, cache::SnapshotFileSet *fileset, snapshot::SnapshotWriter *writer, uint32_t *writer_buffer_pos, uint32_t *writer_higher_buffer_pos)
Moves cur_path_ to a page that contains the specified bin.
const HashBin kInvalidHashBin
This value or larger never appears as a valid HashBin.
Definition: hash_id.hpp:162
const uint8_t kHashIntermediatePageFanout
Number of pointers in an intermediate page of hash storage.
Definition: hash_id.hpp:49
const uint64_t kHashMaxBins[]
kHashMaxBins[n] gives the maximum number of hash bins an n-level hash can hold.
Definition: hash_id.hpp:74
static IntermediateRoute construct(HashBin bin)
Calculates the route for the given hash bin.
#define UNLIKELY(x)
Hints that x is highly likely false.
Definition: compiler.hpp:104
bool snapshot_
Whether this page image is of a snapshot page.
Definition: page.hpp:211
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
ErrorCode dump_pages(memory::PagePoolOffset from_page, uint32_t count)
Write out pages that are contiguous in the main page pool.
static void assure_read_buffer_size(memory::AlignedMemory *read_buffer, uint32_t inputs)
If needed, expand the given read buffer to be used with the inputs.
ErrorStack init(const HashRootInfoPage *const *inputs, uint32_t input_count, PagePtr root_page, uint16_t root_child_index, memory::AlignedMemory *read_buffer, cache::SnapshotFileSet *fileset, snapshot::SnapshotWriter *writer, uint32_t *writer_buffer_pos, uint32_t *writer_higher_buffer_pos, bool *had_any_change)
DualPagePointer & get_pointer(uint16_t index)
storage::SnapshotPagePointer get_next_page_id() const
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
Definition: storage_id.hpp:45
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
uint32_t cursor_buffer_
Cursor position for page in the buffer.
const ComposedBin & get_cur_bin() const __attribute__((always_inline))
A page to pack many ComposedBin as an output of composer.
uint64_t page_id_
Page ID of this page.
Definition: page.hpp:191
Writes out one snapshot file for all data pages in one reducer.