hash_tmpbin.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/storage/hash/hash_tmpbin.hpp"

#include <glog/logging.h>

#include <ostream>
#include <string>

#include "foedus/assorted/assorted_func.hpp"

namespace foedus {
namespace storage {
namespace hash {

HashTmpBin::HashTmpBin()
  : memory_(), records_capacity_(0), records_consumed_(0), buckets_(nullptr), records_(nullptr) {
}

ErrorCode HashTmpBin::create_memory(uint16_t numa_node, uint64_t initial_size) {
  ASSERT_ND(initial_size > sizeof(RecordIndex) * kBucketCount);
  ASSERT_ND(sizeof(Record) == kPageSize);
  memory_.alloc(
    initial_size,
    1ULL << 21,
    memory::AlignedMemory::kNumaAllocOnnode,
    numa_node);
  on_memory_set();
  clean();
  return kErrorCodeOk;
}

void HashTmpBin::release_memory() {
  memory_.release_block();
  on_memory_set();
  clean();
}

void HashTmpBin::steal_memory(memory::AlignedMemory* provider) {
  ASSERT_ND(!provider->is_null());
  memory_ = std::move(*provider);
  ASSERT_ND(provider->is_null());
  ASSERT_ND(!memory_.is_null());
  on_memory_set();
  clean();
}

void HashTmpBin::give_memory(memory::AlignedMemory* recipient) {
  ASSERT_ND(!memory_.is_null());
  *recipient = std::move(memory_);
  ASSERT_ND(!recipient->is_null());
  ASSERT_ND(memory_.is_null());
  on_memory_set();
  clean();
}

void HashTmpBin::clean() {
  records_consumed_ = get_first_record();
  if (buckets_) {
    std::memset(buckets_, 0, records_consumed_ * sizeof(Record));
  }
}

void HashTmpBin::clean_quick() {
  ASSERT_ND(buckets_);
  ASSERT_ND(records_capacity_ > 0);
  ASSERT_ND(records_consumed_ >= get_first_record());
  // At some point, a plain mem-zero (clean()) becomes faster. We could switch like below,
  // but that case is rare, and it also makes unit-testing trickier.
  // if (records_consumed_ > get_first_record() + 256U) {
  //   clean();
  //   return;
  // }
  for (RecordIndex index = get_first_record(); index < records_consumed_; ++index) {
    Record* record = get_record(index);
    buckets_[extract_bucket(record->hash_)] = 0;
  }
  records_consumed_ = get_first_record();
}
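
// buckets_ and records_ are two views over the same memory block: the bucket array
// (kBucketCount RecordIndex entries) overlays the first few Record slots, so
// get_first_record() is the first index that actually holds a record. Because that
// index is never 0, a RecordIndex of 0 doubles as "no record" in buckets_ and next_.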
void HashTmpBin::on_memory_set() {
  if (memory_.is_null()) {
    buckets_ = nullptr;
    records_ = nullptr;
    records_capacity_ = 0;
  } else {
    buckets_ = reinterpret_cast<RecordIndex*>(memory_.get_block());
    records_ = reinterpret_cast<Record*>(memory_.get_block());
    records_capacity_ = memory_.get_size() / sizeof(Record);
    ASSERT_ND(sizeof(RecordIndex) * kBucketCount % sizeof(Record) == 0);
  }
}

inline ErrorCode HashTmpBin::alloc_record(RecordIndex* out) {
  if (UNLIKELY(records_consumed_ == records_capacity_)) {
    // Expand the memory while keeping its content, which is expensive! This mustn't happen often.
    LOG(INFO) << "We need to resize HashTmpBin! current_size=" << memory_.get_size();
    CHECK_ERROR_CODE(memory_.assure_capacity(memory_.get_size() * 2, 2.0, true));
    on_memory_set();
  }

  ASSERT_ND(records_consumed_ < records_capacity_);
  *out = records_consumed_;
  ++records_consumed_;
  return kErrorCodeOk;
}
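
// Inserts into the per-bucket singly-linked chain. search_bucket() either returns the
// existing record for this key (found_ != 0), or the chain's tail (tail_) so we can
// append the new record without re-scanning. If the key exists only as a logically
// deleted record, it is revived in place with the new XctId and payload.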
ErrorCode HashTmpBin::insert_record(
  xct::XctId xct_id,
  const void* key,
  uint16_t key_length,
  HashValue hash,
  const void* payload,
  uint16_t payload_length) {
  ASSERT_ND(!xct_id.is_deleted());
  ASSERT_ND(hashinate(key, key_length) == hash);
  SearchResult result = search_bucket(key, key_length, hash);
  if (result.found_ == 0) {
    RecordIndex new_index;
    CHECK_ERROR_CODE(alloc_record(&new_index));
    Record* record = get_record(new_index);
    record->set_all(xct_id, key, key_length, hash, payload, payload_length);

    if (result.tail_ == 0) {
      uint16_t bucket_index = extract_bucket(hash);
      ASSERT_ND(buckets_[bucket_index] == 0);
      buckets_[bucket_index] = new_index;
    } else {
      ASSERT_ND(buckets_[extract_bucket(hash)] != 0);
      Record* tail_record = get_record(result.tail_);
      tail_record->next_ = new_index;
    }
  } else {
    Record* record = get_record(result.found_);
    ASSERT_ND(record->hash_ == hash);
    if (UNLIKELY(!record->xct_id_.is_deleted())) {
      DLOG(WARNING) << "HashTmpBin::insert_record() hit KeyAlreadyExists case. This must not"
        << " happen except unit testcases.";
      return kErrorCodeStrKeyAlreadyExists;
    }
    ASSERT_ND(record->xct_id_.compare_epoch_and_orginal(xct_id) < 0);
    record->xct_id_ = xct_id;
    record->set_payload(payload, payload_length);
  }

  return kErrorCodeOk;
}

ErrorCode HashTmpBin::delete_record(
  xct::XctId xct_id,
  const void* key,
  uint16_t key_length,
  HashValue hash) {
  ASSERT_ND(xct_id.is_deleted());
  ASSERT_ND(hashinate(key, key_length) == hash);
  SearchResult result = search_bucket(key, key_length, hash);
  if (UNLIKELY(result.found_ == 0)) {
    DLOG(WARNING) << "HashTmpBin::delete_record() hit KeyNotFound case 1. This must not"
      << " happen except unit testcases.";
    return kErrorCodeStrKeyNotFound;
  } else {
    Record* record = get_record(result.found_);
    ASSERT_ND(record->hash_ == hash);
    if (UNLIKELY(record->xct_id_.is_deleted())) {
      DLOG(WARNING) << "HashTmpBin::delete_record() hit KeyNotFound case 2. This must not"
        << " happen except unit testcases.";
      return kErrorCodeStrKeyNotFound;
    }
    ASSERT_ND(record->xct_id_.compare_epoch_and_orginal(xct_id) < 0);
    record->xct_id_ = xct_id;
  }

  return kErrorCodeOk;
}

ErrorCode HashTmpBin::overwrite_record(
  xct::XctId xct_id,
  const void* key,
  uint16_t key_length,
  HashValue hash,
  const void* payload,
  uint16_t payload_offset,
  uint16_t payload_count) {
  ASSERT_ND(!xct_id.is_deleted());
  ASSERT_ND(hashinate(key, key_length) == hash);
  SearchResult result = search_bucket(key, key_length, hash);
  if (UNLIKELY(result.found_ == 0)) {
    DLOG(WARNING) << "HashTmpBin::overwrite_record() hit KeyNotFound case 1. This must not"
      << " happen except unit testcases.";
    return kErrorCodeStrKeyNotFound;
  } else {
    Record* record = get_record(result.found_);
    ASSERT_ND(record->hash_ == hash);
    if (UNLIKELY(record->xct_id_.is_deleted())) {
      DLOG(WARNING) << "HashTmpBin::overwrite_record() hit KeyNotFound case 2. This must not"
        << " happen except unit testcases.";
      return kErrorCodeStrKeyNotFound;
    } else if (UNLIKELY(record->payload_length_ < payload_offset + payload_count)) {
      DLOG(WARNING) << "HashTmpBin::overwrite_record() hit TooShortPayload case. This must not"
        << " happen except unit testcases.";
      return kErrorCodeStrTooShortPayload;
    }
    ASSERT_ND(record->xct_id_.compare_epoch_and_orginal(xct_id) < 0);
    record->xct_id_ = xct_id;
    record->overwrite_payload(payload, payload_offset, payload_count);
  }

  return kErrorCodeOk;
}

ErrorCode HashTmpBin::update_record(
  xct::XctId xct_id,
  const void* key,
  uint16_t key_length,
  HashValue hash,
  const void* payload,
  uint16_t payload_length) {
  ASSERT_ND(!xct_id.is_deleted());
  ASSERT_ND(hashinate(key, key_length) == hash);
  SearchResult result = search_bucket(key, key_length, hash);
  if (UNLIKELY(result.found_ == 0)) {
    DLOG(WARNING) << "HashTmpBin::update_record() hit KeyNotFound case 1. This must not"
      << " happen except unit testcases.";
    return kErrorCodeStrKeyNotFound;
  } else {
    Record* record = get_record(result.found_);
    ASSERT_ND(record->hash_ == hash);
    if (UNLIKELY(record->xct_id_.is_deleted())) {
      DLOG(WARNING) << "HashTmpBin::update_record() hit KeyNotFound case 2. This must not"
        << " happen except unit testcases.";
      return kErrorCodeStrKeyNotFound;
    }
    ASSERT_ND(record->xct_id_.compare_epoch_and_orginal(xct_id) < 0);
    record->xct_id_ = xct_id;
    record->set_payload(payload, payload_length);
  }

  return kErrorCodeOk;
}
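
// Walks the chain of the bucket for this hash. On a hit, returns the record's index in
// found_ (tail_ is 0). On a miss, found_ is 0 and tail_ is the last record of the chain
// (or 0 if the bucket is empty), which insert_record() uses as the append point.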
inline HashTmpBin::SearchResult HashTmpBin::search_bucket(
  const void* key,
  uint16_t key_length,
  HashValue hash) const {
  uint16_t bucket_index = extract_bucket(hash);
  RecordIndex head = buckets_[bucket_index];
  if (head == 0) {
    return SearchResult(0, 0);
  }

  ASSERT_ND(head > 0);
  RecordIndex last_seen = 0;
  for (RecordIndex cur = head; cur != 0;) {
    Record* record = get_record(cur);
    if (record->hash_ == hash && record->key_length_ == key_length) {
      if (LIKELY(std::memcmp(record->get_key(), key, key_length) == 0)) {
        return SearchResult(cur, 0);
      }
    }

    last_seen = cur;
    cur = record->next_;
  }

  return SearchResult(0, last_seen);
}

std::ostream& operator<<(std::ostream& o, const HashTmpBin& v) {
  // Each bin shouldn't have that many records... so, output everything!
  o << "<HashTmpBin>" << std::endl;
  o << "  " << v.memory_ << std::endl;
  o << "  <records_capacity_>" << v.records_capacity_ << "</records_capacity_>" << std::endl;
  o << "  <records_consumed_>" << v.records_consumed_ << "</records_consumed_>" << std::endl;
  o << "  <buckets_>" << std::endl;
  for (uint32_t i = 0; i < HashTmpBin::kBucketCount; ++i) {
    if (v.buckets_[i] != 0) {
      o << "    <bucket idx=\"" << i << "\" head_rec=\"" << v.buckets_[i] << "\" />" << std::endl;
    }
  }
  o << "  </buckets_>" << std::endl;
  o << "  <records_>" << std::endl;
  uint32_t begin = v.get_first_record();
  for (uint32_t i = begin; i < v.get_records_consumed(); ++i) {
    HashTmpBin::Record* record = v.get_record(i);
    o << "    <record id=\"" << i << "\" hash=\"" << assorted::Hex(record->hash_, 16) << "\"";
    if (record->next_) {
      o << " next_rec=\"" << record->next_ << "\"";
    }
    o << ">";
    o << "<key>" << assorted::HexString(std::string(record->get_key(), record->key_length_))
      << "</key>";
    o << "<payload>"
      << assorted::HexString(std::string(record->get_payload(), record->payload_length_))
      << "</payload>";
    o << record->xct_id_;
    o << "</record>" << std::endl;
  }
  o << "  </records_>" << std::endl;
  o << "</HashTmpBin>";
  return o;
}

}  // namespace hash
}  // namespace storage
}  // namespace foedus
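
For orientation, here is a minimal sketch of how a composer-side caller might drive this class. It is not part of the file: compose_example() is a hypothetical helper, the xct::XctId is assumed to be prepared (non-deleted) by the caller, and the accessors it uses (create_memory() with its default initial size, get_first_record(), get_records_consumed(), get_record()) are assumed to be publicly accessible as they are used by operator<< above.

#include "foedus/storage/hash/hash_tmpbin.hpp"

namespace foedus {
namespace storage {
namespace hash {

// Hypothetical helper: composes one record into a bin, iterates the bin, then recycles it.
ErrorCode compose_example(xct::XctId xct_id, uint16_t numa_node) {
  HashTmpBin bin;
  CHECK_ERROR_CODE(bin.create_memory(numa_node));  // default initial size

  const char key[] = "example-key";
  const char payload[] = "example-payload";
  HashValue hash = hashinate(key, sizeof(key));
  CHECK_ERROR_CODE(bin.insert_record(xct_id, key, sizeof(key), hash, payload, sizeof(payload)));

  // Valid records occupy [get_first_record(), get_records_consumed()).
  for (HashTmpBin::RecordIndex i = bin.get_first_record(); i < bin.get_records_consumed(); ++i) {
    HashTmpBin::Record* record = bin.get_record(i);
    (void)record;  // ... copy the tuple out to the destination page ...
  }

  bin.clean_quick();     // reset for the next bin, reusing the same memory
  bin.release_memory();  // or give_memory() to pass the block to another HashTmpBin
  return kErrorCodeOk;
}

}  // namespace hash
}  // namespace storage
}  // namespace foedus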