20 #include <glog/logging.h>
42 : engine_(engine), context_(context), thread_id_(thread_id) {
46 default_rll_for_this_xct_ =
false;
47 enable_rll_for_this_xct_ = default_rll_for_this_xct_;
49 hot_threshold_for_this_xct_ = default_hot_threshold_for_this_xct_;
51 rll_threshold_for_this_xct_ = default_rll_threshold_for_this_xct_;
53 sysxct_workspace_ =
nullptr;
57 max_read_set_size_ = 0;
60 max_write_set_size_ = 0;
61 lock_free_read_set_ =
nullptr;
62 lock_free_read_set_size_ = 0;
63 max_lock_free_read_set_size_ = 0;
64 lock_free_write_set_ =
nullptr;
65 lock_free_write_set_size_ = 0;
66 max_lock_free_write_set_size_ = 0;
67 pointer_set_size_ = 0;
68 page_version_set_size_ = 0;
70 mcs_block_current_ =
nullptr;
71 mcs_rw_async_mapping_current_ =
nullptr;
72 local_work_memory_ =
nullptr;
73 local_work_memory_size_ = 0;
74 local_work_memory_cur_ = 0;
79 uint32_t* mcs_block_current,
80 uint32_t* mcs_rw_async_mapping_current) {
89 enable_rll_for_this_xct_ = default_rll_for_this_xct_;
91 hot_threshold_for_this_xct_ = default_hot_threshold_for_this_xct_;
93 rll_threshold_for_this_xct_ = default_rll_threshold_for_this_xct_;
95 sysxct_workspace_ =
reinterpret_cast<SysxctWorkspace*
>(pieces.sysxct_workspace_memory_);
97 read_set_ =
reinterpret_cast<ReadXctAccess*
>(pieces.xct_read_access_memory_);
100 write_set_ =
reinterpret_cast<WriteXctAccess*
>(pieces.xct_write_access_memory_);
104 pieces.xct_lock_free_read_access_memory_);
105 lock_free_read_set_size_ = 0;
108 pieces.xct_lock_free_write_access_memory_);
109 lock_free_write_set_size_ = 0;
111 pointer_set_ =
reinterpret_cast<PointerAccess*
>(pieces.xct_pointer_access_memory_);
112 pointer_set_size_ = 0;
113 page_version_set_ =
reinterpret_cast<PageVersionAccess*
>(pieces.xct_page_version_memory_);
114 page_version_set_size_ = 0;
115 mcs_block_current_ = mcs_block_current;
116 *mcs_block_current_ = 0;
117 mcs_rw_async_mapping_current_ = mcs_rw_async_mapping_current;
118 *mcs_rw_async_mapping_current_ = 0;
121 local_work_memory_cur_ = 0;
123 sysxct_workspace_->
init(context_);
124 current_lock_list_.
init(
128 retrospective_lock_list_.
init(
152 LOG(WARNING) <<
"Reached the maximum ordinal in this epoch. Advancing current epoch"
153 <<
" just for this reason. It's rare, but not an error.";
170 <<
"<active_>" << v.
is_active() <<
"</active_>";
172 <<
"</enable_rll_for_this_xct_>";
174 <<
"</default_rll_for_this_xct_>";
177 <<
"</default_hot_threshold>";
180 <<
"</default_rll_threshold>";
182 o <<
"<id_>" << v.
get_id() <<
"</id_>"
188 <<
"</lock_free_read_set_size>"
190 <<
"</lock_free_write_set_size>";
192 o << *sysxct_workspace;
208 for (uint32_t i = 0; i < pointer_set_size_; ++i) {
209 if (pointer_set_[i].address_ == pointer_address) {
220 pointer_set_[pointer_set_size_].
address_ = pointer_address;
221 pointer_set_[pointer_set_size_].
observed_ = observed;
234 for (uint32_t i = 0; i < pointer_set_size_; ++i) {
235 if (pointer_set_[i].address_ == pointer_address) {
252 page_version_set_[page_version_set_size_].
address_ = version_address;
253 page_version_set_[page_version_set_size_].
observed_ = observed;
254 ++page_version_set_size_;
259 bool intended_for_write,
263 bool no_readset_if_moved,
264 bool no_readset_if_next_layer) {
268 *read_set_address =
nullptr;
272 if (page_header.snapshot_) {
275 *observed_xid = tid_address->
xct_id_;
306 reinterpret_cast<uintptr_t
>(tid_address));
316 if (observed_xid->
is_moved() && no_readset_if_moved) {
318 }
else if (observed_xid->
is_next_layer() && no_readset_if_next_layer) {
335 bool intended_for_write,
346 bool lets_take_lock =
false;
347 if (!retrospective_lock_list_.
is_empty()) {
354 DVLOG(1) <<
"RLL recommends to take lock on this record!";
355 lets_take_lock =
true;
359 if (!lets_take_lock && context_->
is_hot_page(page_address)) {
360 lets_take_lock =
true;
363 if (lets_take_lock) {
385 DVLOG(0) <<
"Failed to take some of the lock that might be beneficial later"
386 <<
". We still go on because the locks here are not mandatory.";
397 XctId observed_owner_id,
412 XctId observed_owner_id,
426 if (
UNLIKELY(read_set_size_ >= max_read_set_size_)) {
433 *read_set_address = entry;
435 entry->storage_id_ = storage_id;
436 entry->set_owner_id_and_lock_id(owner_id_address, owner_lock_id);
437 entry->observed_owner_id_ = observed_owner_id;
438 entry->related_write_ =
nullptr;
447 char* payload_address,
459 if (
UNLIKELY(write_set_size_ >= max_write_set_size_)) {
476 XctId observed_owner_id,
478 char* payload_address,
484 auto* write = write_set_ + write_set_size_;
487 auto* read = read_set_ + read_set_size_;
492 write->owner_lock_id_,
495 ASSERT_ND(read->owner_id_address_ == owner_id_address);
496 read->related_write_ = write;
497 write->related_read_ = read;
498 ASSERT_ND(read->related_write_->related_read_ == read);
499 ASSERT_ND(write->related_read_->related_write_ == write);
500 ASSERT_ND(write->log_entry_ == log_entry);
501 ASSERT_ND(write->owner_id_address_ == owner_id_address);
509 char* payload_address,
517 auto* write = write_set_ + write_set_size_;
525 ASSERT_ND(write->related_read_->related_write_ == write);
526 ASSERT_ND(write->log_entry_ == log_entry);
527 ASSERT_ND(write->owner_id_address_ == owner_id_address);
534 XctId observed_owner_id,
540 if (
UNLIKELY(lock_free_read_set_size_ >= max_lock_free_read_set_size_)) {
544 lock_free_read_set_[lock_free_read_set_size_].
storage_id_ = storage_id;
546 lock_free_read_set_[lock_free_read_set_size_].
owner_id_address_ = owner_id_address;
547 ++lock_free_read_set_size_;
556 if (
UNLIKELY(lock_free_write_set_size_ >= max_lock_free_write_set_size_)) {
564 lock_free_write_set_[lock_free_write_set_size_].
storage_id_ = storage_id;
565 lock_free_write_set_[lock_free_write_set_size_].
log_entry_ = log_entry;
566 ++lock_free_write_set_size_;
ReadXctAccess * related_read_
void issue_next_id(XctId max_xct_id, Epoch *epoch)
Called while a successful commit of xct to issue a new xct id.
log::RecordLogType * log_entry_
Pointer to the log entry in private log buffer for this write opereation.
const memory::GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert page ID to page pointer.
RwLockableXctId * owner_id_address_
Pointer to the TID we protect against.
taken_mode_: we took a read-lock, not write-lock yet.
void set_epoch(Epoch epoch) __attribute__((always_inline))
UniversalLockId universal_lock_id_
Used to order locks in canonical order.
std::ostream & operator<<(std::ostream &o, const LockEntry &v)
Debugging.
Represents a record of special read-access during a transaction without any need for locking...
uint16_t get_default_rll_threshold_for_this_xct() const
LockListPosition binary_search(UniversalLockId lock) const
Analogous to std::binary_search() for the given lock.
ErrorCode add_to_read_set(storage::StorageId storage_id, XctId observed_owner_id, RwLockableXctId *owner_id_address, ReadXctAccess **read_set_address)
Add the given record to the read set of this transaction.
const storage::PageVersion * address_
Address to the page version.
Epoch get_current_global_epoch_weak() const
uint32_t max_lock_free_read_set_size_
The maximum number of lock-free read-set one transaction can have.
#define CXX11_NULLPTR
Used in public headers in place of "nullptr" of C++11.
Page * to_page(const void *address)
super-dirty way to obtain Page the address belongs to.
ErrorCode cll_try_or_acquire_single_lock(xct::LockListPosition pos)
Methods related to Current Lock List (CLL) These are the only interface in Thread to lock records...
Epoch get_current_global_epoch() const
Returns the current global epoch, the epoch a newly started transaction will be in.
0x0A01 : "XCTION : Too large read-set. Check the config of XctOptions" .
uint32_t StorageId
Unique ID for storage.
bool enable_retrospective_lock_list_
Whether to use Retrospective Lock List (RLL) after aborts.
uint32_t get_write_set_size() const
uint32_t ordinal_
Indicates the ordinal among ReadXctAccess/WriteXctAccess of this transaction.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Represents a record of write-access during a transaction.
Epoch get_initial_current_epoch() const
const storage::VolatilePagePointer * address_
Address of the volatile pointer.
Represents one thread running on one NUMA core.
ErrorCode on_record_read(bool intended_for_write, RwLockableXctId *tid_address, XctId *observed_xid, ReadXctAccess **read_set_address, bool no_readset_if_moved=false, bool no_readset_if_next_layer=false)
The general logic invoked for every record read.
storage::VolatilePagePointer observed_
Value of the volatile pointer as of the access.
const GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert volatile page ID to page pointer.
const XctId & get_id() const
Returns the ID of this transaction, but note that it is not issued until commit time! ...
Represents a pointer to a volatile page with modification count for preventing ABA.
Represents a user transaction.
Just a synonym of XctId to be used as a page lock mechanism.
storage::PageVersionStatus observed_
Value of the page version as of the access.
Persistent status part of Transaction ID.
void invoke_assert_valid(void *log_buffer)
Invokes the assertion logic of each log type.
const LockListPosition kLockListPositionInvalid
Represents a record of read-access during a transaction.
bool is_active() const
Returns whether the object is an active transaction.
XctId xct_id_
the second 64bit: Persistent status part of TID.
An entry in CLL and RLL, representing a lock that is taken or will be taken.
uint32_t get_ordinal() const __attribute__((always_inline))
uint64_t get_current_lock_list_capacity() const
Snapshot isolation (SI), meaning the transaction reads a consistent and complete image of the databas...
ErrorCode add_to_write_set(storage::StorageId storage_id, RwLockableXctId *owner_id_address, char *payload_address, log::RecordLogType *log_entry)
Add the given record to the write set of this transaction.
uint64_t get_local_work_memory_size() const
bool is_valid() const __attribute__((always_inline))
0x0A06 : "XCTION : Too large page-version set. Consider using snapshot isolation." ...
ErrorCode add_to_read_and_write_set(storage::StorageId storage_id, XctId observed_owner_id, RwLockableXctId *owner_id_address, char *payload_address, log::RecordLogType *log_entry)
Add a pair of read and write set of this transaction.
ErrorCode add_to_lock_free_read_set(storage::StorageId storage_id, XctId observed_owner_id, RwLockableXctId *owner_id_address)
Add the given record to the special read-set that is not placed in usual data pages.
void * get_local_work_memory() const
Represents a record of special write-access during a transaction without any need for locking...
xct::LockEntry * get_retrospective_lock_list_memory() const
XctId observed_owner_id_
XID value we observed.
const EngineOptions & get_options() const
Repository of memories dynamically acquired within one CPU core (thread).
savepoint::SavepointManager * get_savepoint_manager() const
See Savepoint Manager.
uintptr_t UniversalLockId
Universally ordered identifier of each lock.
The MCS reader-writer lock variant of LockableXctId.
0x0AA1 : "XCTION : Lock acquire failed." .
uint64_t hot_threshold_
Page hotness >= this value will be considered hot (hybrid CC only).
Set of options for xct manager.
UniversalLockId to_universal_lock_id(storage::VolatilePagePointer page_id, uintptr_t addr)
bool is_default_rll_for_this_xct() const
const SmallThreadLocalMemoryPieces & get_small_thread_local_memory_pieces() const
void remember_previous_xct_id(XctId new_id)
UniversalLockId xct_id_to_universal_lock_id(const memory::GlobalVolatilePageResolver &resolver, RwLockableXctId *lock)
storage::StorageId storage_id_
The storage we accessed.
void store_max(const XctId &other) __attribute__((always_inline))
Kind of std::max(this, other).
memory::PagePoolOffset get_offset() const
void set_owner_id_resolve_lock_id(const memory::GlobalVolatilePageResolver &resolver, RwLockableXctId *owner_id_address)
Calculate owner_lock_id using the resolver.
uint32_t get_read_set_size() const
ErrorCode add_related_write_set(ReadXctAccess *related_read_set, RwLockableXctId *tid_address, char *payload_address, log::RecordLogType *log_entry)
Registers a write-set related to an existing read-set.
ErrorCode add_to_lock_free_write_set(storage::StorageId storage_id, log::RecordLogType *log_entry)
Add the given log to the lock-free write set of this transaction.
storage::StorageOptions storage_
uint16_t get_default_hot_threshold_for_this_xct() const
RwLockableXctId * owner_id_address_
Pointer to the accessed record.
uint32_t LockListPosition
Index in a lock-list, either RLL or CLL.
uint32_t max_lock_free_write_set_size_
The maximum number of lock-free write-set one transaction can have.
uint16_t get_hot_threshold_for_this_xct() const
void assert_within_valid_volatile_page(const memory::GlobalVolatilePageResolver &resolver, const void *address)
Database engine object that holds all resources and provides APIs.
char * payload_address_
Pointer to the payload of the record.
taken_mode_: we took a write-lock.
0x0A02 : "XCTION : Too large write-set. Check the config of XctOptions" .
uint32_t get_page_version_set_size() const
Epoch get_epoch() const __attribute__((always_inline))
bool is_next_layer() const __attribute__((always_inline))
LockEntry * get_entry(LockListPosition pos)
Just a marker to denote that the memory region represents a data page.
xct::LockEntry * get_current_lock_list_memory() const
const uint64_t kMaxXctOrdinal
Maximum value of in-epoch ordinal.
void init(LockEntry *array, uint32_t capacity, const memory::GlobalVolatilePageResolver &resolver)
void cll_giveup_all_locks_after(xct::UniversalLockId address)
This gives-up locks in CLL that are not yet taken.
uint16_t hot_threshold_for_retrospective_lock_list_
When we construct Retrospective Lock List (RLL) after aborts, we add read-locks on records whose hotn...
storage::StorageId storage_id_
The storage we accessed.
SysxctWorkspace * get_sysxct_workspace() const
void on_record_read_take_locks_if_needed(bool intended_for_write, const storage::Page *page_address, UniversalLockId lock_id, RwLockableXctId *tid_address)
subroutine of on_record_read() to take lock(s).
WriteXctAccess * related_write_
An optional member that points to a write access related to this read.
bool is_hot_page(const storage::Page *page) const
Packs pointers to pieces of small_thread_local_memory_.
uint32_t max_write_set_size_
The maximum number of write-set one transaction can have.
void set_ordinal(uint32_t ordinal) __attribute__((always_inline))
uint32_t get_pointer_set_size() const
xct::XctManager * get_xct_manager() const
See Transaction Manager.
const LockEntry * get_array() const
uint32_t get_lock_free_read_set_size() const
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
VolatilePagePointer construct_volatile_page_pointer(uint64_t word)
void advance_current_global_epoch()
Requests to advance the current global epoch as soon as possible and blocks until it actually does...
LockMode
Represents a mode of lock.
uint16_t get_rll_threshold_for_this_xct() const
ErrorCode add_to_page_version_set(const storage::PageVersion *version_address, storage::PageVersionStatus observed)
Add the given page version to the page version set of this transaction.
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
bool is_being_written() const __attribute__((always_inline))
XctId spin_while_being_written() const __attribute__((always_inline))
Returns a version of this Xid whose being_written flag is off.
void init(thread::Thread *enclosing_thread)
void init(LockEntry *array, uint32_t capacity, const memory::GlobalVolatilePageResolver &resolver)
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
No guarantee at all for reads, for the sake of best performance and scalability.
PageHeader & get_header()
At least the basic header exists in all pages.
log::RecordLogType * log_entry_
Pointer to the log entry in private log buffer for this write opereation.
bool is_enable_rll_for_this_xct() const
Base class for log type of record-wise operation.
uint64_t get_retrospective_lock_list_capacity() const
#define UNLIKELY(x)
Hints that x is highly likely false.
bool is_moved() const __attribute__((always_inline))
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
uint8_t get_numa_node() const
Represents a record of following a page pointer during a transaction.
0x0A07 : "XCTION : Too large pointer-set. Consider using snapshot isolation." .
uint32_t get_lock_free_write_set_size() const
bool before(const Epoch &other) const
Returns if this epoch is before the given epoch in the sense of distance defined in RFC 1982...
uint32_t max_read_set_size_
The maximum number of read-set one transaction can have.
Represents a record of reading a page during a transaction.
void overwrite_to_pointer_set(const storage::VolatilePagePointer *pointer_address, storage::VolatilePagePointer observed)
The transaction that has updated the volatile pointer should not abort itself.
ErrorCode add_to_pointer_set(const storage::VolatilePagePointer *pointer_address, storage::VolatilePagePointer observed)
Add the given page pointer to the pointer set of this transaction.
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
ErrorCode
Enum of error codes defined in error_code.xmacro.
Per-thread reused work memory for system transactions.
Xct(Engine *engine, thread::Thread *context, thread::ThreadId thread_id)
const UniversalLockId kNullUniversalLockId
This never points to a valid lock, and also evaluates less than any vaild alocks. ...
Protects against all anomalies in all situations.
ErrorCode cll_try_or_acquire_multiple_locks(xct::LockListPosition upto_pos)
Acquire multiple locks up to the given position in canonical order.
const memory::GlobalVolatilePageResolver & get_volatile_page_resolver() const
LockListPosition get_or_add_entry(UniversalLockId lock_id, RwLockableXctId *lock, LockMode preferred_mode)
Adds an entry to this list, re-sorting part of the list if necessary to keep the sortedness.
void initialize(memory::NumaCoreMemory *core_memory, uint32_t *mcs_block_current, uint32_t *mcs_rw_async_mapping_current)
storage::StorageId storage_id_
The storage we accessed.