22 #include <glog/logging.h>
64 global_ordinal_(global_ordinal),
65 core_memory_(nullptr),
66 node_memory_(nullptr),
67 snapshot_cache_hashtable_(nullptr),
68 snapshot_page_pool_(nullptr),
69 log_buffer_(engine, id),
70 current_xct_(engine, holder, id),
71 snapshot_file_set_(engine),
72 control_block_(nullptr),
73 task_input_memory_(nullptr),
74 task_output_memory_(nullptr),
75 mcs_ww_blocks_(nullptr),
76 mcs_rw_simple_blocks_(nullptr),
77 mcs_rw_extended_blocks_(nullptr),
78 canonical_address_(nullptr) {
129 LOG(INFO) <<
"Thread-" <<
id_ <<
" requested to terminate";
143 if (!chunk->
empty()) {
165 LOG(INFO) <<
"Thread-" <<
id_ <<
" started running on NUMA node: " << numa_node
174 uint64_t demand = control_block_->wakeup_cond_.acquire_ticket();
181 VLOG(0) <<
"Thread-" <<
id_ <<
" sleeping...";
182 control_block_->wakeup_cond_.timedwait(demand, 100000ULL, 1U << 16, 1U << 13);
185 VLOG(0) <<
"Thread-" <<
id_ <<
" woke up. status=" << control_block_->status_;
187 control_block_->output_len_ = 0;
200 VLOG(0) <<
"Thread-" <<
id_ <<
" retrieved a task: " << proc_name;
205 LOG(ERROR) <<
"Thread-" <<
id_ <<
" couldn't find procedure: " << proc_name;
207 uint32_t output_used = 0;
212 control_block_->input_len_,
218 VLOG(0) <<
"Thread-" <<
id_ <<
" run(task) returned. result =" << result
219 <<
", output_used=" << output_used;
220 control_block_->output_len_ = output_used;
223 control_block_->proc_result_.from_error_stack(result);
225 control_block_->proc_result_.clear();
230 control_block_->task_complete_cond_.signal();
232 VLOG(0) <<
"Thread-" <<
id_ <<
" finished a task. result =" << result;
237 LOG(INFO) <<
"Thread-" <<
id_ <<
" exits";
246 pthread_t handle =
static_cast<pthread_t
>(
raw_thread_.native_handle());
249 int ret = ::pthread_getschedparam(handle, &policy, &param);
256 LOG(INFO) <<
"The default thread policy=" << policy <<
", priority=" << param.__sched_priority;
265 int priority_max = ::sched_get_priority_max(policy);
266 int priority_min = ::sched_get_priority_min(policy);
268 LOG(WARNING) <<
"Thread priority too large. using max value: "
270 param.__sched_priority = priority_max;
273 LOG(WARNING) <<
"Thread priority too small. using min value: "
275 param.__sched_priority = priority_min;
277 int ret2 = ::pthread_setschedparam(handle, policy, &param);
281 LOG(WARNING) <<
"========= ATTENTION: Thread-scheduling Permission Error!!!! ==========\n"
282 " pthread_setschedparam() failed due to permission error. This means you have\n"
283 " not set appropriate rtprio to limits.conf. You cannot set priority higher than what\n"
284 " OS allows. Configure limits.conf (eg. 'kimurhid - rtprio 99') or modify ThreadOptions.\n"
285 "============================= ATTENTION ======================";
301 const auto offset = volatile_pointer.
get_offset();
326 assorted::raw_atomic_compare_exchange_strong<uint64_t>(
336 VLOG(0) <<
"Interesting. Lost race to install a volatile page. ver-b. Thread-" <<
id_
337 <<
", local offset=" << new_offset <<
" winning=" << cur_pointer;
344 LOG(WARNING) <<
"Very interesting. Lost race but volatile page not installed. Thread-"
345 <<
id_ <<
", local offset=" << new_offset;
355 bool tolerate_null_pointer,
357 bool take_ptr_set_snapshot,
361 uint16_t index_in_parent) {
364 tolerate_null_pointer,
366 take_ptr_set_snapshot,
376 bool tolerate_null_pointer,
377 bool take_ptr_set_snapshot,
380 const uint16_t* index_in_parents,
381 bool* followed_snapshots,
386 tolerate_null_pointer,
387 take_ptr_set_snapshot,
400 const uint16_t* index_in_parents,
413 bool tolerate_null_pointer,
415 bool take_ptr_set_snapshot,
419 uint16_t index_in_parent) {
420 ASSERT_ND(!tolerate_null_pointer || !will_modify);
423 bool followed_snapshot =
false;
425 if (volatile_pointer.
is_null()) {
427 if (tolerate_null_pointer) {
449 page_initializer(args);
461 if (!volatile_pointer.
is_null()) {
464 }
else if (will_modify) {
470 followed_snapshot =
true;
473 ASSERT_ND((*page) ==
nullptr || (followed_snapshot == (*page)->get_header().snapshot_));
477 if ((*page ==
nullptr || followed_snapshot) && take_ptr_set_snapshot) {
487 bool tolerate_null_pointer,
488 bool take_ptr_set_snapshot,
491 const uint16_t* index_in_parents,
492 bool* followed_snapshots,
494 ASSERT_ND(tolerate_null_pointer || page_initializer);
495 if (batch_size == 0) {
503 bool has_some_snapshot =
false;
504 const bool needs_ptr_set
511 std::memset(tmp_out, 0xDA,
sizeof(tmp_out));
516 for (uint16_t b = 0; b < batch_size; ++b) {
517 snapshot_page_ids[b] = 0;
519 if (pointer ==
nullptr) {
524 ASSERT_ND(parents[b]->get_header().snapshot_ == followed_snapshots[b]);
526 has_some_snapshot =
true;
532 if (has_some_snapshot) {
537 for (uint16_t b = 0; b < batch_size; ++b) {
539 if (has_some_snapshot) {
540 if (pointer ==
nullptr) {
543 }
else if (tmp_out[b]) {
545 if (needs_ptr_set && !followed_snapshots[b]) {
548 followed_snapshots[b] =
true;
556 followed_snapshots[b] =
false;
557 ASSERT_ND(!parents[b]->get_header().snapshot_);
561 if (tolerate_null_pointer) {
578 page_initializer(args);
600 const uint16_t* index_in_parents,
605 for (uint16_t b = 0; b < batch_size; ++b) {
607 if (pointer ==
nullptr) {
611 ASSERT_ND(!parents[b]->get_header().snapshot_);
614 if (!volatile_pointer.
is_null()) {
636 page_initializer(args);
658 DVLOG(0) <<
"Interesting, this race is rare, but possible. offset=" << offset;
675 reinterpret_cast<void**>(out),
692 if (batch_size == 0) {
702 for (uint16_t b = 0; b < batch_size; ++b) {
708 }
else if (b > 0 && page_ids[b - 1] == page_id) {
715 DVLOG(0) <<
"Interesting, this race is rare, but possible. offset=" << offset;
729 for (uint16_t b = 0; b < batch_size; ++b) {
732 reinterpret_cast<void**>(out + b),
749 LOG(ERROR) <<
"Could not grab free snapshot page while cache miss. thread=" << *
holder_
757 LOG(ERROR) <<
"Failed to read a snapshot page. thread=" << *
holder_
763 *pool_offset = offset;
793 if (chunk->
size() == 0) {
797 while (safe_count < chunk->size() / 10U) {
798 LOG(WARNING) <<
"Thread-" <<
id_ <<
" can return only "
799 << safe_count <<
" out of " << chunk->
size()
800 <<
" retired pages to node-" << node <<
" in epoch=" << current_epoch
801 <<
". This means the thread received so many retired pages in a short time period."
802 <<
" Will adavance an epoch to safely return the retired pages."
803 <<
" This should be a rare event.";
806 LOG(INFO) <<
"okay, advanced epoch. now we should be able to return more pages";
810 VLOG(0) <<
"Thread-" <<
id_ <<
" batch-returning retired volatile pages to node-" << node
811 <<
" safe_count/count=" << safe_count <<
"/" << chunk->
size() <<
". epoch=" << current_epoch;
814 volatile_pool->
release(safe_count, chunk);
861 ASSERT_ND(reinterpret_cast<uintptr_t>(address) % 4 == 0);
864 template<
typename RW_BLOCK>
879 auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(
this));
882 auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(
this));
890 auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(
this));
893 auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(
this));
901 auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(
this));
904 auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(
this));
912 auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(
this));
915 auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(
this));
922 auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(
this));
925 auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(
this));
982 uint32_t max_retries) {
993 enclosing_max_lock_id,
1002 enclosing_max_lock_id,
1012 auto& sysxct_lock_list = sysxct_workspace->
lock_list_;
1015 return sysxct_lock_list.request_record_lock(adaptor, page_id, lock);
1018 return sysxct_lock_list.request_record_lock(adaptor, page_id, lock);
1024 uint32_t lock_count,
1027 auto& sysxct_lock_list = sysxct_workspace->
lock_list_;
1030 return sysxct_lock_list.batch_request_record_locks(adaptor, page_id, lock_count, locks);
1033 return sysxct_lock_list.batch_request_record_locks(adaptor, page_id, lock_count, locks);
1040 auto& sysxct_lock_list = sysxct_workspace->
lock_list_;
1043 return sysxct_lock_list.request_page_lock(adaptor, page);
1046 return sysxct_lock_list.request_page_lock(adaptor, page);
1051 uint32_t lock_count,
1054 auto& sysxct_lock_list = sysxct_workspace->
lock_list_;
1057 return sysxct_lock_list.batch_request_page_locks(adaptor, lock_count, pages);
1060 return sysxct_lock_list.batch_request_page_locks(adaptor, lock_count, pages);
1066 "ThreadControlBlock is too large.");
void set_default_rll_for_this_xct(bool value)
Set of options about threads and thread-groups.
void initialize(ThreadId my_thread_id)
ThreadPoolPimpl * get_pimpl() const
Returns the pimpl of this object.
void collect_retired_volatile_page(storage::VolatilePagePointer ptr)
Keeps the specified volatile page as retired as of the current epoch.
void assert_mcs_aligned(const void *address)
ThreadPimpl MCS implementations.
ErrorCode sysxct_page_lock(xct::SysxctWorkspace *sysxct_workspace, storage::Page *page)
void * task_input_memory_
ErrorCode run_nested_sysxct_impl(SysxctFunctor *functor, MCS_ADAPTOR mcs_adaptor, uint32_t max_retries, SysxctWorkspace *workspace, UniversalLockId enclosing_max_lock_id, ENCLOSURE_RELEASE_ALL_LOCKS_FUNCTOR enclosure_release_all_locks_functor)
Runs a system transaction nested in a user transaction.
ErrorCode install_a_volatile_page(storage::DualPagePointer *pointer, storage::Page **installed_page)
Installs a volatile page to the given dual pointer as a copy of the snapshot page.
0x0002 : "GENERAL: Invalid parameter given" .
Implements McsAdaptorConcept over ThreadPimpl.
void release_free_volatile_page(PagePoolOffset offset)
Returns one free volatile page to local page pool.
Represents a pointer to another page (usually a child page).
cache::SnapshotFileSet snapshot_file_set_
Each thread maintains a private set of snapshot file descriptors.
void cll_release_all_locks()
NumaCoreMemory * get_core_memory(foedus::thread::ThreadId id) const
xct::CurrentLockList * get_current_lock_list()
Epoch get_current_global_epoch_weak() const
PagePoolOffset grab_free_volatile_page()
Acquires one free volatile page from local page pool.
ErrorStack uninitialize_once() override final
ErrorCode follow_page_pointers_for_write_batch(uint16_t batch_size, storage::VolatilePageInit page_initializer, storage::DualPagePointer **pointers, storage::Page **parents, const uint16_t *index_in_parents, storage::Page **out)
Batched version of follow_page_pointer with will_modify==true and tolerate_null_pointer==true.
void emprace_back(ErrorStack &&error_stack)
If the given ErrorStack is an error, this method adds it to the end of this batch.
Idle state, receiving a new task.
void set_default_hot_threshold_for_this_xct(uint16_t value)
uint32_t mcs_block_current_
How many MCS blocks we allocated in this thread's current xct.
void set_thread_schedule()
initializes the thread's policy/priority
ErrorCode cll_try_or_acquire_multiple_locks(xct::LockListPosition upto_pos)
uint64_t stat_snapshot_cache_hits_
ErrorCode sysxct_batch_page_locks(xct::SysxctWorkspace *sysxct_workspace, uint32_t lock_count, storage::Page **pages)
Takes a bunch of page locks for a sysxct running under this thread.
bool is_simple_mcs_rw() const
Page pool for volatile read/write store (VolatilePage) and the read-only bufferpool (SnapshotPage)...
ErrorCode cll_try_or_acquire_single_lock(xct::LockListPosition pos)
Methods related to Current Lock List (CLL) These are the only interface in Thread to lock records...
Epoch get_current_global_epoch() const
Returns the current global epoch, the epoch a newly started transaction will be in.
SysxctLockList lock_list_
Lock list for the system transaction.
bool is_retired() const __attribute__((always_inline))
ErrorCode install(storage::SnapshotPagePointer page_id, ContentId content)
Called when a cached page is not found.
bool enable_retrospective_lock_list_
Whether to use Retrospective Lock List (RLL) after aborts.
ThreadRef get_thread_ref(ThreadId id) __attribute__((always_inline))
For better performance, but for some reason this method causes an issue in MCS lock.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
PagePool * get_snapshot_pool()
ErrorCode sysxct_batch_record_locks(xct::SysxctWorkspace *sysxct_workspace, storage::VolatilePagePointer page_id, uint32_t lock_count, xct::RwLockableXctId **locks)
Takes a bunch of locks in the same page for a sysxct running under this thread.
ErrorCode sysxct_record_lock(xct::SysxctWorkspace *sysxct_workspace, storage::VolatilePagePointer page_id, xct::RwLockableXctId *lock)
Takes a lock for a sysxct running under this thread.
void release_all_after(UniversalLockId address, MCS_RW_IMPL *mcs_rw_impl)
Release all locks in CLL whose addresses are canonically ordered before the parameter.
memory::NumaNodeMemory * node_memory_
same above
Represents one thread running on one NUMA core.
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
memory::GlobalVolatilePageResolver global_volatile_page_resolver_
Page resolver to convert all page ID to page pointer.
xct::McsImpl< ThreadPimplMcsAdaptor< RW_BLOCK >, RW_BLOCK > get_mcs_impl(ThreadPimpl *pimpl)
std::thread raw_thread_
Encapsulates raw thread object.
xct::McsRwExtendedBlock * get_mcs_rw_extended_blocks()
Shared data of ThreadPimpl.
const GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert volatile page ID to page pointer.
A client has set a next task.
Represents a pointer to a volatile page with modification count for preventing ABA.
UniversalLockId get_max_locked_id() const
ErrorCode read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page *buffer) __attribute__((always_inline))
Read a snapshot page using the thread-local file descriptor set.
memory::PagePool * snapshot_page_pool_
shorthand for node_memory_->get_snapshot_pool()
void cll_giveup_all_locks_after(xct::UniversalLockId address)
void giveup_all_after(UniversalLockId address, MCS_RW_IMPL *mcs_rw_impl)
This gives-up locks in CLL that are not yet taken.
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
Brings error stacktrace information as return value of functions.
Reader-writer (RW) MCS lock classes.
Part of NodeMemoryAnchors for each thread.
Pin the current thread to the given NUMA node in this object's scope.
ErrorCode follow_page_pointer(storage::VolatilePageInit page_initializer, bool tolerate_null_pointer, bool will_modify, bool take_ptr_set_snapshot, storage::DualPagePointer *pointer, storage::Page **page, const storage::Page *parent, uint16_t index_in_parent)
A general method to follow (read) a page pointer.
Max size for find_or_read_snapshot_pages_batch() etc.
storage::VolatilePagePointer grab_free_volatile_page_pointer()
Wrapper for grab_free_volatile_page().
bool snapshot_cache_enabled_
Whether to cache the read accesses on snapshot files.
ErrorCode sysxct_batch_page_locks(xct::SysxctWorkspace *sysxct_workspace, uint32_t lock_count, storage::Page **pages)
std::atomic< bool > raw_thread_set_
Just to make sure raw_thread_ is set.
const EngineOptions & get_options() const
bool overwrite_thread_schedule_
Whether to overwrite policy/priority of worker threads.
xct::McsWwBlock * mcs_ww_blocks_
Pre-allocated MCS blocks.
uintptr_t UniversalLockId
Universally ordered identifier of each lock.
soc::SharedPolling wakeup_cond_
The thread sleeps on this conditional when it has no task.
The MCS reader-writer lock variant of LockableXctId.
soc::SocId get_soc_count() const
Shorthand for get_options().thread_.group_count_.
uint64_t hot_threshold_
Page hotness >= this value will be considered hot (hybrid CC only).
bool is_volatile_page_retired(storage::VolatilePagePointer ptr)
Subroutine of collect_retired_volatile_page() just for assertion.
Batches zero or more ErrorStack objects to represent in one ErrorStack.
storage::Page * get_base() const
VolatilePagePointer volatile_pointer_
void assert_valid_volatile_page(const Page *page, uint32_t offset)
ThreadPimpl *const pimpl_
A functor representing the logic in a system transaction via virtual-function.
void release_all_locks(MCS_RW_IMPL *mcs_rw_impl)
The thread is requested to terminate.
ErrorCode find_batch(uint16_t batch_size, const storage::SnapshotPagePointer *page_ids, ContentId *out) const
Batched version of find().
void push_back(PagePoolOffset offset, const Epoch &safe_epoch)
ErrorCode try_or_acquire_single_lock(LockListPosition pos, MCS_RW_IMPL *mcs_rw_impl)
Methods below take or release locks, so they receive MCS_RW_IMPL, a template param.
memory::PagePoolOffset get_offset() const
ErrorCode find_or_read_snapshot_pages_batch(uint16_t batch_size, const storage::SnapshotPagePointer *page_ids, storage::Page **out)
Batched version of find_or_read_a_snapshot_page().
Max size for find_batch()
Definitions of IDs in this package and a few related constant values.
const ThreadGroupId numa_node_
Node this thread belongs to.
storage::StorageOptions storage_
ErrorCode cll_try_or_acquire_single_lock(xct::LockListPosition pos)
ErrorCode sysxct_page_lock(xct::SysxctWorkspace *sysxct_workspace, storage::Page *page)
Takes a page lock in the same page for a sysxct running under this thread.
void flush_retired_volatile_page(uint16_t node, Epoch current_epoch, memory::PagePoolOffsetAndEpochChunk *chunk)
Subroutine of collect_retired_volatile_page() in case the chunk becomes full.
Engine *const engine_
MCS locks methods.
ErrorCode sysxct_record_lock(xct::SysxctWorkspace *sysxct_workspace, storage::VolatilePagePointer page_id, xct::RwLockableXctId *lock)
0x0901 : "SPCACHE: Not enough free snapshot pages. Cleaner is not catching up" .
Implements an MCS-locking Algorithm.
uint32_t LockListPosition
Index in a lock-list, either RLL or CLL.
void cll_release_all_locks_after(xct::UniversalLockId address)
RW-locks.
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
void cll_release_all_locks()
Pre-allocated MCS block for extended version of RW-locks.
Thread *const holder_
The public object that holds this pimpl object.
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
void release_free_snapshot_page(PagePoolOffset offset)
Same, except it's for snapshot page.
#define SPINLOCK_WHILE(x)
A macro to busy-wait (spinlock) with occasional pause.
The thread has picked the task up and is now running.
ErrorCode follow_page_pointers_for_read_batch(uint16_t batch_size, storage::VolatilePageInit page_initializer, bool tolerate_null_pointer, bool take_ptr_set_snapshot, storage::DualPagePointer **pointers, storage::Page **parents, const uint16_t *index_in_parents, bool *followed_snapshots, storage::Page **out)
Batched version of follow_page_pointer with will_modify==false.
Database engine object that holds all resources and provides APIs.
ThreadPriority thread_priority_
Thread priority for worker threads.
uint32_t mcs_rw_async_mapping_current_
How many async mappings for extended RW lock we have so far.
NumaNodeMemoryRef * get_node_memory(foedus::thread::ThreadGroupId group) const
SnapshotPagePointer snapshot_pointer_
ErrorCode sysxct_batch_record_locks(xct::SysxctWorkspace *sysxct_workspace, storage::VolatilePagePointer page_id, uint32_t lock_count, xct::RwLockableXctId **locks)
Typedefs of ID types used in procedure package.
A view of Thread object for other SOCs and master engine.
log::ThreadLogBuffer log_buffer_
Thread-private log buffer.
ErrorCode try_or_acquire_multiple_locks(LockListPosition upto_pos, MCS_RW_IMPL *mcs_rw_impl)
Acquire multiple locks up to the given position in canonical order.
Set of arguments, both inputs and outputs, given to each volatile page initializer.
Just a marker to denote that the memory region represents a data page.
void cll_release_all_locks_after(xct::UniversalLockId address)
Release all locks in CLL of this thread whose addresses are canonically ordered before the parameter...
void cll_giveup_all_locks_after(xct::UniversalLockId address)
This gives-up locks in CLL that are not yet taken.
uint16_t hot_threshold_for_retrospective_lock_list_
When we construct Retrospective Lock List (RLL) after aborts, we add read-locks on records whose hotn...
void set(uint8_t numa_node, memory::PagePoolOffset offset)
ThreadGroupId decompose_numa_node(ThreadId global_id)
Extracts NUMA node ID from the given globally unique ID of Thread (core).
SysxctWorkspace * get_sysxct_workspace() const
ErrorCode follow_page_pointer(storage::VolatilePageInit page_initializer, bool tolerate_null_pointer, bool will_modify, bool take_ptr_set_snapshot, storage::DualPagePointer *pointer, storage::Page **page, const storage::Page *parent, uint16_t index_in_parent)
A general method to follow (read) a page pointer.
ErrorCode run_nested_sysxct(xct::SysxctFunctor *functor, uint32_t max_retries)
Sysxct-related.
memory::LocalPageResolver local_volatile_page_resolver_
Page resolver to convert only local page ID to page pointer.
void handle_tasks()
Main routine of the worker thread.
proc::ProcManager * get_proc_manager() const
See System and User Procedures.
0x0301 : "MEMORY : Not enough free volatile pages. Check the config of MemoryOptions" ...
void release(uint32_t desired_release_count, PagePoolOffsetChunk *chunk)
Returns the specified number of free pages from the chunk.
void spinlock_yield()
Invoke _mm_pause(), x86 PAUSE instruction, or something equivalent in the env.
#define SUMMARIZE_ERROR_BATCH(x)
This macro calls ErrorStackBatch::summarize() with automatically provided parameters.
PagePoolOffsetAndEpochChunk * get_retired_volatile_pool_chunk(uint16_t node)
xct::McsRwSimpleBlock * mcs_rw_simple_blocks_
xct::XctManager * get_xct_manager() const
See Transaction Manager.
cache::CacheHashtable * get_snapshot_cache_table()
ErrorStack initialize_once() override final
const ThreadId id_
Unique ID of this thread.
xct::Xct current_xct_
Current transaction this thread is conveying.
PagePoolOffset grab_free_snapshot_page()
Same, except it's for snapshot page.
uint16_t ThreadGlobalOrdinal
Typedef for a globally and contiguously numbered ID of thread.
ErrorCode find_or_read_a_snapshot_page(storage::SnapshotPagePointer page_id, storage::Page **out)
Find the given page in snapshot cache, reading it if not found.
std::string os_error()
Thread-safe strerror(errno).
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
ErrorStack get_proc(const ProcName &name, Proc *out)
Returns the function pointer of the specified procedure.
thread::ThreadOptions thread_
IsolationLevel get_isolation_level() const
Returns the level of isolation for this transaction.
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
NumaNodeMemory * get_local_memory() const
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
uint64_t stat_snapshot_cache_misses_
ThreadStatus status_
Impersonation status of this thread.
const ErrorStack kRetOk
Normal return value for no-error case.
void advance_current_global_epoch()
Requests to advance the current global epoch as soon as possible and blocks until it actually does...
uint16_t mcs_implementation_type_
Defines which implementation of MCS locks to use for RW locks.
xct::McsRwSimpleBlock * get_mcs_rw_simple_blocks()
Unconditionally takes MCS lock on the given mcs_lock.
soc::SocManager * get_soc_manager() const
See SOC and IPC.
void set_default_rll_threshold_for_this_xct(uint16_t value)
PagePool * get_volatile_pool()
Convenient way of writing hex integers to stream.
xct::McsRwExtendedBlock * mcs_rw_extended_blocks_
storage::Page * place_a_new_volatile_page(memory::PagePoolOffset new_offset, storage::DualPagePointer *pointer)
Subroutine of install_a_volatile_page() and follow_page_pointer() to atomically place the given new v...
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
void(* VolatilePageInit)(const VolatilePageInitArguments &args)
A function pointer to initialize a volatile page.
ThreadPolicy thread_policy_
Thread policy for worker threads.
ErrorCode follow_page_pointers_for_read_batch(uint16_t batch_size, storage::VolatilePageInit page_initializer, bool tolerate_null_pointer, bool take_ptr_set_snapshot, storage::DualPagePointer **pointers, storage::Page **parents, const uint16_t *index_in_parents, bool *followed_snapshots, storage::Page **out)
Batched version of follow_page_pointer with will_modify==false.
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
xct::McsRwAsyncMapping * mcs_rw_async_mappings_
PageHeader & get_header()
At least the basic header exists in all pages.
void * task_output_memory_
bool running_sysxct_
Whether we are already running a sysxct using this workspace.
The thread has completed the task and set the result.
Sorted list of all locks, either read-lock or write-lock, taken in the current run.
ThreadPimplCllReleaseAllFunctor(ThreadPimpl *pimpl)
ThreadRef get_thread_ref(ThreadId id)
ErrorStack(* Proc)(const ProcArguments &args)
A function pointer of a user/system stored procedure.
Used to store an epoch value with each entry in PagePoolOffsetChunk.
const LocalPageResolver & get_resolver() const
Gives an object to resolve an offset in this page pool (thus local) to an actual pointer and vice ver...
ErrorCode acquire_local_work_memory(uint32_t size, void **out, uint32_t alignment=8)
Get a tentative work memory of the specified size from pre-allocated thread-private memory...
bool is_stop_requested() const
xct::UniversalLockId cll_get_max_locked_id() const
#define UNLIKELY(x)
Hints that x is highly likely false.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
uint8_t get_numa_node() const
ErrorCode on_snapshot_cache_miss(storage::SnapshotPagePointer page_id, memory::PagePoolOffset *pool_offset)
Set of arguments, both inputs and outputs, given to each procedure.
ErrorCode run_nested_sysxct(xct::SysxctFunctor *functor, uint32_t max_retries=0)
Methods related to System transactions (sysxct) nested under this thread.
ThreadMemoryAnchors * get_thread_memory_anchors(thread::ThreadId thread_id)
memory::NumaCoreMemory * core_memory_
Private memory repository of this thread.
bool simple_mcs_rw_
shortcut for engine_->get_options().xct_.mcs_implementation_type_ == simple
ErrorCode add_to_pointer_set(const storage::VolatilePagePointer *pointer_address, storage::VolatilePagePointer observed)
Add the given page pointer to the pointer set of this transaction.
uint32_t get_safe_offset_count(const Epoch &threshold) const
Returns the number of offsets (always from index-0) whose safe_epoch_ is strictly-before the given ep...
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
ErrorCode
Enum of error codes defined in error_code.xmacro.
thread::ThreadPool * get_thread_pool() const
See Thread and Thread-Group.
bool is_initialized() const override final
Returns whether the object has been already initialized or not.
Per-thread reused work memory for system transactions.
The thread has terminated (either error or normal, check the result to differentiate them)...
cache::CacheHashtable * snapshot_cache_hashtable_
same above
Protects against all anomalies in all situations.
ErrorCode cll_try_or_acquire_multiple_locks(xct::LockListPosition upto_pos)
Acquire multiple locks up to the given position in canonical order.
void signal()
Signal it to let waiters exit.
bool is_error() const
Returns if this return code is not kErrorCodeOk.
cache::CacheOptions cache_
ErrorCode follow_page_pointers_for_write_batch(uint16_t batch_size, storage::VolatilePageInit page_initializer, storage::DualPagePointer **pointers, storage::Page **parents, const uint16_t *index_in_parents, storage::Page **out)
Batched version of follow_page_pointer with will_modify==true and tolerate_null_pointer==true.
ContentId find(storage::SnapshotPagePointer page_id) const __attribute__((always_inline))
Returns an offset for the given page ID opportunistically.
PagePool * get_volatile_pool()
void initialize(memory::NumaCoreMemory *core_memory, uint32_t *mcs_block_current, uint32_t *mcs_rw_async_mapping_current)
ThreadControlBlock * control_block_
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.