hash_reserve_impl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/storage/hash/hash_reserve_impl.hpp"

#include <glog/logging.h>

#include <cstring>

#include "foedus/assert_nd.hpp"
#include "foedus/memory/numa_core_memory.hpp"
#include "foedus/storage/hash/hash_page_impl.hpp"
#include "foedus/thread/thread.hpp"

namespace foedus {
namespace storage {
namespace hash {

ErrorCode ReserveRecords::run(
  xct::SysxctWorkspace* sysxct_workspace) {
  return find_or_create_or_expand(sysxct_workspace, target_, hint_check_from_);
}

ErrorCode ReserveRecords::find_or_create_or_expand(
  xct::SysxctWorkspace* sysxct_workspace,
  HashDataPage* page,
  DataPageSlotIndex examined_records) {
  ASSERT_ND(!page->header().snapshot_);
  // Lock the page first so that there is no race on new keys.
  VolatilePagePointer next_pointer = page->next_page().volatile_pointer_;
  if (next_pointer.is_null()) {
    CHECK_ERROR_CODE(context_->sysxct_page_lock(sysxct_workspace, reinterpret_cast<Page*>(page)));
    next_pointer = page->next_page().volatile_pointer_;
  } else {
    // If the page already has a next-page pointer, the page is already finalized. Skip locking.
    assorted::memory_fence_acquire();  // must check next-page BEFORE everything else
  }
  // In either case, from now on this page is finalized.
  ASSERT_ND(page->is_locked() || !next_pointer.is_null());
  const DataPageSlotIndex count = page->get_record_count();
  ASSERT_ND(examined_records <= count);

  // Locate the key in this page.
  DataPageSlotIndex index;
  if (examined_records == count) {
#ifndef NDEBUG
    // Is examined_records correct? Let's confirm.
    index = search_within_page(page, examined_records, 0);
    ASSERT_ND(index == kSlotNotFound);
#endif  // NDEBUG
    index = kSlotNotFound;
  } else {
    // We can skip the first examined_records records.
    // We took the page lock, so a physical-only search is enough.
    index = search_within_page(page, count, examined_records);
  }

  if (index == kSlotNotFound) {
    // Still no match. This is the simpler case.
    if (!next_pointer.is_null()) {
      // We have a next page; let's go on.
      // This is so far a recursion, which is easier to debug than a loop.
      // Stack overflow? If you have a long chain in a hash bucket, you are already screwed.
      HashDataPage* next_page = context_->resolve_cast<HashDataPage>(next_pointer);
      return find_or_create_or_expand(sysxct_workspace, next_page, 0);
    } else {
      // This is the tail page, and we have locked it.
      // We simply create a new physical record.
      ASSERT_ND(page->is_locked());
      return create_new_record_in_tail_page(page);
    }
  } else {
    // We found a match! This is either super easy or complex.
    ASSERT_ND(index >= examined_records);
    ASSERT_ND(index < count);
    if (page->get_slot(index).get_max_payload() >= payload_count_) {
      // This record is spacious enough. We are done! A super easy case.
      out_slot_ = index;
      out_page_ = page;
      return kErrorCodeOk;
    } else {
      // We need to expand this record. To do that, we must lock the record and move it.
      // This can be complex.
      return expand_record(sysxct_workspace, page, index);
    }
  }
}
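
The function above locks the page only when it still looks like the tail: a page whose next-page pointer is already set is finalized and immutable, so an acquire fence is all a reader needs before examining it. Below is a minimal stand-alone sketch of that check-lock-recheck idiom using only std::atomic and std::mutex; Node, latch, and finalized_next are illustrative names, not FOEDUS API.

#include <atomic>
#include <mutex>

struct Node {
  std::mutex latch;                  // stands in for the sysxct page lock
  std::atomic<Node*> next{nullptr};  // non-null means the node is finalized
};

// Returns the node's next pointer, locking only while the node may still change.
Node* finalized_next(Node* node, std::unique_lock<std::mutex>* lock_out) {
  Node* next = node->next.load(std::memory_order_acquire);
  if (next == nullptr) {
    // Still mutable: take the lock, then re-read. A writer may have installed
    // a next pointer between our first read and our lock acquisition.
    *lock_out = std::unique_lock<std::mutex>(node->latch);
    next = node->next.load(std::memory_order_acquire);
  }
  // Mirrors the ASSERT_ND above: either we hold the lock or the node is
  // finalized; both guarantee its existing records cannot change under us.
  return next;
}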

ErrorCode ReserveRecords::expand_record(
  xct::SysxctWorkspace* sysxct_workspace,
  HashDataPage* page,
  DataPageSlotIndex index) {
  auto* record = &page->get_slot(index).tid_;
  CHECK_ERROR_CODE(context_->sysxct_record_lock(
    sysxct_workspace,
    page->get_volatile_page_id(),
    record));
  // After locking, the record status is now finalized.
  if (record->is_moved()) {
    // If we now find it moved, we have two choices:
    // 1) Resume the search. The new location should be somewhere after here.
    // 2) Retry the whole sysxct.
    // We do 2) to save coding (yikes!). Excuse: this case should be rare.
    LOG(INFO) << "Rare. The record turns out to be moved after locking. Retry the sysxct";
    return kErrorCodeXctRaceAbort;
  }

  // Now, because we locked a record of the key, and because it wasn't moved yet,
  // we are sure that this is the only record in this hash bucket that might contain this key.
  // We'll be done as soon as we figure out where to move this record.
  HashDataPage* tail;
  CHECK_ERROR_CODE(find_and_lock_spacious_tail(sysxct_workspace, page, &tail));
  ASSERT_ND(tail->is_locked());
  ASSERT_ND(tail->available_space()
    >= HashDataPage::required_space(key_length_, aggressive_payload_count_hint_));

  // Copy the XID of the existing record.
  out_slot_ = append_record_to_page(tail, record->xct_id_);
  out_page_ = tail;

  // Now we mark the old record as moved.
  assorted::memory_fence_release();
  record->xct_id_.set_moved();
  return kErrorCodeOk;
}
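
Choice 2) relies on the caller rerunning the whole system transaction when it sees kErrorCodeXctRaceAbort. A generic sketch of such a retry driver, in plain C++; the Rc enum and run_with_retry are illustrative stand-ins, not the FOEDUS sysxct machinery.

enum class Rc { kOk, kRaceAbort, kOtherError };

// Reruns op() from scratch whenever it reports a race, up to a bound.
// Races are expected to be rare, so the loop almost always runs once.
template <typename Op>
Rc run_with_retry(Op&& op, int max_attempts = 100) {
  Rc rc = Rc::kRaceAbort;
  for (int i = 0; i < max_attempts && rc == Rc::kRaceAbort; ++i) {
    rc = op();  // e.g., one full find-or-create-or-expand pass
  }
  return rc;
}

Retrying from scratch trades a little wasted work in the rare race for much simpler invariants: the sysxct never has to reason about a record that moved under its feet.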

ErrorCode ReserveRecords::find_and_lock_spacious_tail(
  xct::SysxctWorkspace* sysxct_workspace,
  HashDataPage* from_page,
  HashDataPage** tail) {
  *tail = nullptr;
  HashDataPage* cur = from_page;
  while (true) {
    VolatilePagePointer next_pointer = cur->next_page().volatile_pointer_;
    if (!next_pointer.is_null()) {
      cur = context_->resolve_cast<HashDataPage>(next_pointer);
      continue;
    }
    // cur looks like a tail page, but we need to confirm that after locking.
    CHECK_ERROR_CODE(context_->sysxct_page_lock(sysxct_workspace, reinterpret_cast<Page*>(cur)));
    if (!cur->next_page().volatile_pointer_.is_null()) {
      LOG(INFO) << "Rare. Someone has just made a next page";
      continue;
    }

    const uint16_t available_space = cur->available_space();
    const uint16_t required_space
      = HashDataPage::required_space(key_length_, aggressive_payload_count_hint_);
    if (available_space >= required_space) {
      *tail = cur;
      return kErrorCodeOk;
    } else {
      // Need to make a new page.
      HashDataPage* new_tail;
      CHECK_ERROR_CODE(create_new_tail_page(cur, &new_tail));

      // Then, after all, we "publish" this new page.
      assorted::memory_fence_release();  // so that others don't see an uninitialized page
      cur->next_page().volatile_pointer_ = new_tail->get_volatile_page_id();
      assorted::memory_fence_release();  // so that others don't have a "where's the next page" issue
      cur->header().page_version_.set_has_next_page();

      // We could make a record in this page before the publish above.
      // That would avoid the lock here, but the code becomes a bit uglier.
      // Record expansion is a costly operation anyway, so this is okay for now.
      CHECK_ERROR_CODE(context_->sysxct_page_lock(
        sysxct_workspace,
        reinterpret_cast<Page*>(new_tail)));
      *tail = new_tail;
      return kErrorCodeOk;
    }
  }
}
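
The release fences around the pointer installation above are what let lock-free readers chase next-page pointers safely: initialize, fence, then link. A condensed sketch of the same publication protocol with std::atomic; Page, publish_next, and follow are illustrative names.

#include <atomic>
#include <cstring>

struct Page {
  char data[4096];                   // page body, filled before publication
  std::atomic<Page*> next{nullptr};  // the "next page" pointer
};

void publish_next(Page* tail, Page* fresh) {
  std::memset(fresh->data, 0, sizeof(fresh->data));     // initialize first
  std::atomic_thread_fence(std::memory_order_release);  // readers must not see uninitialized data
  tail->next.store(fresh, std::memory_order_release);   // now link it
}

Page* follow(Page* tail) {
  Page* next = tail->next.load(std::memory_order_acquire);  // pairs with the release
  return next;  // if non-null, next->data is fully initialized and visible
}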

ErrorCode ReserveRecords::create_new_tail_page(
  HashDataPage* cur_tail,
  HashDataPage** new_tail) {
  ASSERT_ND(cur_tail->is_locked());
  DVLOG(2) << "Volatile HashDataPage is full. Adding a next page...";
  const VolatilePagePointer new_pointer
    = context_->get_thread_memory()->grab_free_volatile_page_pointer();
  if (UNLIKELY(new_pointer.is_null())) {
    return kErrorCodeMemoryNoFreePages;
  }

  *new_tail = context_->resolve_newpage_cast<HashDataPage>(new_pointer.get_offset());
  (*new_tail)->initialize_volatile_page(
    cur_tail->header().storage_id_,
    new_pointer,
    reinterpret_cast<Page*>(cur_tail),
    cur_tail->get_bin(),
    cur_tail->get_bin_shifts());
  return kErrorCodeOk;
}

ErrorCode ReserveRecords::create_new_record_in_tail_page(HashDataPage* tail) {
  ASSERT_ND(tail->is_locked());
  ASSERT_ND(tail->next_page().volatile_pointer_.is_null());

  // Do we have enough room in this page?
  const uint16_t available_space = tail->available_space();
  const uint16_t required_space
    = HashDataPage::required_space(key_length_, aggressive_payload_count_hint_);
  xct::XctId initial_xid;
  initial_xid.set(
    Epoch::kEpochInitialCurrent,  // TODO(Hideaki) this should be something else
    0);
  initial_xid.set_deleted();
  if (available_space < required_space) {
    // This page can't hold it. Let's make a new page.
    HashDataPage* new_tail;
    CHECK_ERROR_CODE(create_new_tail_page(tail, &new_tail));

    // Then append a record to it.
    out_slot_ = append_record_to_page(new_tail, initial_xid);
    ASSERT_ND(out_slot_ == 0);  // it's the first record in the new page
    out_page_ = new_tail;
    // Then, after all, we "publish" this new page.
    assorted::memory_fence_release();  // so that others don't see an uninitialized page
    tail->next_page().volatile_pointer_ = new_tail->get_volatile_page_id();
    assorted::memory_fence_release();  // so that others don't have a "where's the next page" issue
    tail->header().page_version_.set_has_next_page();
  } else {
    // The page is spacious enough and has no next page. We rule!
    out_slot_ = append_record_to_page(tail, initial_xid);
    out_page_ = tail;
  }

  return kErrorCodeOk;
}
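
The available_space/required_space comparison above is plain 8-aligned arithmetic. A hedged sketch of what such a calculation looks like; the exact formula of HashDataPage::required_space, and whether it charges per-record slot metadata, are assumptions here rather than facts taken from the FOEDUS headers.

#include <cstdint>

// 8-byte alignment, as in assorted::align8.
constexpr uint16_t align8(uint16_t value) {
  return (value + 7u) & ~7u;
}

// Key and payload are each 8-aligned; a real page would likely also account
// for per-record slot metadata (omitted here as an assumption).
constexpr uint16_t required_space(uint16_t key_length, uint16_t payload_length) {
  return align8(key_length) + align8(payload_length);
}

static_assert(required_space(5, 20) == 8 + 24, "5 rounds up to 8, 20 rounds up to 24");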

DataPageSlotIndex ReserveRecords::search_within_page(
  const HashDataPage* page,
  DataPageSlotIndex key_count,
  DataPageSlotIndex examined_records) const {
  ASSERT_ND(!page->header().snapshot_);
  ASSERT_ND(page->is_locked()
    || !page->next_page().volatile_pointer_.is_null());
  ASSERT_ND(key_count == page->get_record_count());
  return page->search_key_physical(
    combo_.hash_,
    combo_.fingerprint_,
    key_,
    key_length_,
    key_count,
    examined_records);
}
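
search_key_physical() takes the precomputed bloom-filter fingerprint so it can rule out non-matching records cheaply before comparing full keys. A toy version of such a filter; the bit width and the two probe positions are illustrative assumptions, not the FOEDUS layout.

#include <bitset>
#include <cstdint>

struct TinyBloomFilter {
  std::bitset<64> bits_;

  void add(uint64_t fingerprint) {
    bits_.set(fingerprint % 64);
    bits_.set((fingerprint >> 6) % 64);
  }
  // May return true for an absent key (false positive: harmless, we then
  // compare real keys). Never returns false for a key that was added.
  bool maybe_contains(uint64_t fingerprint) const {
    return bits_.test(fingerprint % 64) && bits_.test((fingerprint >> 6) % 64);
  }
};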

DataPageSlotIndex ReserveRecords::append_record_to_page(
  HashDataPage* page,
  xct::XctId initial_xid) const {
  ASSERT_ND(page->available_space()
    >= HashDataPage::required_space(key_length_, aggressive_payload_count_hint_));
  DataPageSlotIndex index = page->get_record_count();
  auto& slot = page->get_new_slot(index);
  slot.offset_ = page->next_offset();
  slot.hash_ = combo_.hash_;
  slot.key_length_ = key_length_;
  slot.physical_record_length_ = assorted::align8(key_length_) + assorted::align8(payload_count_);
  slot.payload_length_ = 0;
  char* record = page->record_from_offset(slot.offset_);
  std::memcpy(record, key_, key_length_);
  if (key_length_ % 8 != 0) {
    std::memset(record + key_length_, 0, 8 - (key_length_ % 8));
  }
  slot.tid_.reset();
  slot.tid_.xct_id_ = initial_xid;

  // We install the fingerprint in the bloom filter BEFORE we increment the key count.
  // It's okay for concurrent reads to see false positives, but false negatives are wrong!
  page->bloom_filter_.add(combo_.fingerprint_);

  // We increment the key count AFTER installing the key because otherwise an optimistic read
  // might see the record but find that the key doesn't match. We need a fence to prevent that.
  assorted::memory_fence_release();
  page->header_.increment_key_count();
  return index;
}
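
The comments above describe the writer-side ordering that makes optimistic reads safe: fill the record, fence, then bump the count. A stand-alone sketch of that ordering with std::atomic; MiniPage and append_key are illustrative names.

#include <atomic>
#include <cstdint>
#include <cstring>

struct MiniPage {
  std::atomic<uint16_t> key_count{0};
  char keys[128][8];  // fixed-size key slots, for brevity
};

// Writer: install the key, then fence, then increment the count. A reader
// that observes key_count == n is therefore guaranteed to also observe
// keys[0..n-1] fully written, even without taking any lock.
void append_key(MiniPage* page, const char* key, uint16_t length) {
  uint16_t index = page->key_count.load(std::memory_order_relaxed);
  std::memcpy(page->keys[index], key, length);  // length <= 8 assumed
  std::atomic_thread_fence(std::memory_order_release);
  page->key_count.store(index + 1, std::memory_order_release);
}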

}  // namespace hash
}  // namespace storage
}  // namespace foedus