20 #include <glog/logging.h>
60 LOG(INFO) <<
"Uninitializing an hash-storage " <<
get_name();
62 if (!
control_block_->root_page_pointer_.volatile_pointer_.is_null()) {
78 LOG(ERROR) <<
"This hash-storage already exists: " <<
get_name();
85 uint64_t required_partitioner_bytes = metadata.
get_bin_count() + 4096ULL;
86 uint64_t partitioner_bytes
90 if (partitioner_bytes < required_partitioner_bytes * 1.25) {
91 std::stringstream str;
92 str << metadata <<
".\n"
93 <<
"To accomodate this number of hash bins, partitioner_data_memory_mb_ must be"
94 <<
" at least " << (required_partitioner_bytes * 1.25 / (1ULL << 20));
99 LOG(INFO) <<
"Newly creating an hash-storage " <<
get_name();
109 const uint16_t kTheNode = 0;
121 control_block_->root_page_pointer_.volatile_pointer_.set(kTheNode, root_offset);
130 LOG(INFO) <<
"Newly created an hash-storage " <<
get_name();
156 reinterpret_cast<Page**>(&volatile_root)));
157 control_block_->root_page_pointer_.volatile_pointer_ = volatile_pointer;
161 LOG(INFO) <<
"Loaded a hash-storage " <<
get_name();
172 uint16_t* payload_capacity,
197 if (payload_length > *payload_capacity) {
199 DVLOG(0) <<
"buffer too small??" << payload_length <<
":" << *payload_capacity;
200 *payload_capacity = payload_length;
204 *payload_capacity = payload_length;
206 std::memcpy(payload, location.
record_ + key_offset, payload_length);
216 uint16_t payload_offset,
217 uint16_t payload_count,
240 if (payload_length < payload_offset + payload_count) {
241 LOG(WARNING) <<
"short record " << combo;
246 std::memcpy(payload, location.
record_ + key_offset + payload_offset, payload_count);
251 ASSERT_ND(physical_payload_hint >= payload_count);
252 if (physical_payload_hint < payload_count) {
253 physical_payload_hint = payload_count;
256 return physical_payload_hint;
267 char* record = location.
record_;
283 uint16_t payload_count,
284 uint16_t physical_payload_hint) {
297 physical_payload_hint,
320 DVLOG(2) <<
"Record expansion triggered. payload_count=" << payload_count
322 <<
", size hint=" << physical_payload_hint;
331 physical_payload_hint,
334 DVLOG(2) <<
"Expanded record!";
396 uint16_t payload_count,
397 uint16_t physical_payload_hint) {
415 physical_payload_hint,
432 DVLOG(2) <<
"Record expansion triggered. payload_count=" << payload_count
434 <<
", size hint=" << physical_payload_hint;
443 physical_payload_hint,
446 DVLOG(2) <<
"Expanded record!";
465 log_common = log_entry;
481 log_common = log_entry;
495 log_common = log_entry;
508 uint16_t payload_offset,
509 uint16_t payload_count) {
530 LOG(WARNING) <<
"short record " << combo;
553 template <
typename PAYLOAD>
560 uint16_t payload_offset) {
581 LOG(WARNING) <<
"short record " << combo;
586 PAYLOAD* current =
reinterpret_cast<PAYLOAD*
>(
617 reinterpret_cast<Page**>(root),
629 uint16_t index_in_parent,
635 uint8_t parent_level = parent->
get_level();
636 ASSERT_ND(!is_parent_snapshot || !for_write);
639 bool child_intermediate = (parent_level > 0);
640 if (is_parent_snapshot) {
648 ASSERT_ND((*page)->get_header().snapshot_);
650 }
else if (child_intermediate) {
658 reinterpret_cast<Page*>(parent),
667 if (child_intermediate) {
669 ASSERT_ND((*page)->get_header().get_in_layer_level() + 1U == parent_level);
672 ASSERT_ND(reinterpret_cast<HashDataPage*>(*page)->get_bin()
683 uint16_t index_in_parent,
702 if (snapshot_pointer == 0) {
710 null_pointer.
clear();
715 if (snapshot_pointer == 0) {
724 reinterpret_cast<Page*>(parent),
735 const auto offset = head_page_id.
get_offset();
749 std::memcpy(head_page, snapshot_head,
kPageSize);
765 DVLOG(1) <<
"Following next-link in hash data pages. Hopefully it's not that long..";
787 std::memcpy(next_page, snapshot_page,
kPageSize);
792 cur_page = next_page;
797 bool must_release_pages =
false;
799 uint64_t expected = 0;
800 if (assorted::raw_atomic_compare_exchange_strong<uint64_t>(
803 head_page_id.
word)) {
805 *page =
reinterpret_cast<Page*
>(head_page);
810 LOG(INFO) <<
"Interesting. Someone else has installed a volatile version.";
812 must_release_pages =
true;
815 must_release_pages =
true;
818 if (must_release_pages) {
827 if (next_id.is_null()) {
856 uint8_t parent_level = parent->
get_level();
867 volatile_null.
clear();
875 if (parent_level == 0) {
884 ASSERT_ND(*bin_head !=
nullptr || !for_write);
940 bool create_if_notfound,
941 uint16_t create_payload_length,
949 ASSERT_ND(for_write || !create_if_notfound);
985 LOG(INFO) <<
"Interesting. The record has been just moved";
1006 if (
UNLIKELY(record_count != record_count_again)) {
1007 LOG(INFO) <<
"Interesting. concurrent insertion just happend to the page";
1014 LOG(INFO) <<
"Interesting. concurrent next-page installation just happend to the page";
1021 if (create_if_notfound) {
1031 create_payload_length,
1038 if (physical_only) {
1045 LOG(INFO) <<
"Interesting. The record has been just moved after creation!";
1055 if (physical_only) {
1077 uint16_t key_length,
1079 uint16_t payload_length,
1081 uint16_t examined_records,
1111 ASSERT_ND(&old_slot->tid_ == old_address);
1114 const char* key = page->record_from_offset(old_slot->offset_);
1115 uint16_t key_length = old_slot->key_length_;
1130 ASSERT_ND(page->get_slot_address(old_index) == old_slot);
1138 uint16_t key_length,
1169 if (
UNLIKELY(record_count != record_count_again)) {
1170 LOG(INFO) <<
"Interesting. concurrent insertion just happend to the page";
1176 LOG(WARNING) <<
"no next page?? but we didn't find the moved record in this page";
1179 LOG(ERROR) <<
"Unexpected error, failed to track moved record in hash storage."
1180 <<
" This should not happen. hash combo=" << combo;
1192 #define EXPIN_5I(x) template ErrorCode HashStoragePimpl::increment_record< x > \
1193 (thread::Thread* context, \
1195 uint16_t key_length, \
1196 const HashCombo& combo, \
1198 uint16_t payload_offset)
0x080D : "STORAGE: HASH: Number of hash-bins too large compared to storage.partitioner_data_memory_mb...
log::RecordLogType * log_entry_
Pointer to the log entry in private log buffer for this write operation.
0x080A : "STORAGE: The record's payload is smaller than requested" .
ErrorCode increment_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, PAYLOAD *value, uint16_t payload_offset)
Metadata meta_
common part of the metadata.
xct::Xct & get_current_xct()
Returns the transaction that is currently running on this thread.
uint16_t adjust_payload_hint(uint16_t payload_count, uint16_t physical_payload_hint)
ErrorCode find_or_read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page **out)
Find the given page in snapshot cache, reading it if not found.
T align8(T value)
8-alignment.
void release_free_volatile_page(PagePoolOffset offset)
Returns one free volatile page to local page pool.
Represents a pointer to another page (usually a child page).
const DataPageBloomFilter & bloom_filter() const __attribute__((always_inline))
memory::NumaCoreMemory * get_thread_memory() const
Returns the private memory repository of this thread.
0x080C : "STORAGE: This key is not found in this storage" .
void populate(StorageId storage_id, const void *key, uint16_t key_length, uint8_t bin_bits, HashValue hash, const void *payload, uint16_t payload_offset, uint16_t payload_count) __attribute__((always_inline))
Automatically calls if uninitialize() wasn't called when it gets out of scope, and just complains whe...
Page * to_page(const void *address)
super-dirty way to obtain Page the address belongs to.
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
Page pool for volatile read/write store (VolatilePage) and the read-only bufferpool (SnapshotPage)...
const HashBinRange & get_bin_range() const
xct::RwLockableXctId tid_
TID of the record.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Represents a record of write-access during a transaction.
ErrorCode grab_one(PagePoolOffset *offset)
Grab only one page.
ErrorCode overwrite_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, const void *payload, uint16_t payload_offset, uint16_t payload_count)
Represents one thread running on one NUMA core.
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
DataPageSlotIndex out_slot_
[Out] The slot of the record that is found or created.
uint16_t get_aligned_key_length() const
ErrorCode get_record_part(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, void *payload, uint16_t payload_offset, uint16_t payload_count, bool read_only)
uint16_t get_max_payload() const
ErrorCode locate_record_logical(thread::Thread *context, bool for_write, bool create_if_notfound, uint16_t create_payload_length, const void *key, uint16_t key_length, const HashCombo &combo, HashDataPage *bin_head, RecordLocation *result)
locate_record()'s logical+physical version.
const GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert volatile page ID to page pointer.
Result of track_moved_record().
const DataPageSlotIndex kSlotNotFound
Represents a pointer to a volatile page with modification count for preventing ABA.
Represents a user transaction.
ErrorCode follow_page_bin_head(thread::Thread *context, bool for_write, HashIntermediatePage *parent, uint16_t index_in_parent, Page **page)
subroutine to follow a pointer to head of bin from a volatile parent
ErrorCode get_root_page(thread::Thread *context, bool for_write, HashIntermediatePage **root)
Retrieves the root page of this storage.
bool contains(const BloomFilterFingerprint &fingerprint) const __attribute__((always_inline))
ErrorCode follow_page(thread::Thread *context, bool for_write, HashIntermediatePage *parent, uint16_t index_in_parent, Page **page)
for non-root
ErrorStack load_one_volatile_page(cache::SnapshotFileSet *fileset, storage::SnapshotPagePointer snapshot_pointer, storage::VolatilePagePointer *pointer, storage::Page **page)
Another convenience method that also reads an existing snapshot page to the volatile page...
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
Brings error stacktrace information as return value of functions.
bool compare_slot_key(DataPageSlotIndex index, HashValue hash, const void *key, uint16_t key_length) const
returns whether the slot contains the exact key specified
Snapshot isolation (SI), meaning the transaction reads a consistent and complete image of the databas...
ErrorCode populate_logical(xct::Xct *cur_xct, HashDataPage *page, DataPageSlotIndex index, bool intended_for_write)
Populates the result with XID and possibly readset.
ErrorCode insert_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, const void *payload, uint16_t payload_count, uint16_t physical_payload_hint)
Engine * engine_
Most attachable object stores an engine pointer (local engine), so we define it here.
The storage has been created and ready for use.
ErrorCode add_to_write_set(storage::StorageId storage_id, RwLockableXctId *owner_id_address, char *payload_address, log::RecordLogType *log_entry)
Add the given record to the write set of this transaction.
Definitions of IDs in this package and a few related constant values.
Holds a set of read-only file objects for snapshot files.
ErrorCode locate_record_reserve_physical(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, uint16_t payload_length, HashDataPage **page_in_out, uint16_t examined_records, DataPageSlotIndex *new_location)
Subroutine of locate_record() to create/migrate a physical record of the given key in the page or its...
const StorageName & get_name() const
ErrorCode follow_page_pointer(storage::VolatilePageInit page_initializer, bool tolerate_null_pointer, bool will_modify, bool take_ptr_set_snapshot, storage::DualPagePointer *pointer, storage::Page **page, const storage::Page *parent, uint16_t index_in_parent)
A general method to follow (read) a page pointer.
uint8_t bins_to_level(uint64_t bins)
HashDataPage * out_page_
[Out] The page that contains the found/created record.
storage::VolatilePagePointer grab_free_volatile_page_pointer()
Wrapper for grab_free_volatile_page().
HashDataPage * page_
The data page (might not be bin-head) containing the record.
void populate(StorageId storage_id, const void *key, uint16_t key_length, uint8_t bin_bits, HashValue hash, const void *payload, uint16_t payload_count) __attribute__((always_inline))
HashBin get_bin_count() const
const EngineOptions & get_options() const
Repository of memories dynamically acquired within one CPU core (thread).
const DualPagePointer * next_page_address() const __attribute__((always_inline))
ErrorStack create(const HashMetadata &metadata)
The MCS reader-writer lock variant of LockableXctId.
const Slot * get_slot_address(DataPageSlotIndex record) const __attribute__((always_inline))
same as &get_slot(), but this is more explicit and easier to understand/maintain
storage::Page * resolve(storage::VolatilePagePointer ptr) const
Shorthand for get_global_volatile_page_resolver.resolve_offset()
uint8_t get_levels() const
log::ThreadLogBuffer & get_thread_log_buffer()
Returns the private log buffer for this thread.
uint32_t partitioner_data_memory_mb_
Size in MB of a shared memory buffer allocated for all partitioners during log gleaning.
VolatilePagePointer volatile_pointer_
char * record_
Address of the record.
Independent utility methods/classes for hashination, or hash functions.
uint64_t fanout_power(uint8_t exponent)
bool is_moved() const __attribute__((always_inline))
memory::PagePoolOffset get_offset() const
HashStorageControlBlock * control_block_
The shared data on shared memory that has been initialized in some SOC or master engine.
ErrorCode add_related_write_set(ReadXctAccess *related_read_set, RwLockableXctId *tid_address, char *payload_address, log::RecordLogType *log_entry)
Registers a write-set related to an existing read-set.
storage::StorageOptions storage_
Declares all log types used in this storage type.
uint16_t cur_payload_length_
Logical payload length as-of the observed XID.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
Calls Initializable::uninitialize() automatically when it gets out of scope.
Constants and methods related to CPU cacheline and its prefetching.
void populate(StorageId storage_id, const void *key, uint16_t key_length, uint8_t bin_bits, HashValue hash, const void *payload, uint16_t payload_count) __attribute__((always_inline))
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
Log type of hash-storage's update operation.
0x0802 : "STORAGE: This storage already exists" .
A base class for HashInsertLogType/HashDeleteLogType/HashOverwriteLogType.
char * payload_address_
Pointer to the payload of the record.
NumaNodeMemoryRef * get_node_memory(foedus::thread::ThreadGroupId group) const
SnapshotPagePointer snapshot_pointer_
ErrorStack load(const StorageControlBlock &snapshot_block)
A system transaction to reserve a physical record(s) in a hash data page.
Fix-sized slot for each record, which is placed at the end of data region.
Just a marker to denote that the memory region represents a data page.
HashBin begin_
Inclusive beginning of the range.
ErrorCode locate_bin(thread::Thread *context, bool for_write, const HashCombo &combo, HashDataPage **bin_head)
Find a pointer to the bin that contains records for the hash.
uint16_t DataPageSlotIndex
bool is_deleted() const __attribute__((always_inline))
xct::TrackMovedRecordResult track_moved_record(xct::RwLockableXctId *old_address, xct::WriteXctAccess *write_set)
storage::StorageId storage_id_
The storage we accessed.
void release_pages_recursive_parallel(Engine *engine)
uint8_t get_bin_bits() const
P * resolve_cast(storage::VolatilePagePointer ptr) const
resolve() plus reinterpret_cast
A set of information that are used in many places, extracted from the given key.
DualPagePointer * get_pointer_address(uint16_t index)
void initialize_volatile_page(StorageId storage_id, VolatilePagePointer page_id, const HashIntermediatePage *parent, uint8_t level, HashBin start_bin)
Called only when this page is initialized.
const memory::LocalPageResolver & get_local_volatile_page_resolver() const
Returns page resolver to convert only local page ID to page pointer.
void assert_range() const __attribute__((always_inline))
0x0301 : "MEMORY : Not enough free volatile pages. Check the config of MemoryOptions" ...
void populate_physical(HashDataPage *page, DataPageSlotIndex index)
Populates fields other than readset_.
void populate(StorageId storage_id, const void *key, uint16_t key_length, uint8_t bin_bits, HashValue hash) __attribute__((always_inline))
ErrorCode locate_record(thread::Thread *context, bool for_write, bool physical_only, bool create_if_notfound, uint16_t create_payload_length, const void *key, uint16_t key_length, const HashCombo &combo, HashDataPage *bin_head, RecordLocation *result)
Usually follows locate_bin to locate the exact physical record for the key, or create a new one if no...
BloomFilterFingerprint fingerprint_
Represents an intermediate page in Hashtable Storage.
ErrorCode register_record_write_log(thread::Thread *context, const RecordLocation &location, log::RecordLogType *log_entry)
Used in the following methods.
void assert_record_and_log_keys(xct::RwLockableXctId *owner_id, const char *data) const
used only for sanity check.
IsolationLevel
Specifies the level of isolation during transaction processing.
void hash_intermediate_volatile_page_init(const VolatilePageInitArguments &args)
volatile page initialize callback for HashIntermediatePage.
char * record_from_offset(uint16_t offset)
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
Represents an individual data page in Hashtable Storage.
IsolationLevel get_isolation_level() const
Returns the level of isolation for this transaction.
const DualPagePointer & next_page() const __attribute__((always_inline))
Log type of hash-storage's insert operation.
void hash_data_volatile_page_init(const VolatilePageInitArguments &args)
volatile page initialize callback for HashDataPage.
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
Log type of hash-storage's overwrite operation.
xct::XctId observed_
TID as of locate_record() identifying the record.
PageVersionStatus status_
0x0809 : "STORAGE: The record's payload is larger than the buffer" .
VolatilePagePointer construct_volatile_page_pointer(uint64_t word)
const ErrorStack kRetOk
Normal return value for no-error case.
return value of locate_record().
ThreadGroupId get_numa_node() const
PagePool * get_volatile_pool()
ErrorCode add_to_page_version_set(const storage::PageVersion *version_address, storage::PageVersionStatus observed)
Add the given page version to the page version set of this transaction.
uint8_t get_level() const
void memory_fence_consume()
Equivalent to std::atomic_thread_fence(std::memory_order_consume).
xct::ReadXctAccess * readset_
If this method took a read-set on the returned record, points to the corresponding read-set...
#define ERROR_STACK_MSG(e, m)
Overload of ERROR_STACK(e) to receive a custom error message.
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
PageHeader & get_header()
At least the basic header exists in all pages.
const uint8_t kHashIntermediatePageFanout
Number of pointers in an intermediate page of hash storage.
bool has_next_page() const __attribute__((always_inline))
#define INSTANTIATE_ALL_NUMERIC_TYPES(M)
INSTANTIATE_ALL_TYPES minus std::string.
static uint16_t calculate_log_length(uint16_t key_length, uint16_t payload_count) __attribute__((always_inline))
const LocalPageResolver & get_resolver() const
Gives an object to resolve an offset in this page pool (thus local) to an actual pointer and vice ver...
Base class for log type of record-wise operation.
uint16_t get_record_count() const __attribute__((always_inline))
#define UNLIKELY(x)
Hints that x is highly likely false.
bool is_moved() const __attribute__((always_inline))
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
uint8_t get_numa_node() const
Definitions of IDs in this package and a few related constant values.
ErrorCode run_nested_sysxct(xct::SysxctFunctor *functor, uint32_t max_retries=0)
Methods related to System transactions (sysxct) nested under this thread.
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
Raw atomic operations that work for both C++11 and non-C++11 code.
A base layout of shared data for all storage types.
DualPagePointer & get_pointer(uint16_t index)
char * reserve_new_log(uint16_t log_length) __attribute__((always_inline))
Reserves a space for a new (uncommitted) log entry at the tail.
ErrorCode add_to_pointer_set(const storage::VolatilePagePointer *pointer_address, storage::VolatilePagePointer observed)
Add the given page pointer to the pointer set of this transaction.
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
ErrorCode delete_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo)
ErrorCode
Enum of error codes defined in error_code.xmacro.
DataPageSlotIndex search_key_physical(HashValue hash, const BloomFilterFingerprint &fingerprint, const void *key, KeyLength key_length, DataPageSlotIndex record_count, DataPageSlotIndex check_from=0) const
Search for a physical slot that exactly contains the given key.
DataPageSlotIndex index_
Index of the record in the page.
ErrorCode upsert_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, const void *payload, uint16_t payload_count, uint16_t physical_payload_hint)
0x080B : "STORAGE: This key already exists in this storage" .
ErrorCode get_record(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, void *payload, uint16_t *payload_capacity, bool read_only)
Protects against all anomalies in all situations.
Log type of hash-storage's delete operation.
uint16_t offset_
Byte offset in data_ where this record starts.
ErrorCode locate_record_in_snapshot(thread::Thread *context, const void *key, uint16_t key_length, const HashCombo &combo, HashDataPage *bin_head, RecordLocation *result)
Simpler version of locate_record for when we are in snapshot world.
xct::TrackMovedRecordResult track_moved_record_search(HashDataPage *page, const void *key, uint16_t key_length, const HashCombo &combo)