libfoedus-core
FOEDUS Core Library
foedus::xct::XctManagerPimpl Class Reference (final)

Pimpl object of XctManager.

Detailed Description

Pimpl object of XctManager.

A private pimpl object for XctManager. Do not include this header from a client program unless you know what you are doing.

Definition at line 100 of file xct_manager_pimpl.hpp.

#include <xct_manager_pimpl.hpp>


Public Member Functions

 XctManagerPimpl ()=delete
 
 XctManagerPimpl (Engine *engine)
 
ErrorStack initialize_once () override
 
ErrorStack uninitialize_once () override
 
Epoch get_current_global_epoch () const
 
Epoch get_requested_global_epoch () const
 
Epoch get_current_global_epoch_weak () const
 
ErrorCode begin_xct (thread::Thread *context, IsolationLevel isolation_level)
 User transactions related methods.
 
ErrorCode precommit_xct (thread::Thread *context, Epoch *commit_epoch)
 This is the core of the commit protocol.
 
ErrorCode abort_xct (thread::Thread *context)
 
ErrorCode wait_for_commit (Epoch commit_epoch, int64_t wait_microseconds)
 
void set_requested_global_epoch (Epoch request)
 
void advance_current_global_epoch ()
 
void wait_for_current_global_epoch (Epoch target_epoch, int64_t wait_microseconds)
 
void wakeup_epoch_chime_thread ()
 
ErrorCode precommit_xct_readonly (thread::Thread *context, Epoch *commit_epoch)
 precommit_xct() for a read-only transaction.
 
ErrorCode precommit_xct_readwrite (thread::Thread *context, Epoch *commit_epoch)
 precommit_xct() for a read-write transaction.
 
bool precommit_xct_lock_track_write (thread::Thread *context, WriteXctAccess *entry)
 Used from precommit_xct_lock() to track a moved record.
 
bool precommit_xct_verify_track_read (thread::Thread *context, ReadXctAccess *entry)
 Used from verification methods to track a moved record.
 
ErrorCode precommit_xct_lock (thread::Thread *context, XctId *max_xct_id)
 Phase 1 of precommit_xct().
 
ErrorCode precommit_xct_lock_batch_track_moved (thread::Thread *context)
 Subroutine of precommit_xct_lock() to track most of the moved records in the write-set.
 
bool precommit_xct_verify_readonly (thread::Thread *context, Epoch *commit_epoch)
 Phase 2 of precommit_xct() for the read-only case.
 
bool precommit_xct_verify_readwrite (thread::Thread *context, XctId *max_xct_id)
 Phase 2 of precommit_xct() for the read-write case.
 
bool precommit_xct_verify_pointer_set (thread::Thread *context)
 Returns false if there is any pointer set conflict.
 
bool precommit_xct_verify_page_version_set (thread::Thread *context)
 Returns false if there is any page version conflict.
 
void precommit_xct_apply (thread::Thread *context, XctId max_xct_id, Epoch *commit_epoch)
 Phase 3 of precommit_xct().
 
void release_and_clear_all_current_locks (thread::Thread *context)
 Unlocks all acquired locks; used on commit and abort.
 
bool precommit_xct_acquire_writer_lock (thread::Thread *context, WriteXctAccess *write)
 
void precommit_xct_sort_access (thread::Thread *context)
 
bool precommit_xct_try_acquire_writer_locks (thread::Thread *context)
 
bool precommit_xct_request_writer_lock (thread::Thread *context, WriteXctAccess *write)
 
void handle_epoch_chime ()
 Main routine for epoch_chime_thread_.
 
void handle_epoch_chime_wait_grace_period (Epoch grace_epoch)
 Makes sure all worker threads will commit with an epoch larger than grace_epoch.
 
bool is_stop_requested () const
 
void pause_accepting_xct ()
 Pauses all begin_xct() calls until resume_accepting_xct() is called.
 
void resume_accepting_xct ()
 Make sure you call this after pause_accepting_xct().
 
void wait_until_resume_accepting_xct (thread::Thread *context)
 
- Public Member Functions inherited from foedus::DefaultInitializable
 DefaultInitializable ()
 
virtual ~DefaultInitializable ()
 
 DefaultInitializable (const DefaultInitializable &)=delete
 
DefaultInitializable & operator= (const DefaultInitializable &)=delete
 
ErrorStack initialize () override final
 Typical implementation of Initializable::initialize() that provides initialize-once semantics.
 
ErrorStack uninitialize () override final
 Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics.
 
bool is_initialized () const override final
 Returns whether the object has already been initialized.
 
- Public Member Functions inherited from foedus::Initializable
virtual ~Initializable ()
 

Public Attributes

Engine *const engine_
 
XctManagerControlBlock * control_block_
 
std::thread epoch_chime_thread_
 This thread keeps advancing the current_global_epoch_.
 

Constructor & Destructor Documentation

foedus::xct::XctManagerPimpl::XctManagerPimpl ( ) [delete]
foedus::xct::XctManagerPimpl::XctManagerPimpl ( Engine * engine) [inline, explicit]

Definition at line 103 of file xct_manager_pimpl.hpp.

103 : engine_(engine) {}

Member Function Documentation

ErrorCode foedus::xct::XctManagerPimpl::abort_xct ( thread::Thread * context)

Definition at line 954 of file xct_manager_pimpl.cpp.

References foedus::xct::RetrospectiveLockList::clear_entries(), foedus::xct::RetrospectiveLockList::construct(), foedus::xct::Xct::deactivate(), foedus::log::ThreadLogBuffer::discard_current_xct_log(), foedus::thread::Thread::get_current_xct(), foedus::xct::Xct::get_read_set(), foedus::xct::Xct::get_read_set_size(), foedus::xct::Xct::get_retrospective_lock_list(), foedus::xct::Xct::get_rll_threshold_for_this_xct(), foedus::thread::Thread::get_thread_id(), foedus::thread::Thread::get_thread_log_buffer(), foedus::xct::RwLockableXctId::hotter(), foedus::xct::Xct::is_active(), foedus::xct::Xct::is_enable_rll_for_this_xct(), foedus::kErrorCodeOk, foedus::kErrorCodeXctNoXct, foedus::xct::ReadXctAccess::observed_owner_id_, foedus::xct::RecordXctAccess::owner_id_address_, release_and_clear_all_current_locks(), and foedus::xct::RwLockableXctId::xct_id_.

Referenced by foedus::xct::XctManager::abort_xct(), and precommit_xct().

954  {
955  Xct& current_xct = context->get_current_xct();
956  if (!current_xct.is_active()) {
957  return kErrorCodeXctNoXct;
958  }
959  DVLOG(1) << *context << " Aborted transaction in thread-" << context->get_thread_id();
960 
961 
962  // The abort was probably due to read-set verification failure, either in lock()'s
963  // 'related_read' check or in verify(). To handle both cases at once, we re-check
964  // all read-sets here and make violating ones hotter.
965  // This might sound like a bit of wasted effort, but anyway aborts must be reasonably
966  // infrequent (otherwise we are screwed!) so this doesn't add too much.
967  // Rather, we want to make sure we surely make them hotter.
968  // Actually, I spent half a day to figure out why a page doesn't become hot despite
969  // lots of aborts! (A: lock()'s related_read check forgot to make it hotter!)
970  ReadXctAccess* read_set = current_xct.get_read_set();
971  const uint32_t read_set_size = current_xct.get_read_set_size();
972  for (uint32_t i = 0; i < read_set_size; ++i) {
973  ReadXctAccess& access = read_set[i];
974  if (access.observed_owner_id_ != access.owner_id_address_->xct_id_) {
975  access.owner_id_address_->hotter(context);
976  }
977  }
978 
979  // When we abort, whether in precommit or via user's explicit abort, we construct RLL.
980  // Abort may happen due to try-failure in reads, so we now put this in here, not precommit.
981  // One drawback is that we can't check the "cause" of the abort.
982  // We should probably construct RLL only in the case of race-abort.
983  // So, we might revisit this choice.
984  if (current_xct.is_enable_rll_for_this_xct()) {
985  const uint32_t threshold = current_xct.get_rll_threshold_for_this_xct();
986  current_xct.get_retrospective_lock_list()->construct(context, threshold);
987  } else {
988  current_xct.get_retrospective_lock_list()->clear_entries();
989  }
990 
992  current_xct.deactivate();
993  context->get_thread_log_buffer().discard_current_xct_log();
994  return kErrorCodeOk;
995 }
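The read-set re-check above can be modeled in isolation. The sketch below makes simplifying assumptions. `Record`, `ReadEntry` and the integer `temperature` counter are hypothetical stand-ins for RwLockableXctId, ReadXctAccess and whatever state hotter() actually updates. A plain `uint64_t` replaces XctId.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical record header: a version (stand-in for XctId) plus a
// contention counter that hotter() bumps. Not the FOEDUS layout.
struct Record {
  uint64_t xct_id = 0;
  uint32_t temperature = 0;
  void hotter() { ++temperature; }
};

// Hypothetical read-set entry: the version observed at read time and the
// record that was read (stand-in for ReadXctAccess).
struct ReadEntry {
  uint64_t observed_xct_id;
  Record* record;
};

// On abort, re-check every read and mark records whose version changed since
// they were read as hotter, so later transactions treat them as contended.
// Returns how many records were marked.
uint32_t mark_violated_reads_hotter(const std::vector<ReadEntry>& read_set) {
  uint32_t marked = 0;
  for (const ReadEntry& access : read_set) {
    if (access.observed_xct_id != access.record->xct_id) {
      access.record->hotter();
      ++marked;
    }
  }
  return marked;
}
```

This matches the trade-off argued in the listing's comment. Aborts should be rare, so one extra pass over the read set is cheap. A missed hotter() call, by contrast, leaves a contended record looking cold.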

void foedus::xct::XctManagerPimpl::advance_current_global_epoch ( )

Definition at line 255 of file xct_manager_pimpl.cpp.

References foedus::soc::SharedPolling::acquire_ticket(), control_block_, foedus::xct::XctManagerControlBlock::current_global_epoch_advanced_, get_current_global_epoch(), foedus::Epoch::one_more(), set_requested_global_epoch(), foedus::soc::SharedPolling::wait(), and wakeup_epoch_chime_thread().

Referenced by foedus::xct::XctManager::advance_current_global_epoch().

255  {
256  Epoch now = get_current_global_epoch();
257  Epoch request = now.one_more();
258  LOG(INFO) << "Requesting to immediately advance epoch. request=" << request << "...";
260  while (get_current_global_epoch() < request) {
262  {
264  if (get_current_global_epoch() < request) {
266  }
267  }
268  }
269 
270  LOG(INFO) << "epoch advanced. current_global_epoch_=" << get_current_global_epoch();
271 }
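The ticket-then-recheck loop in advance_current_global_epoch() avoids a lost wakeup. The ticket is taken before the final check, so a signal fired in between still releases the waiter. Below is a minimal in-process sketch of that pattern using std::condition_variable, with these assumptions:

- `TicketPolling` is a hypothetical analogue of foedus::soc::SharedPolling. Its semantics are inferred from the acquire_ticket()/wait() calls listed in the References, not taken from its source.
- The epoch chime thread is replaced by a test thread.
- Epochs are plain integers, without FOEDUS's Epoch wrap-around handling.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Hypothetical ticket-based polling object: acquire_ticket() records the
// current signal count, and wait(ticket) blocks until at least one signal()
// has happened after that ticket was taken.
class TicketPolling {
 public:
  uint64_t acquire_ticket() {
    std::lock_guard<std::mutex> guard(mutex_);
    return signals_;
  }
  void wait(uint64_t ticket) {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [&] { return signals_ > ticket; });
  }
  void signal() {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      ++signals_;
    }
    cond_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cond_;
  uint64_t signals_ = 0;
};

std::atomic<uint32_t> current_epoch{1};
std::atomic<uint32_t> requested_epoch{0};
TicketPolling epoch_advanced;

// Caller side, shaped like advance_current_global_epoch(): publish a request
// for now+1, then wait until the epoch reaches it.
void advance_and_wait() {
  const uint32_t request = current_epoch.load() + 1;
  requested_epoch.store(request);
  while (current_epoch.load() < request) {
    const uint64_t ticket = epoch_advanced.acquire_ticket();
    if (current_epoch.load() < request) {  // re-check after taking the ticket
      epoch_advanced.wait(ticket);
    }
  }
}
```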

ErrorCode foedus::xct::XctManagerPimpl::begin_xct ( thread::Thread * context,
IsolationLevel  isolation_level 
)

User transactions related methods.

Definition at line 307 of file xct_manager_pimpl.cpp.

References foedus::xct::Xct::activate(), ASSERT_ND, control_block_, foedus::thread::Thread::get_current_xct(), foedus::xct::RetrospectiveLockList::get_last_active_entry(), foedus::xct::Xct::get_lock_free_write_set_size(), foedus::xct::Xct::get_mcs_block_current(), foedus::log::ThreadLogBuffer::get_offset_committed(), foedus::log::ThreadLogBuffer::get_offset_tail(), foedus::xct::Xct::get_read_set_size(), foedus::xct::Xct::get_retrospective_lock_list(), foedus::thread::Thread::get_thread_log_buffer(), foedus::xct::Xct::get_write_set_size(), foedus::xct::Xct::is_active(), foedus::kErrorCodeOk, foedus::kErrorCodeXctAlreadyRunning, foedus::xct::XctManagerControlBlock::new_transaction_paused_, UNLIKELY, and wait_until_resume_accepting_xct().

Referenced by foedus::xct::XctManager::begin_xct().

307  {
308  Xct& current_xct = context->get_current_xct();
309  if (current_xct.is_active()) {
311  }
314  }
315  DVLOG(1) << *context << " Began new transaction."
316  << " RLL size=" << current_xct.get_retrospective_lock_list()->get_last_active_entry();
317  current_xct.activate(isolation_level);
318  ASSERT_ND(current_xct.get_mcs_block_current() == 0);
319  ASSERT_ND(context->get_thread_log_buffer().get_offset_tail()
320  == context->get_thread_log_buffer().get_offset_committed());
321  ASSERT_ND(current_xct.get_read_set_size() == 0);
322  ASSERT_ND(current_xct.get_write_set_size() == 0);
323  ASSERT_ND(current_xct.get_lock_free_write_set_size() == 0);
324  return kErrorCodeOk;
325 }
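begin_xct() rejects a second begin on the same thread. In debug builds it also asserts that the previous transaction left nothing behind in its access sets. Below is a minimal sketch of those preconditions. `Code`, `Txn` and `begin()` are hypothetical; FOEDUS's real error codes live in error_code.hpp with different values. The sketch omits the pause check and the isolation level.

```cpp
#include <cassert>
#include <vector>

// Hypothetical error codes, not FOEDUS's ErrorCode values.
enum class Code { kOk, kAlreadyRunning };

// Hypothetical per-thread transaction state.
struct Txn {
  bool active = false;
  std::vector<int> read_set;
  std::vector<int> write_set;
};

Code begin(Txn& xct) {
  if (xct.active) {
    return Code::kAlreadyRunning;  // one transaction per thread at a time
  }
  // A previous commit or abort must have cleared the access sets.
  assert(xct.read_set.empty());
  assert(xct.write_set.empty());
  xct.active = true;
  return Code::kOk;
}
```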

Epoch foedus::xct::XctManagerPimpl::get_current_global_epoch ( ) const [inline]
Epoch foedus::xct::XctManagerPimpl::get_current_global_epoch_weak ( ) const [inline]

Definition at line 113 of file xct_manager_pimpl.hpp.

References control_block_, and foedus::xct::XctManagerControlBlock::current_global_epoch_.

Referenced by foedus::xct::XctManager::get_current_global_epoch_weak(), foedus::xct::XctManager::get_current_grace_epoch_weak(), and precommit_xct_readwrite().

113  {
114  return Epoch(control_block_->current_global_epoch_.load(std::memory_order_relaxed));
115  }
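The weak getter differs from get_requested_global_epoch(), which uses a default load, only in memory ordering. The sketch below contrasts the two orderings on a hypothetical `ControlBlock`. It is illustrative only and is not FOEDUS's XctManagerControlBlock.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the shared control block.
struct ControlBlock {
  std::atomic<uint32_t> current_global_epoch{1};
};

// Default load: sequentially consistent, the strongest ordering.
uint32_t load_epoch(const ControlBlock& cb) {
  return cb.current_global_epoch.load();
}

// Relaxed load: still atomic (never torn), but it establishes no ordering
// with surrounding memory accesses.
uint32_t load_epoch_weak(const ControlBlock& cb) {
  return cb.current_global_epoch.load(std::memory_order_relaxed);
}
```

A relaxed load gives up only ordering, not atomicity. That makes it a reasonable fit where a slightly stale epoch is acceptable.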

Epoch foedus::xct::XctManagerPimpl::get_requested_global_epoch ( ) const [inline]

Definition at line 110 of file xct_manager_pimpl.hpp.

References control_block_, and foedus::xct::XctManagerControlBlock::requested_global_epoch_.

Referenced by handle_epoch_chime(), and set_requested_global_epoch().

110  {
111  return Epoch(control_block_->requested_global_epoch_.load());
112  }

void foedus::xct::XctManagerPimpl::handle_epoch_chime ( )

Main routine for epoch_chime_thread_.

Epoch Chime related methods.

This method keeps advancing the global epoch at the interval configured in XctOptions (epoch_advance_interval_ms_). It exits when this object's uninitialize() is called.

Definition at line 135 of file xct_manager_pimpl.cpp.

References foedus::soc::SharedPolling::acquire_ticket(), ASSERT_ND, control_block_, foedus::xct::XctManagerControlBlock::current_global_epoch_, foedus::xct::XctManagerControlBlock::current_global_epoch_advanced_, engine_, foedus::xct::XctOptions::epoch_advance_interval_ms_, foedus::xct::XctManagerControlBlock::epoch_chime_wakeup_, get_current_global_epoch(), foedus::Engine::get_log_manager(), foedus::Engine::get_options(), get_requested_global_epoch(), handle_epoch_chime_wait_grace_period(), foedus::DefaultInitializable::is_initialized(), foedus::Engine::is_master(), is_stop_requested(), foedus::soc::kDefaultPollingSpins, foedus::assorted::memory_fence_acquire(), foedus::assorted::memory_fence_release(), foedus::Epoch::one_less(), foedus::Epoch::one_more(), foedus::soc::SharedPolling::signal(), SPINLOCK_WHILE, foedus::soc::SharedPolling::timedwait(), foedus::Epoch::value(), foedus::log::LogManager::wakeup_loggers(), and foedus::EngineOptions::xct_.

Referenced by initialize_once().

135  {
136  LOG(INFO) << "epoch_chime_thread started.";
138  // Wait until all the other initializations are done.
141  }
142  uint64_t interval_microsec = engine_->get_options().xct_.epoch_advance_interval_ms_ * 1000ULL;
143  LOG(INFO) << "epoch_chime_thread now starts processing. interval_microsec=" << interval_microsec;
144  while (!is_stop_requested()) {
145  {
147  if (is_stop_requested()) {
148  break;
149  }
150  if (get_requested_global_epoch() <= get_current_global_epoch()) { // otherwise no sleep
152  demand,
153  interval_microsec,
155  interval_microsec);
156  VLOG(1) << "epoch_chime_thread. wokeup with " << (signaled ? "signal" : "timeout");
157  }
158  }
159  if (is_stop_requested()) {
160  break;
161  }
162  VLOG(1) << "epoch_chime_thread. current_global_epoch_=" << get_current_global_epoch();
163  ASSERT_ND(get_current_global_epoch().is_valid());
164 
165  // Before advancing the epoch, we have to make sure there is no thread that might commit
166  // with previous epoch.
167  Epoch grace_epoch = get_current_global_epoch().one_less();
169  if (is_stop_requested()) {
170  break;
171  }
172 
173  {
174  // soc::SharedMutexScope scope(control_block_->current_global_epoch_advanced_.get_mutex());
175  // There is only one thread (this) that might update current_global_epoch_, so
176  // no mutex needed. just set it and put fence.
180  }
182  }
183  LOG(INFO) << "epoch_chime_thread ended.";
184 }
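The control flow of handle_epoch_chime() can be sketched with standard threading primitives. It sleeps for the configured interval unless an advance was requested, re-checks the stop flag, advances, and repeats. This is a simplified, single-process model:

- `EpochChime` is hypothetical and uses std::condition_variable instead of SharedPolling.
- It skips the grace-period wait and the logger wakeup.
- It treats epochs as plain integers.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Hypothetical epoch-chime model: advance once per interval, or immediately
// when an advance is requested, until stop() is called.
class EpochChime {
 public:
  explicit EpochChime(std::chrono::milliseconds interval) : interval_(interval) {}

  void run() {
    while (!stop_.load()) {
      {
        std::unique_lock<std::mutex> lock(mutex_);
        // Sleep unless an advance is already pending; wake early on notify.
        if (requested_.load() <= current_.load()) {
          wakeup_.wait_for(lock, interval_, [&] {
            return stop_.load() || requested_.load() > current_.load();
          });
        }
      }
      if (stop_.load()) break;
      current_.fetch_add(1);  // only this thread advances the epoch
    }
  }

  void request_advance() {
    requested_.store(current_.load() + 1);
    notify();
  }

  void stop() {
    stop_.store(true);
    notify();
  }

  uint32_t current() const { return current_.load(); }

 private:
  void notify() {
    // Taking the mutex orders the flag update with a waiter that is between
    // its predicate check and blocking, so the wakeup cannot be lost.
    { std::lock_guard<std::mutex> guard(mutex_); }
    wakeup_.notify_all();
  }

  std::chrono::milliseconds interval_;
  std::atomic<uint32_t> current_{1};
  std::atomic<uint32_t> requested_{0};
  std::atomic<bool> stop_{false};
  std::mutex mutex_;
  std::condition_variable wakeup_;
};
```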

void foedus::xct::XctManagerPimpl::handle_epoch_chime_wait_grace_period ( Epoch  grace_epoch)

Makes sure all worker threads will commit with an epoch larger than grace_epoch.

Definition at line 186 of file xct_manager_pimpl.cpp.

References ASSERT_ND, foedus::debugging::StopWatch::elapsed_ms(), foedus::debugging::StopWatch::elapsed_ns(), engine_, get_current_global_epoch(), foedus::thread::ThreadPool::get_group_ref(), foedus::thread::ThreadGroupRef::get_min_in_commit_epoch(), foedus::Engine::get_soc_count(), foedus::Engine::get_thread_pool(), foedus::Engine::is_master(), is_stop_requested(), foedus::Epoch::is_valid(), foedus::Epoch::one_more(), SPINLOCK_WHILE, and foedus::debugging::StopWatch::stop().

Referenced by handle_epoch_chime().

186  {
188  ASSERT_ND(grace_epoch.one_more() == get_current_global_epoch());
189 
190  VLOG(1) << "XctManager waiting until all worker threads exit grace_epoch:" << grace_epoch;
191  debugging::StopWatch watch;
192 
193  // In most cases, all workers have already switched to the current epoch long before.
194  // Quickly check it, then really wait if there is suspicious one.
195  thread::ThreadPool* pool = engine_->get_thread_pool();
196  uint16_t nodes = engine_->get_soc_count();
197  for (uint16_t node = 0; node < nodes; ++node) {
198  // Check all threads' in-commit epoch now.
199  // We might add a background thread in each SOC to avoid checking too many threads
200  // in the master engine, but anyway this happens only once in tens of milliseconds.
201  thread::ThreadGroupRef* group = pool->get_group_ref(node);
202 
203  // @spinlock until the long-running transaction exits.
204  const uint32_t kSpins = 1 << 12; // some number of spins first, then sleep.
205  uint32_t spins = 0;
207  Epoch min_epoch = group->get_min_in_commit_epoch();
208  if (!min_epoch.is_valid() || min_epoch > grace_epoch) {
209  break;
210  }
211  ++spins;
212  if (spins == 1U) {
213  // first spin.
214  VLOG(0) << "Interesting, node-" << node << " has some thread that is still running "
215  << "epoch-" << grace_epoch << ". we have to wait before advancing epoch. min_epoch="
216  << min_epoch;
217  } else if (spins >= kSpins) {
218  if (spins == kSpins) {
219  LOG(INFO) << "node-" << node << " has some thread that is running "
220  << "epoch-" << grace_epoch << " for long time. we are still waiting before advancing"
221  << " epoch. min_epoch=" << min_epoch;
222  }
223  std::this_thread::sleep_for(std::chrono::milliseconds(10));
224  }
225  }
226  }
227 
228  watch.stop();
229  VLOG(1) << "grace_epoch-" << grace_epoch << " guaranteed. took " << watch.elapsed_ns() << "ns";
230  if (watch.elapsed_ms() > 10) {
231  LOG(INFO) << "Very interesting, grace_epoch-" << grace_epoch << " took "
232  << watch.elapsed_ms() << "ms to be guaranteed. Most likely there was a long-running xct";
233  }
234 }
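The waiting strategy above checks cheaply first, spins a bounded number of times, then sleeps in 10 ms steps. The sketch below models it under these assumptions, which are not FOEDUS structures:

- Each worker publishes its in-commit epoch in an atomic slot.
- `0` stands for "not committing", standing in for an invalid Epoch.
- Slots are checked one by one rather than through a per-group minimum such as get_min_in_commit_epoch().

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical "not inside a commit" marker.
constexpr uint32_t kInvalidEpoch = 0;

// Returns once no worker is still committing in grace_epoch or earlier.
// Spins briefly first (the common case resolves quickly), then sleeps so a
// long-running commit does not burn a core.
void wait_grace_period(const std::vector<std::atomic<uint32_t>>& in_commit_epochs,
                       uint32_t grace_epoch) {
  const uint32_t kSpins = 1u << 12;
  for (const std::atomic<uint32_t>& slot : in_commit_epochs) {
    uint32_t spins = 0;
    while (true) {
      const uint32_t epoch = slot.load(std::memory_order_acquire);
      if (epoch == kInvalidEpoch || epoch > grace_epoch) break;
      if (++spins >= kSpins) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
    }
  }
}
```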

ErrorStack foedus::xct::XctManagerPimpl::initialize_once ( ) [override, virtual]

Implements foedus::DefaultInitializable.

Definition at line 84 of file xct_manager_pimpl.cpp.

References ASSERT_ND, control_block_, foedus::xct::XctManagerControlBlock::current_global_epoch_, engine_, foedus::xct::XctManagerControlBlock::epoch_chime_terminate_requested_, epoch_chime_thread_, ERROR_STACK, get_current_global_epoch(), foedus::soc::SharedMemoryRepo::get_global_memory_anchors(), foedus::savepoint::SavepointManager::get_initial_current_epoch(), foedus::Engine::get_savepoint_manager(), foedus::soc::SocManager::get_shared_memory_repo(), foedus::Engine::get_soc_manager(), foedus::Engine::get_storage_manager(), handle_epoch_chime(), foedus::xct::XctManagerControlBlock::initialize(), foedus::storage::StorageManager::is_initialized(), foedus::Engine::is_master(), foedus::kErrorCodeDepedentModuleUnavailableInit, foedus::kRetOk, foedus::xct::XctManagerControlBlock::requested_global_epoch_, foedus::Epoch::value(), and foedus::soc::GlobalMemoryAnchors::xct_manager_memory_.

84  {
85  LOG(INFO) << "Initializing XctManager..";
88  }
89  soc::SharedMemoryRepo* memory_repo = engine_->get_soc_manager()->get_shared_memory_repo();
90  control_block_ = memory_repo->get_global_memory_anchors()->xct_manager_memory_;
91 
92  if (engine_->is_master()) {
96  ASSERT_ND(get_current_global_epoch().is_valid());
99  epoch_chime_thread_ = std::move(std::thread(&XctManagerPimpl::handle_epoch_chime, this));
100  }
101  return kRetOk;
102 }
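initialize_once() starts epoch_chime_thread_ on a member function of `this`. The minimal sketch below shows that pattern with the matching stop-and-join a shutdown path needs. `Module` is hypothetical and omits the master-engine check. Because `std::thread(...)` already yields an rvalue, move-assigning it directly is enough; the `std::move` around it at line 99 is harmless but redundant.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical owner of a background thread.
class Module {
 public:
  void initialize() {
    worker_ = std::thread(&Module::loop, this);  // plain move-assignment
  }
  void uninitialize() {
    stop_.store(true);
    if (worker_.joinable()) worker_.join();
  }
  bool ran() const { return iterations_.load() > 0; }

 private:
  void loop() {
    do {  // runs at least once, then until stop is requested
      iterations_.fetch_add(1);
      std::this_thread::yield();
    } while (!stop_.load());
  }
  std::thread worker_;
  std::atomic<bool> stop_{false};
  std::atomic<int> iterations_{0};
};
```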

bool foedus::xct::XctManagerPimpl::is_stop_requested ( ) const

Definition at line 125 of file xct_manager_pimpl.cpp.

References ASSERT_ND, control_block_, engine_, foedus::xct::XctManagerControlBlock::epoch_chime_terminate_requested_, and foedus::Engine::is_master().

Referenced by handle_epoch_chime(), and handle_epoch_chime_wait_grace_period().

125  {
128 }

void foedus::xct::XctManagerPimpl::pause_accepting_xct ( )

Pauses all begin_xct() calls until resume_accepting_xct() is called.

Definition at line 327 of file xct_manager_pimpl.cpp.

References control_block_, and foedus::xct::XctManagerControlBlock::new_transaction_paused_.

Referenced by foedus::xct::XctManager::pause_accepting_xct().

327  {
329 }
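pause_accepting_xct() and resume_accepting_xct() toggle new_transaction_paused_. While it is set, begin_xct() waits in wait_until_resume_accepting_xct(). Below is a minimal sketch of such a gate. `BeginGate` is hypothetical and assumes a simple sleep-poll loop; FOEDUS's actual waiting strategy is not shown in this section.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical gate: new transactions poll a shared flag before starting.
// Transactions that already began are unaffected by pause().
class BeginGate {
 public:
  void pause() { paused_.store(true, std::memory_order_release); }
  void resume() { paused_.store(false, std::memory_order_release); }

  // Called at the start of each new transaction; returns once not paused.
  void wait_until_open() const {
    while (paused_.load(std::memory_order_acquire)) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }

 private:
  std::atomic<bool> paused_{false};
};
```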

ErrorCode foedus::xct::XctManagerPimpl::precommit_xct ( thread::Thread * context,
Epoch * commit_epoch 
)

This is the core of the commit protocol.

It is mostly the same as [TU2013].

Definition at line 343 of file xct_manager_pimpl.cpp.

References abort_xct(), ASSERT_ND, foedus::xct::Xct::assert_related_read_write(), foedus::xct::RetrospectiveLockList::clear_entries(), foedus::xct::Xct::deactivate(), foedus::xct::Xct::get_current_lock_list(), foedus::thread::Thread::get_current_xct(), foedus::xct::Xct::get_retrospective_lock_list(), foedus::xct::Xct::is_active(), foedus::xct::CurrentLockList::is_empty(), foedus::xct::Xct::is_read_only(), foedus::kErrorCodeOk, foedus::kErrorCodeXctNoXct, precommit_xct_readonly(), precommit_xct_readwrite(), and release_and_clear_all_current_locks().

Referenced by foedus::xct::XctManager::precommit_xct().

343  {
344  Xct& current_xct = context->get_current_xct();
345  if (!current_xct.is_active()) {
346  return kErrorCodeXctNoXct;
347  }
348  ASSERT_ND(current_xct.assert_related_read_write());
349 
350  ErrorCode result;
351  bool read_only = context->get_current_xct().is_read_only();
352  if (read_only) {
353  result = precommit_xct_readonly(context, commit_epoch);
354  } else {
355  result = precommit_xct_readwrite(context, commit_epoch);
356  }
357 
358  ASSERT_ND(current_xct.assert_related_read_write());
359  if (result != kErrorCodeOk) {
360  ErrorCode abort_ret = abort_xct(context);
361  ASSERT_ND(abort_ret == kErrorCodeOk);
362  DVLOG(1) << *context << " Aborting because of contention";
363  } else {
364  current_xct.get_retrospective_lock_list()->clear_entries();
366  current_xct.deactivate();
367  }
368  ASSERT_ND(current_xct.get_current_lock_list()->is_empty());
369  return result;
370 }
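The listing reduces to a dispatch on read-only versus read-write, followed by cleanup that ends the transaction on both outcomes. The sketch below mirrors that shape with hypothetical types (`Code`, `Txn`). Function pointers stand in for precommit_xct_readonly() and precommit_xct_readwrite(). Lock release and abort handling are reduced to field updates.

```cpp
#include <cassert>

// Hypothetical codes and transaction state; FOEDUS's real types differ.
enum class Code { kOk, kRaceAbort, kNoXct };

struct Txn {
  bool active = false;
  bool read_only = false;
  int held_locks = 0;   // stands in for the current lock list
  bool aborted = false;
};

Code precommit(Txn& xct, Code (*readonly_path)(Txn&), Code (*readwrite_path)(Txn&)) {
  if (!xct.active) return Code::kNoXct;
  const Code result = xct.read_only ? readonly_path(xct) : readwrite_path(xct);
  // Both outcomes end the transaction with no locks held. In FOEDUS the
  // failure path goes through abort_xct(), which also updates contention hints.
  xct.aborted = (result != Code::kOk);
  xct.held_locks = 0;
  xct.active = false;
  return result;
}
```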

bool foedus::xct::XctManagerPimpl::precommit_xct_acquire_writer_lock ( thread::Thread * context,
WriteXctAccess * write 
)

void foedus::xct::XctManagerPimpl::precommit_xct_apply ( thread::Thread * context,
XctId  max_xct_id,
Epoch * commit_epoch 
)

Phase 3 of precommit_xct()

Parameters
[in] context: thread context
[in] max_xct_id: largest xct_id this transaction depends on, or max(all xct_id).
[in,out] commit_epoch: commit epoch of this transaction; it is finalized in this function.

Assuming phases 1 and 2 completed successfully, this method applies all changes. It does NOT release locks yet, which is one difference from SILO.
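The apply listing's comments require writing data first and publishing the new state afterwards. A seqlock relies on the same ordering rule. The sketch below is a seqlock-style analogue, not FOEDUS's XctId protocol. `Record`, the top-bit `kBeingWrittenBit` flag and the retrying reader are all assumptions.

The writer holds the lock, flags the record, writes the payload, and then publishes a strictly larger version with release ordering. A reader that observes the same unflagged version before and after reading the payload therefore saw a consistent value.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical record: a version word whose top bit means "being written".
constexpr uint64_t kBeingWrittenBit = 1ULL << 63;

struct Record {
  std::atomic<uint64_t> version{0};
  std::atomic<uint64_t> payload{0};
};

// Writer (lock already held). new_version must be larger than the current
// version so readers can detect that a write happened.
void apply_write(Record& rec, uint64_t new_payload, uint64_t new_version) {
  rec.version.store(rec.version.load(std::memory_order_relaxed) | kBeingWrittenBit,
                    std::memory_order_relaxed);
  std::atomic_thread_fence(std::memory_order_release);  // flag before data
  rec.payload.store(new_payload, std::memory_order_relaxed);
  rec.version.store(new_version, std::memory_order_release);  // data before publish
}

// Optimistic reader: retry while a write is in flight or the version moved.
uint64_t read_consistent(const Record& rec, uint64_t* observed_version) {
  while (true) {
    const uint64_t before = rec.version.load(std::memory_order_acquire);
    if (before & kBeingWrittenBit) continue;
    const uint64_t value = rec.payload.load(std::memory_order_relaxed);
    std::atomic_thread_fence(std::memory_order_acquire);
    if (rec.version.load(std::memory_order_relaxed) == before) {
      *observed_version = before;
      return value;
    }
  }
}
```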

Definition at line 868 of file xct_manager_pimpl.cpp.

References ASSERT_ND, foedus::xct::XctId::before(), foedus::xct::XctId::clear_status_bits(), engine_, foedus::thread::Thread::get_current_xct(), foedus::xct::XctId::get_epoch(), foedus::xct::Xct::get_id(), foedus::xct::Xct::get_lock_free_write_set(), foedus::xct::Xct::get_lock_free_write_set_size(), foedus::storage::StorageManager::get_name(), foedus::xct::XctId::get_ordinal(), foedus::Engine::get_storage_manager(), foedus::log::LogHeader::get_type(), foedus::xct::Xct::get_write_set(), foedus::xct::Xct::get_write_set_size(), foedus::log::BaseLogType::header_, foedus::log::invoke_apply_record(), foedus::xct::XctId::is_being_written(), foedus::xct::XctId::is_deleted(), foedus::xct::RwLockableXctId::is_keylocked(), foedus::Epoch::is_valid(), foedus::xct::Xct::issue_next_id(), foedus::log::kLogCodeHashDelete, foedus::log::kLogCodeMasstreeDelete, foedus::xct::WriteXctAccess::log_entry_, foedus::xct::LockFreeWriteXctAccess::log_entry_, foedus::assorted::memory_fence_release(), foedus::xct::RecordXctAccess::owner_id_address_, foedus::xct::WriteXctAccess::payload_address_, foedus::xct::XctId::set_being_written(), foedus::xct::XctId::set_deleted(), foedus::log::LogHeader::set_xct_id(), foedus::xct::RecordXctAccess::storage_id_, foedus::xct::LockFreeWriteXctAccess::storage_id_, and foedus::xct::RwLockableXctId::xct_id_.

Referenced by precommit_xct_readwrite().

871  {
872  Xct& current_xct = context->get_current_xct();
873  WriteXctAccess* write_set = current_xct.get_write_set();
874  uint32_t write_set_size = current_xct.get_write_set_size();
875  LockFreeWriteXctAccess* lock_free_write_set = current_xct.get_lock_free_write_set();
876  uint32_t lock_free_write_set_size = current_xct.get_lock_free_write_set_size();
877  DVLOG(1) << *context << " applying.. write_set_size=" << write_set_size
878  << ", lock_free_write_set_size=" << lock_free_write_set_size;
879 
880  current_xct.issue_next_id(max_xct_id, commit_epoch);
881  XctId new_xct_id = current_xct.get_id();
882  ASSERT_ND(new_xct_id.get_epoch() == *commit_epoch);
883  ASSERT_ND(new_xct_id.get_ordinal() > 0);
884  new_xct_id.clear_status_bits();
885  XctId new_deleted_xct_id = new_xct_id;
886  new_deleted_xct_id.set_deleted(); // used if the record after apply is in deleted state.
887 
888  DVLOG(1) << *context << " generated new xct id=" << new_xct_id;
889  for (uint32_t i = 0; i < write_set_size; ++i) {
890  WriteXctAccess& write = write_set[i];
891  DVLOG(2) << *context << " Applying "
892  << engine_->get_storage_manager()->get_name(write.storage_id_)
893  << ":" << write.owner_id_address_;
894  ASSERT_ND(write.owner_id_address_->is_keylocked());
895 
896  // We must be careful on the memory order of unlock and data write.
897  // We must write data first (invoke_apply), then unlock.
898  // Otherwise the correctness is not guaranteed.
899  ASSERT_ND(write.log_entry_);
900  write.log_entry_->header_.set_xct_id(new_xct_id);
901  ASSERT_ND(new_xct_id.get_epoch().is_valid());
902  if (i > 0 && write.owner_id_address_ == write_set[i - 1].owner_id_address_) {
903  // the previous one has already set being_written and kept the lock
904  ASSERT_ND(write.owner_id_address_->xct_id_.is_being_written());
905  } else {
906  ASSERT_ND(!write.owner_id_address_->xct_id_.is_being_written());
907  write.owner_id_address_->xct_id_.set_being_written();
908  assorted::memory_fence_release();
909  }
910  log::invoke_apply_record(
911  write.log_entry_,
912  context,
913  write.storage_id_,
914  write.owner_id_address_,
915  write.payload_address_);
916  ASSERT_ND(!write.owner_id_address_->xct_id_.get_epoch().is_valid() ||
917  write.owner_id_address_->xct_id_.before(new_xct_id)); // ordered correctly?
918  if (i < write_set_size - 1 &&
919  write.owner_id_address_ == write_set[i + 1].owner_id_address_) {
920  DVLOG(0) << *context << " Multiple write sets on record "
921  << engine_->get_storage_manager()->get_name(write_set[i].storage_id_)
922  << ":" << write_set[i].owner_id_address_ << ". Unlock at the last one of the write sets";
923  // keep the lock for the next write set
924  } else {
925  // For this reason, we put memory_fence_release() between data and owner_id writes.
926  assorted::memory_fence_release();
927  if (write.owner_id_address_->xct_id_.is_deleted()) {
928  // preserve delete-flag set by delete operations (so, the operation should be delete)
929  ASSERT_ND(
930  write.log_entry_->header_.get_type() == log::kLogCodeHashDelete ||
931  write.log_entry_->header_.get_type() == log::kLogCodeMasstreeDelete);
932  write.owner_id_address_->xct_id_ = new_deleted_xct_id;
933  } else {
934  ASSERT_ND(
935  write.log_entry_->header_.get_type() != log::kLogCodeHashDelete &&
936  write.log_entry_->header_.get_type() != log::kLogCodeMasstreeDelete);
937  write.owner_id_address_->xct_id_ = new_xct_id;
938  }
939  // This method does NOT unlock. we do it at the end, all locks together.
940  // This is another difference from SILO.
941  }
942  }
943  // lock-free write-set doesn't have to worry about lock or ordering.
944  for (uint32_t i = 0; i < lock_free_write_set_size; ++i) {
945  LockFreeWriteXctAccess& write = lock_free_write_set[i];
946  DVLOG(2) << *context << " Applying Lock-Free write "
947  << engine_->get_storage_manager()->get_name(write.storage_id_);
948  write.log_entry_->header_.set_xct_id(new_xct_id);
949  log::invoke_apply_record(write.log_entry_, context, write.storage_id_, nullptr, nullptr);
950  }
951  DVLOG(1) << *context << " applied and unlocked write set";
952 }
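The write-then-publish ordering in the loop above can be illustrated with a small standalone sketch. It assumes a hypothetical 64-bit owner-id word with a "being written" bit and a "deleted" bit (this is not FOEDUS's actual XctId layout) and uses std::atomic_thread_fence in place of assorted::memory_fence_release(). It models one record and one write; the multiple-writes-per-record case and the log-apply dispatch are left out. In strict C++ terms, readers racing with the memcpy would need the payload to be accessed through atomics, so the sketch only shows the writer-side ordering.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical owner-id layout, for illustration only (not FOEDUS's XctId):
// bit 63 = "being written", bit 62 = "deleted", remaining bits = id.
constexpr uint64_t kBeingWrittenBit = 1ULL << 63;
constexpr uint64_t kDeletedBit = 1ULL << 62;

struct Record {
  std::atomic<uint64_t> tid{0};  // owner-id word; assumed already locked in phase 1
  char payload[16] = {};
};

// Applies one write to an already-locked record:
//   1) mark the record "being written",  2) release fence,
//   3) write the payload,                4) release fence,
//   5) publish the new id, which also clears "being written".
// is_delete stands in for the effect of applying a delete log record.
void apply_write(Record* rec, const char* data, std::size_t len,
                 uint64_t new_tid, bool is_delete) {
  const uint64_t old_tid = rec->tid.load(std::memory_order_relaxed);
  rec->tid.store(old_tid | kBeingWrittenBit, std::memory_order_relaxed);
  std::atomic_thread_fence(std::memory_order_release);  // flag before data
  std::memcpy(rec->payload, data, len);
  std::atomic_thread_fence(std::memory_order_release);  // data before new id
  rec->tid.store(is_delete ? (new_tid | kDeletedBit) : new_tid,
                 std::memory_order_relaxed);
}
```

Publishing the new id as one whole-word store is what lets a concurrent reader see either the old id with the being-written bit (and retry) or the final id, never a half-updated state of the owner word.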

ErrorCode foedus::xct::XctManagerPimpl::precommit_xct_lock ( thread::Thread * context,
XctId * max_xct_id
)

Phase 1 of precommit_xct()

Parameters
[in] context  thread context
[out] max_xct_id  largest xct_id this transaction depends on, or max(locked xct_id).
Returns
kErrorCodeOk if successful. Otherwise an error code such as kErrorCodeXctRaceAbort, in which case the transaction must abort. Note that an abort can be detected after some write locks have already been taken into the current lock list (CLL).

Try to lock all records we are going to write. After phase 2, we take a memory fence.

Definition at line 539 of file xct_manager_pimpl.cpp.

References ASSERT_ND, foedus::xct::Xct::assert_related_read_write(), foedus::xct::CurrentLockList::batch_insert_write_placeholders(), CHECK_ERROR_CODE, foedus::thread::Thread::cll_release_all_locks_after(), foedus::thread::Thread::cll_try_or_acquire_single_lock(), foedus::xct::XctOptions::force_canonical_xlocks_in_precommit_, foedus::xct::CurrentLockList::get_array(), foedus::xct::Xct::get_current_lock_list(), foedus::thread::Thread::get_current_xct(), foedus::thread::Thread::get_engine(), foedus::xct::CurrentLockList::get_last_locked_entry(), foedus::Engine::get_options(), foedus::xct::Xct::get_write_set(), foedus::xct::Xct::get_write_set_size(), foedus::xct::RwLockableXctId::is_keylocked(), foedus::xct::RwLockableXctId::is_moved(), foedus::xct::RwLockableXctId::is_next_layer(), foedus::xct::CurrentLockListIteratorForWriteSet::is_valid(), foedus::kErrorCodeOk, foedus::kErrorCodeXctRaceAbort, foedus::xct::kLockListPositionInvalid, foedus::xct::kWriteLock, foedus::xct::LockEntry::lock_, foedus::xct::RwLockableXctId::needs_track_moved(), foedus::xct::ReadXctAccess::observed_owner_id_, foedus::xct::RecordXctAccess::owner_id_address_, precommit_xct_lock_batch_track_moved(), precommit_xct_sort_access(), foedus::xct::LockEntry::preferred_mode_, foedus::xct::WriteXctAccess::related_read_, foedus::xct::XctId::store_max(), foedus::xct::LockEntry::taken_mode_, foedus::xct::LockEntry::universal_lock_id_, UNLIKELY, foedus::EngineOptions::xct_, and foedus::xct::RwLockableXctId::xct_id_.

Referenced by precommit_xct_readwrite().

539  {
540  Xct& current_xct = context->get_current_xct();
541  WriteXctAccess* write_set = current_xct.get_write_set();
542  uint32_t write_set_size = current_xct.get_write_set_size();
543  CurrentLockList* cll = current_xct.get_current_lock_list();
544  const bool force_canonical
545  = context->get_engine()->get_options().xct_.force_canonical_xlocks_in_precommit_;
546  DVLOG(1) << *context << " #write_sets=" << write_set_size << ", addr=" << write_set;
547 
548 #ifndef NDEBUG
549  // Initially, write-sets must be ordered by the insertion order.
550  for (uint32_t i = 0; i < write_set_size; ++i) {
551  ASSERT_ND(write_set[i].ordinal_ == i);
552  // Because of RLL, the write-set might or might not already have a corresponding lock
553  }
554 #endif // NDEBUG
555 
556 moved_retry:
557  CHECK_ERROR_CODE(precommit_xct_lock_batch_track_moved(context));
558  precommit_xct_sort_access(context);
559 
560  // TODO(Hideaki) Because of how the new locks work, I'm not sure the prefetch still helps.
561  // Previously it did, but not significant either, so let's disable for now and revisit this later.
562  // we have to access the owner_id's pointed address. let's prefetch them in parallel
563  // for (uint32_t i = 0; i < write_set_size; ++i) {
564  // assorted::prefetch_cacheline(write_set[i].owner_id_address_);
565  // }
566 
567  // Create entries in CLL for all write sets. At this point they are not locked yet.
568  cll->batch_insert_write_placeholders(write_set, write_set_size);
569 
570  ASSERT_ND(current_xct.assert_related_read_write());
571  // Note: one alternative is to sequentially iterate over write-set and CLL,
572  // both of which are sorted. It will be faster, but probably not that different
573  // unless we have a large number of locks. For now binary_search each time.
574 
575  // Both write-set and CLL are sorted in canonical order. Simply iterate over in order.
576  // This is way faster than invoking cll->binary_search() for each write-set entry.
577  // Remember one thing, tho: write-set might have multiple entries for one record!
578  for (CurrentLockListIteratorForWriteSet it(write_set, cll, write_set_size);
579  it.is_valid();
580  it.next_writes()) {
581  // for multiple writes on one record, only the first one (write_cur_pos_) takes the lock
582  WriteXctAccess* entry = write_set + it.write_cur_pos_;
583 
584  LockListPosition lock_pos = it.cll_pos_;
585  ASSERT_ND(lock_pos != kLockListPositionInvalid); // we have put placeholders for all!
586  LockEntry* lock_entry = cll->get_array() + lock_pos;
587  ASSERT_ND(lock_entry->lock_ == entry->owner_id_address_);
588  ASSERT_ND(lock_entry->preferred_mode_ == kWriteLock);
589  if (lock_entry->taken_mode_ == kWriteLock) {
590  DVLOG(2) << "Yay, already taken. Probably Thanks to RLL?";
591  } else {
592  // We need to take or upgrade the lock.
593  // This might return kErrorCodeXctRaceAbort when we are not in canonical mode and
594  // we could not immediately acquire the lock.
595  const LockListPosition last_locked_pos = cll->get_last_locked_entry();
596  if (force_canonical &&
597  (last_locked_pos != kLockListPositionInvalid && last_locked_pos >= lock_pos)) {
598  // We are not in canonical mode. Let's aggressively restore canonical mode.
599  DVLOG(0) << "Aggressively releasing locks to restore canonical mode in precommit";
600  context->cll_release_all_locks_after(lock_entry->universal_lock_id_ - 1U);
601 #ifndef NDEBUG
602  const LockListPosition new_pos = cll->get_last_locked_entry();
603  ASSERT_ND(new_pos == kLockListPositionInvalid || new_pos < lock_pos);
604 #endif // NDEBUG
605  }
606  CHECK_ERROR_CODE(context->cll_try_or_acquire_single_lock(lock_pos));
607  }
608 
609  if (UNLIKELY(entry->owner_id_address_->needs_track_moved())) {
610  // Because we invoked precommit_xct_lock_batch_track_moved beforehand,
611  // this happens only when a concurrent thread again split some of the overlapping page.
612  // Though rare, it happens. In that case redo the procedure.
613  DLOG(INFO) << "Someone has split the page and moved our record after we check. Retry..";
614  goto moved_retry;
615  }
616 
617  ASSERT_ND(!entry->owner_id_address_->is_moved());
618  ASSERT_ND(!entry->owner_id_address_->is_next_layer());
619  ASSERT_ND(entry->owner_id_address_->is_keylocked());
620  max_xct_id->store_max(entry->owner_id_address_->xct_id_);
621 
622  // If we have to abort, we should abort early to not waste time.
623  // Thus, we check related read sets right here.
624  // For other writes of the same record, too.
625  for (uint32_t rec = it.write_cur_pos_; rec < it.write_next_pos_; ++rec) {
626  WriteXctAccess* r = write_set + rec;
627  ASSERT_ND(entry->owner_id_address_ == r->owner_id_address_);
628  if (r->related_read_) {
629  ASSERT_ND(r->related_read_->owner_id_address_ == r->owner_id_address_);
630  if (r->owner_id_address_->xct_id_ != r->related_read_->observed_owner_id_) {
631  return kErrorCodeXctRaceAbort;
632  }
633  }
634  }
635  }
636 
637  DVLOG(1) << *context << " locked write set";
638 #ifndef NDEBUG
639  for (uint32_t i = 0; i < write_set_size; ++i) {
640  ASSERT_ND(write_set[i].owner_id_address_->is_keylocked());
641  }
642 #endif // NDEBUG
643  return kErrorCodeOk;
644 }
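A minimal sketch of the phase-1 idea, under simplifying assumptions: std::mutex stands in for the record lock, the record address stands in for universal_lock_id_, and there is no RLL, lock upgrade, or moved-record retry. Sorting into one canonical order is what makes blocking acquisition deadlock-free, several writes to one record take a single lock, and a stale related read aborts before any further locks are taken. The names Rec, Write, and lock_write_set are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <vector>

// Hypothetical simplified types for illustration; these are not FOEDUS classes.
struct Rec {
  std::mutex lock;
  uint64_t tid = 0;
};

struct Write {
  Rec* rec;               // record this write targets
  uint32_t ordinal;       // insertion order of the write
  bool has_related_read;  // did the transaction read this record first?
  uint64_t observed_tid;  // tid observed by that read
};

// Sorts writes canonically (by record address, then ordinal), locks each
// distinct record once, tracks the largest tid, and aborts early on a stale
// related read. Locked records are appended to *locked in either case.
bool lock_write_set(std::vector<Write>& ws, uint64_t* max_tid,
                    std::vector<Rec*>* locked) {
  std::sort(ws.begin(), ws.end(), [](const Write& a, const Write& b) {
    if (a.rec != b.rec) return std::less<Rec*>()(a.rec, b.rec);
    return a.ordinal < b.ordinal;
  });
  for (std::size_t i = 0; i < ws.size();) {
    std::size_t next = i;
    while (next < ws.size() && ws[next].rec == ws[i].rec) ++next;  // same-record group
    Rec* rec = ws[i].rec;
    rec->lock.lock();  // blocking is safe: every thread locks in the same order
    locked->push_back(rec);
    *max_tid = std::max(*max_tid, rec->tid);
    for (std::size_t k = i; k < next; ++k) {
      if (ws[k].has_related_read && ws[k].observed_tid != rec->tid) return false;
    }
    i = next;
  }
  return true;
}
```

As in precommit_xct_lock(), an early abort can leave some records locked; in this sketch they are reported through `*locked` so the caller can release them.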

ErrorCode foedus::xct::XctManagerPimpl::precommit_xct_lock_batch_track_moved ( thread::Thread * context)

Subroutine of precommit_xct_lock to track most of moved records in write-set.

We initially did this per record while taking each lock, but that required a lot of redoing when the transaction batch-loads many records that cause many page splits. Thus, before we take X-locks and do the final check, we invoke this method to do best-effort tracking in one shot. Note that there is still a chance that a record is moved after this method returns but before we take its lock. In that case we redo the process; it does happen.

Definition at line 514 of file xct_manager_pimpl.cpp.

References ASSERT_ND, foedus::thread::Thread::get_current_xct(), foedus::xct::Xct::get_write_set(), foedus::xct::Xct::get_write_set_size(), foedus::kErrorCodeOk, foedus::kErrorCodeXctRaceAbort, foedus::xct::RecordXctAccess::owner_id_address_, precommit_xct_lock_track_write(), and UNLIKELY.

Referenced by precommit_xct_lock().

514  {
515  Xct& current_xct = context->get_current_xct();
516  WriteXctAccess* write_set = current_xct.get_write_set();
517  uint32_t write_set_size = current_xct.get_write_set_size();
518  uint32_t moved_count = 0;
519  for (uint32_t i = 0; i < write_set_size; ++i) {
520  WriteXctAccess* entry = write_set + i;
521  auto* rec = entry->owner_id_address_;
522  if (UNLIKELY(rec->needs_track_moved())) {
523  if (!precommit_xct_lock_track_write(context, entry)) {
524  DLOG(INFO) << "Failed to track moved record?? this must be very rare";
525  return kErrorCodeXctRaceAbort;
526  }
527  ASSERT_ND(entry->owner_id_address_ != rec);
528  ++moved_count;
529  }
530  }
531  DVLOG(1) << "Tracked " << moved_count << " moved records in precommit_xct_lock.";
532  if (moved_count > 100U) {
533  LOG(INFO) << "Tracked " << moved_count << " moved records in precommit_xct_lock."
534  << " That's a lot. maybe this is a batch-loading transaction?";
535  }
536  return kErrorCodeOk;
537 }
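The batch pass can be modeled with forwarding pointers. This is only an analogy: FOEDUS re-resolves a moved record through StorageManager::track_moved_record(), whereas the hypothetical Slot type below simply records where a record went after a split. The sketch also mirrors precommit_xct_lock_track_write() by updating the related read entry so the same record is not tracked twice. The hop limit of 8 is an arbitrary assumption.

```cpp
#include <cassert>
#include <vector>

// Hypothetical model: a moved record leaves a forwarding pointer behind.
struct Slot {
  Slot* moved_to = nullptr;  // non-null: the record now lives elsewhere
};
struct ReadEntry {
  Slot* owner;
};
struct WriteEntry {
  Slot* owner;
  ReadEntry* related_read;  // read of the same record, if any
};

// Follows forwarding pointers; returns nullptr if tracking gives up.
Slot* track_moved(Slot* s, int max_hops) {
  while (s->moved_to != nullptr) {
    if (max_hops-- == 0) return nullptr;
    s = s->moved_to;
  }
  return s;
}

// Best-effort pass over the whole write set before any lock is taken.
// Returns how many entries were re-pointed, or -1 if any record could not
// be tracked (the caller would then abort the transaction).
int batch_track_moved(std::vector<WriteEntry>& ws) {
  int moved = 0;
  for (WriteEntry& e : ws) {
    if (e.owner->moved_to == nullptr) continue;
    Slot* now = track_moved(e.owner, 8);
    if (now == nullptr) return -1;
    e.owner = now;
    if (e.related_read != nullptr) e.related_read->owner = now;  // avoid tracking twice
    ++moved;
  }
  return moved;
}
```

Because a record can move again after this pass, a caller in the style of precommit_xct_lock() still rechecks each record after locking it and restarts from the batch pass if needed.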

bool foedus::xct::XctManagerPimpl::precommit_xct_lock_track_write ( thread::Thread * context,
WriteXctAccess * entry
)

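Used from precommit_xct_lock() to track a moved record. Returns false if the record could not be tracked even with the write set, in which case the whole transaction must be retried.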

Definition at line 432 of file xct_manager_pimpl.cpp.

References ASSERT_ND, engine_, foedus::thread::Thread::get_global_volatile_page_resolver(), foedus::Engine::get_storage_manager(), foedus::xct::RwLockableXctId::needs_track_moved(), foedus::xct::TrackMovedRecordResult::new_owner_address_, foedus::xct::TrackMovedRecordResult::new_payload_address_, foedus::xct::RecordXctAccess::owner_id_address_, foedus::xct::WriteXctAccess::payload_address_, foedus::xct::WriteXctAccess::related_read_, foedus::xct::ReadXctAccess::related_write_, foedus::xct::RecordXctAccess::set_owner_id_resolve_lock_id(), and foedus::xct::RecordXctAccess::storage_id_.

Referenced by precommit_xct_lock_batch_track_moved().

433  {
434  ASSERT_ND(entry->owner_id_address_->needs_track_moved());
435  storage::StorageManager* st = engine_->get_storage_manager();
436  TrackMovedRecordResult result = st->track_moved_record(
437  entry->storage_id_,
438  entry->owner_id_address_,
439  entry);
440  if (result.new_owner_address_ == nullptr) {
441  // failed to track down even with the write set. this is a quite rare event.
442  // in that case, retry the whole transaction.
443  DLOG(INFO) << "Failed to track moved record even with write set";
444  return false;
445  }
446  const auto& resolver = context->get_global_volatile_page_resolver();
447  if (entry->related_read_) {
448  // also apply the result to related read access so that we don't have to track twice.
449  ASSERT_ND(entry->related_read_->related_write_ == entry);
450  ASSERT_ND(entry->related_read_->owner_id_address_ == entry->owner_id_address_);
451  entry->related_read_->set_owner_id_resolve_lock_id(resolver, result.new_owner_address_);
452  }
453  entry->set_owner_id_resolve_lock_id(resolver, result.new_owner_address_);
454  entry->payload_address_ = result.new_payload_address_;
455  return true;
456 }

ErrorCode foedus::xct::XctManagerPimpl::precommit_xct_readonly ( thread::Thread * context,
Epoch * commit_epoch
)

precommit_xct() if the transaction is read-only

If the transaction is read-only, the commit epoch (serialization point) is the largest epoch number observed in the read set. We don't have to take two memory fences in this case.

Definition at line 371 of file xct_manager_pimpl.cpp.

References ASSERT_ND, foedus::log::ThreadLogBuffer::get_offset_committed(), foedus::log::ThreadLogBuffer::get_offset_tail(), foedus::thread::Thread::get_thread_log_buffer(), foedus::kErrorCodeOk, foedus::kErrorCodeXctRaceAbort, foedus::assorted::memory_fence_acquire(), and precommit_xct_verify_readonly().

Referenced by precommit_xct().

371  {
372  DVLOG(1) << *context << " Committing read_only";
373  ASSERT_ND(context->get_thread_log_buffer().get_offset_committed() ==
374  context->get_thread_log_buffer().get_offset_tail());
375  *commit_epoch = Epoch();
376  assorted::memory_fence_acquire(); // this is enough for read-only case
377  if (precommit_xct_verify_readonly(context, commit_epoch)) {
378  return kErrorCodeOk;
379  } else {
380  return kErrorCodeXctRaceAbort;
381  }
382 }
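The read-only path can be sketched as follows, assuming a hypothetical id word whose upper 32 bits hold the epoch and whose lower 32 bits hold the ordinal (not FOEDUS's XctId encoding). After one acquire fence, each read is verified against the record's current id, and the commit epoch is the largest epoch among the observed ids. Page-version, pointer-set, lock-free read, and moved-record handling are omitted.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical packing: upper 32 bits = epoch, lower 32 bits = ordinal.
inline uint32_t epoch_of(uint64_t tid) { return static_cast<uint32_t>(tid >> 32); }

struct ReadAccess {
  const std::atomic<uint64_t>* owner;  // record's current id word
  uint64_t observed;                   // id seen when the record was read
};

// One acquire fence, then verify every observed id is unchanged. The commit
// epoch is the largest epoch observed. Returns false if the xct must abort.
bool commit_readonly(const std::vector<ReadAccess>& reads, uint32_t* commit_epoch) {
  *commit_epoch = 0;  // 0 stands in for an invalid epoch in this sketch
  std::atomic_thread_fence(std::memory_order_acquire);
  for (const ReadAccess& r : reads) {
    if (r.owner->load(std::memory_order_relaxed) != r.observed) return false;
    *commit_epoch = std::max(*commit_epoch, epoch_of(r.observed));
  }
  return true;
}
```

No lock is taken and nothing is written, which is why a single fence suffices here, in contrast to the read-write path.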

ErrorCode foedus::xct::XctManagerPimpl::precommit_xct_readwrite ( thread::Thread * context,
Epoch * commit_epoch
)

precommit_xct() if the transaction is read-write

See [TU2013] for the full protocol in this case.

Definition at line 384 of file xct_manager_pimpl.cpp.

References ASSERT_ND, foedus::log::ThreadLogBuffer::discard_current_xct_log(), foedus::log::LogOptions::emulation_, engine_, get_current_global_epoch_weak(), foedus::thread::Thread::get_current_xct(), foedus::thread::Thread::get_in_commit_epoch_address(), foedus::Engine::get_options(), foedus::thread::Thread::get_thread_log_buffer(), foedus::xct::Xct::get_write_set(), foedus::xct::Xct::get_write_set_size(), foedus::Epoch::kEpochInitialDurable, foedus::kErrorCodeOk, foedus::kErrorCodeXctRaceAbort, foedus::EngineOptions::log_, foedus::assorted::memory_fence_acq_rel(), foedus::assorted::memory_fence_release(), foedus::fs::DeviceEmulationOptions::null_device_, precommit_xct_apply(), precommit_xct_lock(), precommit_xct_verify_readwrite(), foedus::log::ThreadLogBuffer::publish_committed_log(), and foedus::xct::XctId::set().

Referenced by precommit_xct().

384  {
385  DVLOG(1) << *context << " Committing read-write";
386  XctId max_xct_id;
387  max_xct_id.set(Epoch::kEpochInitialDurable, 1); // TODO(Hideaki) not quite..
388  ErrorCode lock_ret = precommit_xct_lock(context, &max_xct_id); // Phase 1
389  if (lock_ret != kErrorCodeOk) {
390  return lock_ret;
391  }
392 
393  // BEFORE the first fence, update the in commit epoch for epoch chime.
394  // see InCommitEpochGuard class comments for why we need to do this.
395  Epoch conservative_epoch = get_current_global_epoch_weak();
396  InCommitEpochGuard guard(context->get_in_commit_epoch_address(), conservative_epoch);
397 
398  assorted::memory_fence_acq_rel();
399 
400  *commit_epoch = get_current_global_epoch_weak(); // serialization point!
401  DVLOG(1) << *context << " Acquired read-write commit epoch " << *commit_epoch;
402 
403  assorted::memory_fence_acq_rel();
404  bool verified = precommit_xct_verify_readwrite(context, &max_xct_id); // phase 2
405 #ifndef NDEBUG
406  {
407  WriteXctAccess* write_set = context->get_current_xct().get_write_set();
408  uint32_t write_set_size = context->get_current_xct().get_write_set_size();
409  for (uint32_t i = 0; i < write_set_size; ++i) {
410  ASSERT_ND(write_set[i].owner_id_address_->is_keylocked());
411  ASSERT_ND(!write_set[i].owner_id_address_->needs_track_moved());
412  ASSERT_ND(write_set[i].log_entry_);
413  ASSERT_ND(write_set[i].payload_address_);
414  }
415  }
416 #endif // NDEBUG
417  if (verified) {
418  precommit_xct_apply(context, max_xct_id, commit_epoch); // phase 3. this does NOT unlock
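419  // announce log AFTER (with fence) apply, because apply sets xct_order in the logs.
420  assorted::memory_fence_release();
421  if (engine_->get_options().log_.emulation_.null_device_) {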
422  context->get_thread_log_buffer().discard_current_xct_log();
423  } else {
424  context->get_thread_log_buffer().publish_committed_log(*commit_epoch);
425  }
426  return kErrorCodeOk;
427  }
428  return kErrorCodeXctRaceAbort;
429 }
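The read-write sequence can be condensed into a skeleton, assuming (as the references listed for this function suggest) that the two fences around the epoch read are acquire-release fences and that a release fence separates apply from log publication. The Phases callbacks are hypothetical placeholders rather than FOEDUS functions, the InCommitEpochGuard step is left out, lock failure is collapsed into a race abort, and epochs are plain uint32_t values.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>

// Hypothetical phase callbacks standing in for the FOEDUS phase methods.
struct Phases {
  std::function<bool()> lock;                  // phase 1
  std::function<bool()> verify;                // phase 2
  std::function<void(uint32_t)> apply;         // phase 3 (does not unlock)
  std::function<void(uint32_t)> publish_log;   // make the xct's log visible
};

enum class Result { kOk, kRaceAbort };

Result commit_readwrite(const Phases& p, const std::atomic<uint32_t>& global_epoch,
                        uint32_t* commit_epoch) {
  if (!p.lock()) return Result::kRaceAbort;
  std::atomic_thread_fence(std::memory_order_acq_rel);
  *commit_epoch = global_epoch.load(std::memory_order_relaxed);  // serialization point
  std::atomic_thread_fence(std::memory_order_acq_rel);
  if (!p.verify()) return Result::kRaceAbort;
  p.apply(*commit_epoch);
  std::atomic_thread_fence(std::memory_order_release);  // apply before log publication
  p.publish_log(*commit_epoch);
  return Result::kOk;
}
```

Reading the global epoch between the two fences is what pins the serialization point after all write locks are held and before any read is verified.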

bool foedus::xct::XctManagerPimpl::precommit_xct_request_writer_lock ( thread::Thread * context,
WriteXctAccess * write
)

void foedus::xct::XctManagerPimpl::precommit_xct_sort_access ( thread::Thread * context)

Definition at line 478 of file xct_manager_pimpl.cpp.

References ASSERT_ND, foedus::xct::Xct::assert_related_read_write(), foedus::xct::WriteXctAccess::compare(), foedus::thread::Thread::get_current_xct(), foedus::xct::Xct::get_write_set(), foedus::xct::Xct::get_write_set_size(), foedus::xct::RecordXctAccess::ordinal_, foedus::xct::RecordXctAccess::owner_id_address_, foedus::xct::WriteXctAccess::related_read_, and foedus::xct::ReadXctAccess::related_write_.

Referenced by precommit_xct_lock().

478  {
479  Xct& current_xct = context->get_current_xct();
480  WriteXctAccess* write_set = current_xct.get_write_set();
481  uint32_t write_set_size = current_xct.get_write_set_size();
482 
483  ASSERT_ND(current_xct.assert_related_read_write());
484  std::sort(write_set, write_set + write_set_size, WriteXctAccess::compare);
485  // after the sorting, the related-link from read-set to write-set is now broken.
486  // we fix it by following the back-link from write-set to read-set.
487  for (uint32_t i = 0; i < write_set_size; ++i) {
488  WriteXctAccess* entry = write_set + i;
489  if (entry->related_read_) {
490  ASSERT_ND(entry->owner_id_address_ == entry->related_read_->owner_id_address_);
491  ASSERT_ND(entry->related_read_->related_write_);
492  entry->related_read_->related_write_ = entry;
493  }
494  entry->ordinal_ = i;
495  }
496  ASSERT_ND(current_xct.assert_related_read_write());
497 
498 #ifndef NDEBUG
499  ASSERT_ND(current_xct.assert_related_read_write());
500  // check that write sets are now sorted.
501  // when address is the same, they must be ordered by the order they are created.
502  // eg: repeated overwrites to one record should be applied in the order the user issued.
503  for (uint32_t i = 1; i < write_set_size; ++i) {
504  ASSERT_ND(write_set[i - 1].ordinal_ != write_set[i].ordinal_);
505  if (write_set[i].owner_lock_id_ == write_set[i - 1].owner_lock_id_) {
506  ASSERT_ND(write_set[i - 1].ordinal_ < write_set[i].ordinal_);
507  } else {
508  ASSERT_ND(write_set[i - 1].owner_lock_id_ < write_set[i].owner_lock_id_);
509  }
510  }
511 #endif // NDEBUG
512 }
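The sort-and-relink step can be illustrated with simplified entry types. The sort key is (lock id, ordinal), so repeated writes to one record stay in issue order; after std::sort moves the entries, each read-to-write link is repaired from the write-to-read back-link and ordinals are renumbered, as in the listing above. WriteAcc, ReadAcc, and sort_write_set are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical simplified access entries (not FOEDUS's WriteXctAccess).
struct WriteAcc;
struct ReadAcc {
  uint64_t lock_id;
  WriteAcc* related_write;  // becomes stale when the write set is sorted
};
struct WriteAcc {
  uint64_t lock_id;       // canonical lock-order key
  uint32_t ordinal;       // order in which the write was issued
  ReadAcc* related_read;  // back-link, unaffected by sorting
};

void sort_write_set(std::vector<WriteAcc>& ws) {
  std::sort(ws.begin(), ws.end(), [](const WriteAcc& a, const WriteAcc& b) {
    return a.lock_id != b.lock_id ? a.lock_id < b.lock_id : a.ordinal < b.ordinal;
  });
  for (uint32_t i = 0; i < ws.size(); ++i) {
    if (ws[i].related_read != nullptr) ws[i].related_read->related_write = &ws[i];
    ws[i].ordinal = i;  // renumber to the new position
  }
}
```

Tie-breaking on the ordinal keeps the result deterministic even though std::sort is not stable.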

bool foedus::xct::XctManagerPimpl::precommit_xct_try_acquire_writer_locks ( thread::Thread * context)

bool foedus::xct::XctManagerPimpl::precommit_xct_verify_page_version_set ( thread::Thread * context)

Returns false if there is any page version conflict.

Definition at line 847 of file xct_manager_pimpl.cpp.

References foedus::xct::PageVersionAccess::address_, foedus::thread::Thread::get_current_xct(), foedus::xct::Xct::get_page_version_set(), foedus::xct::Xct::get_page_version_set_size(), foedus::xct::PageVersionAccess::observed_, foedus::assorted::prefetch_cacheline(), and foedus::storage::PageVersion::status_.

Referenced by precommit_xct_verify_readonly(), and precommit_xct_verify_readwrite().

847  {
848  const Xct& current_xct = context->get_current_xct();
849  const PageVersionAccess* page_version_set = current_xct.get_page_version_set();
850  const uint32_t page_version_set_size = current_xct.get_page_version_set_size();
851  for (uint32_t i = 0; i < page_version_set_size; ++i) {
852  // let's prefetch address_ in parallel
853  if (i % kReadsetPrefetchBatch == 0) {
854  for (uint32_t j = i; j < i + kReadsetPrefetchBatch && j < page_version_set_size; ++j) {
855  assorted::prefetch_cacheline(page_version_set[j].address_);
856  }
857  }
858  const PageVersionAccess& access = page_version_set[i];
859  if (access.address_->status_ != access.observed_) {
860  DLOG(WARNING) << *context << " page version is changed by other transaction. will abort"
861  " observed=" << access.observed_ << ", now=" << access.address_->status_;
862  return false;
863  }
864  }
865  return true;
866 }
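The batched-prefetch loop shared by precommit_xct_verify_page_version_set() and precommit_xct_verify_pointer_set() can be sketched in isolation. It uses the GCC/Clang builtin __builtin_prefetch instead of assorted::prefetch_cacheline(), and the batch size of 16 is an assumption because the value of kReadsetPrefetchBatch is not shown here. VersionAccess is a hypothetical stand-in for PageVersionAccess.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Assumed batch size; the real kReadsetPrefetchBatch value is not shown here.
constexpr uint32_t kPrefetchBatch = 16;

struct VersionAccess {
  const uint64_t* address;  // current version word of a page
  uint64_t observed;        // version seen during the transaction
};

// Returns false on any version conflict. At the start of each batch, issue
// prefetches for the next kPrefetchBatch addresses so their cache misses
// overlap instead of being paid one at a time.
bool verify_versions(const std::vector<VersionAccess>& set) {
  const uint32_t n = static_cast<uint32_t>(set.size());
  for (uint32_t i = 0; i < n; ++i) {
    if (i % kPrefetchBatch == 0) {
      for (uint32_t j = i; j < i + kPrefetchBatch && j < n; ++j) {
        __builtin_prefetch(set[j].address);  // GCC/Clang builtin; only a hint
      }
    }
    if (*set[i].address != set[i].observed) return false;
  }
  return true;
}
```

The prefetch is purely a performance hint, so the function returns the same result with or without it.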

bool foedus::xct::XctManagerPimpl::precommit_xct_verify_pointer_set ( thread::Thread * context)

Returns false if there is any pointer set conflict.

Definition at line 828 of file xct_manager_pimpl.cpp.

References foedus::xct::PointerAccess::address_, foedus::thread::Thread::get_current_xct(), foedus::xct::Xct::get_pointer_set(), foedus::xct::Xct::get_pointer_set_size(), foedus::xct::PointerAccess::observed_, foedus::assorted::prefetch_cacheline(), and foedus::storage::VolatilePagePointer::word.

Referenced by precommit_xct_verify_readonly(), and precommit_xct_verify_readwrite().

828  {
829  const Xct& current_xct = context->get_current_xct();
830  const PointerAccess* pointer_set = current_xct.get_pointer_set();
831  const uint32_t pointer_set_size = current_xct.get_pointer_set_size();
832  for (uint32_t i = 0; i < pointer_set_size; ++i) {
833  // let's prefetch address_ in parallel
834  if (i % kReadsetPrefetchBatch == 0) {
835  for (uint32_t j = i; j < i + kReadsetPrefetchBatch && j < pointer_set_size; ++j) {
836  assorted::prefetch_cacheline(pointer_set[j].address_);
837  }
838  }
839  const PointerAccess& access = pointer_set[i];
840  if (access.address_->word != access.observed_.word) {
841  DLOG(WARNING) << *context << " volatile ptr is changed by other transaction. will abort";
842  return false;
843  }
844  }
845  return true;
846 }

bool foedus::xct::XctManagerPimpl::precommit_xct_verify_readonly ( thread::Thread * context,
Epoch * commit_epoch
)

Phase 2 of precommit_xct() for read-only case.

Returns
true if verification succeeded. false if we need to abort.

Verify the observed read set and set the commit epoch to the highest epoch it observed.

Definition at line 648 of file xct_manager_pimpl.cpp.

References ASSERT_ND, engine_, foedus::thread::Thread::get_current_xct(), foedus::log::LogManager::get_durable_global_epoch_weak(), foedus::xct::XctId::get_epoch(), foedus::xct::Xct::get_lock_free_read_set(), foedus::xct::Xct::get_lock_free_read_set_size(), foedus::Engine::get_log_manager(), foedus::storage::StorageManager::get_name(), foedus::xct::Xct::get_read_set(), foedus::xct::Xct::get_read_set_size(), foedus::Engine::get_storage_manager(), foedus::xct::XctId::is_being_written(), foedus::xct::RwLockableXctId::needs_track_moved(), foedus::xct::ReadXctAccess::observed_owner_id_, foedus::xct::LockFreeReadXctAccess::observed_owner_id_, foedus::xct::RecordXctAccess::owner_id_address_, foedus::xct::LockFreeReadXctAccess::owner_id_address_, precommit_xct_verify_page_version_set(), precommit_xct_verify_pointer_set(), precommit_xct_verify_track_read(), foedus::assorted::prefetch_cacheline(), foedus::xct::RecordXctAccess::storage_id_, foedus::Epoch::store_max(), UNLIKELY, and foedus::xct::RwLockableXctId::xct_id_.

Referenced by precommit_xct_readonly().

648  {
649  Xct& current_xct = context->get_current_xct();
650  ReadXctAccess* read_set = current_xct.get_read_set();
651  const uint32_t read_set_size = current_xct.get_read_set_size();
652  storage::StorageManager* st = engine_->get_storage_manager();
653  for (uint32_t i = 0; i < read_set_size; ++i) {
654  ASSERT_ND(read_set[i].related_write_ == nullptr);
655  // let's prefetch owner_id in parallel
656  if (i % kReadsetPrefetchBatch == 0) {
657  for (uint32_t j = i; j < i + kReadsetPrefetchBatch && j < read_set_size; ++j) {
658  assorted::prefetch_cacheline(read_set[j].owner_id_address_);
659  }
660  }
661 
662  ReadXctAccess& access = read_set[i];
663  DVLOG(2) << *context << "Verifying " << st->get_name(access.storage_id_)
664  << ":" << access.owner_id_address_ << ". observed_xid=" << access.observed_owner_id_
665  << ", now_xid=" << access.owner_id_address_->xct_id_;
666  if (UNLIKELY(access.observed_owner_id_.is_being_written())) {
667  // safety net. observing this case.
668  // hm, this should be checked and retried during transaction.
669  // probably there still is some code to forget that.
670  // At least safe to abort here, so keep it this way for now.
671  DLOG(WARNING) << *context << "?? this should have been checked. being_written! will abort";
672  return false;
673  }
674 
675  // Implementation Note: we do verify the versions whether we took a lock on this record or not.
676  // In a sentence, this is the simplest and most reliable while wasted cost is not that much.
677 
678  // In a paragraph.. We could check whether it's locked, and in \e some case skip verification,
679  // but think of case 1) read A without read-lock, later take a write-lock on A due to RLL.
680  // also case 2) read A without read-lock, then read A again but this time with read-lock.
681  // Yes, we could rule these cases out by checking something for each read/write,
682  // but that's fragile. too much complexity for little. we just verify always. period.
683  if (UNLIKELY(access.owner_id_address_->needs_track_moved())) {
684  if (!precommit_xct_verify_track_read(context, &access)) {
685  return false;
686  }
687  }
688  if (access.observed_owner_id_ != access.owner_id_address_->xct_id_) {
689  DLOG(WARNING) << *context << " read set changed by other transaction. will abort";
690  // read clobbered
691  return false;
692  }
693 
694  // Remembers the highest epoch observed.
695  commit_epoch->store_max(access.observed_owner_id_.get_epoch());
696  }
697 
698  // Check lock-free read-set, which is a bit simpler.
699  LockFreeReadXctAccess* lock_free_read_set = current_xct.get_lock_free_read_set();
700  const uint32_t lock_free_read_set_size = current_xct.get_lock_free_read_set_size();
701  for (uint32_t i = 0; i < lock_free_read_set_size; ++i) {
702  const LockFreeReadXctAccess& access = lock_free_read_set[i];
703  ASSERT_ND(!access.owner_id_address_->needs_track_moved());
704  if (access.observed_owner_id_ != access.owner_id_address_->xct_id_) {
705  DLOG(WARNING) << *context << " lock free read set changed by other transaction. will abort";
706  return false;
707  }
708 
709  commit_epoch->store_max(access.observed_owner_id_.get_epoch());
710  }
711 
712  DVLOG(1) << *context << "Read-only highest epoch observed: " << *commit_epoch;
713  if (!commit_epoch->is_valid()) {
714  DVLOG(1) << *context
715  << " Read-only highest epoch was empty. The transaction has no read set??";
716  // In this case, set already-durable epoch. We don't have to use atomic version because
717  // it's just conservatively telling how long it should wait.
718  *commit_epoch = Epoch(engine_->get_log_manager()->get_durable_global_epoch_weak());
719  }
720 
721  // Check Page Pointer/Version
722  if (!precommit_xct_verify_pointer_set(context)) {
723  return false;
724  } else if (!precommit_xct_verify_page_version_set(context)) {
725  return false;
726  } else {
727  return true;
728  }
729 }
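The read-only verification above is an optimistic-concurrency check: a read is accepted only if the record's current XID still equals the XID observed during execution, and the commit epoch is the maximum epoch among the accepted reads. Below is a minimal sketch of that idea in plain C++. ToyXid, ToyReadAccess and verify_readonly are hypothetical names; the XID is reduced to an epoch, an ordinal and a being-written bit, and epochs are compared as plain unsigned integers (FOEDUS's Epoch comparison is wrap-around aware). The lock-free read set, moved-record tracking and the page pointer/version checks are omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified transaction ID. This is NOT FOEDUS's XctId layout;
// it only carries what the sketch needs.
struct ToyXid {
  uint32_t epoch;      // 0 stands for "invalid epoch" in this sketch
  uint32_t ordinal;
  bool being_written;
  bool operator==(const ToyXid& o) const {
    return epoch == o.epoch && ordinal == o.ordinal && being_written == o.being_written;
  }
  bool operator!=(const ToyXid& o) const { return !(*this == o); }
};

// One read-set entry: the XID seen during execution and where the live XID is.
struct ToyReadAccess {
  ToyXid observed;
  const ToyXid* live;
};

// Returns false if any read must abort. Otherwise sets *commit_epoch to the highest
// observed epoch, or to durable_epoch when nothing was read.
// Assumption: epochs compare as plain unsigned integers (no wrap-around).
bool verify_readonly(const std::vector<ToyReadAccess>& read_set,
                     uint32_t durable_epoch, uint32_t* commit_epoch) {
  uint32_t max_epoch = 0;
  for (const ToyReadAccess& access : read_set) {
    if (access.observed.being_written) {
      return false;  // we read a record mid-write: abort conservatively
    }
    if (access.observed != *access.live) {
      return false;  // another transaction changed the record since we read it
    }
    if (access.observed.epoch > max_epoch) {
      max_epoch = access.observed.epoch;
    }
  }
  *commit_epoch = (max_epoch == 0) ? durable_epoch : max_epoch;
  return true;
}
```

The fallback to the durable epoch for an empty read set mirrors line 718 of the listing: the value only tells the caller how long to wait, so a conservative, already-durable epoch is sufficient.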

bool foedus::xct::XctManagerPimpl::precommit_xct_verify_readwrite (thread::Thread *context, XctId *max_xct_id)

Phase 2 of precommit_xct() for read-write case.

Parameters
[in] context: thread context.
[in,out] max_xct_id: largest xct_id this transaction depends on, or max(all xct_id).
Returns
true if verification succeeded. false if we need to abort.

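Verifies the observed read set and the lock-free read set. Read-set entries that share a record with a write-set entry (related_write_) were already verified while taking locks and are skipped here. Because phase 2 runs after the memory fence, no thread would take new locks while checking.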

Definition at line 731 of file xct_manager_pimpl.cpp.

References ASSERT_ND, engine_, foedus::thread::Thread::get_current_xct(), foedus::xct::Xct::get_lock_free_read_set(), foedus::xct::Xct::get_lock_free_read_set_size(), foedus::storage::StorageManager::get_name(), foedus::xct::Xct::get_read_set(), foedus::xct::Xct::get_read_set_size(), foedus::Engine::get_storage_manager(), foedus::xct::XctId::is_being_written(), foedus::xct::RwLockableXctId::needs_track_moved(), foedus::xct::ReadXctAccess::observed_owner_id_, foedus::xct::LockFreeReadXctAccess::observed_owner_id_, foedus::xct::RecordXctAccess::owner_id_address_, foedus::xct::LockFreeReadXctAccess::owner_id_address_, precommit_xct_verify_page_version_set(), precommit_xct_verify_pointer_set(), precommit_xct_verify_track_read(), foedus::assorted::prefetch_cacheline(), foedus::xct::ReadXctAccess::related_write_, foedus::xct::RecordXctAccess::storage_id_, foedus::xct::XctId::store_max(), UNLIKELY, and foedus::xct::RwLockableXctId::xct_id_.

Referenced by precommit_xct_readwrite().

731  {
732  Xct& current_xct = context->get_current_xct();
733  ReadXctAccess* read_set = current_xct.get_read_set();
734  const uint32_t read_set_size = current_xct.get_read_set_size();
735  for (uint32_t i = 0; i < read_set_size; ++i) {
736  // let's prefetch owner_id in parallel
737  if (i % kReadsetPrefetchBatch == 0) {
738  for (uint32_t j = i; j < i + kReadsetPrefetchBatch && j < read_set_size; ++j) {
739  if (read_set[j].related_write_ == nullptr) {
740  assorted::prefetch_cacheline(read_set[j].owner_id_address_);
741  }
742  }
743  }
744  // The owning transaction has changed.
745  // We don't check ordinal here because there is no chance we are racing with ourselves.
746  ReadXctAccess& access = read_set[i];
747  if (UNLIKELY(access.observed_owner_id_.is_being_written())) {
748  // same as above.
749  DLOG(WARNING) << *context << "?? this should have been checked. being_written! will abort";
750  return false;
751  }
752  storage::StorageManager* st = engine_->get_storage_manager();
753  if (access.related_write_) {
754  // we already checked this in lock()
755  DVLOG(3) << *context << " skipped read-sets that are already checked";
756  ASSERT_ND(access.observed_owner_id_ == access.owner_id_address_->xct_id_);
757  continue;
758  }
759  DVLOG(2) << *context << " Verifying " << st->get_name(access.storage_id_)
760  << ":" << access.owner_id_address_ << ". observed_xid=" << access.observed_owner_id_
761  << ", now_xid=" << access.owner_id_address_->xct_id_;
762  // As noted in precommit_xct_verify_readonly, we verify read-set whether it's locked or not.
763 
764  // read-set has to also track moved records.
765  // however, unlike write-set locks, we don't have to do retry-loop.
766  // if the rare event (yet another concurrent split) happens, we just abort the transaction.
767  if (UNLIKELY(access.owner_id_address_->needs_track_moved())) {
768  if (!precommit_xct_verify_track_read(context, &access)) {
769  return false;
770  }
771  }
772 
773  if (access.observed_owner_id_ != access.owner_id_address_->xct_id_) {
774  DVLOG(1) << *context << " read set changed by other transaction. will abort";
775  // same as read_only
776  return false;
777  }
778 
779  /*
780  // Hideaki[2016Feb] I think I remember why I kept this here. When we didn't have the
781  // "being_written" flag, we did need it even after splitting XID/TID.
782  // But, now that we have it in XID, it's surely safe without this.
783  // Still.. in case I miss something, I leave it here commented out.
784 
785  // Hideaki: Umm, after several changes, I'm now not sure if we still need this check.
786  // As far as XID hasn't changed, do we care whether others locked it or not?
787  // Thanks to the separation of XID and Lock-word, I think this is now unnecessary.
788  // If it's our own lock, we haven't applied our changes yet, so safe. If it's other's lock,
789  // why do we care as far as XID hasn't changed? Everyone updates XID -> unlocks in this order.
790  // Anyways, we are in the course of a bigger change, so revisit it later.
791  if (access.owner_id_address_->is_keylocked()) {
792  DVLOG(2) << *context
793  << " read set contained a locked record. was it myself who locked it?";
794  LockListPosition my_lock_pos = cll->binary_search(access.owner_id_address_);
795  if (my_lock_pos != kLockListPositionInvalid && cll->get_array()[my_lock_pos].is_locked()) {
796  DVLOG(2) << *context << " okay, myself. go on.";
797  } else {
798  DVLOG(1) << *context << " no, not me. will abort";
799  return false;
800  }
801  }
802  */
803  max_xct_id->store_max(access.observed_owner_id_);
804  }
805 
806  // Check Page Pointer/Version
807  // Check lock-free read-set, which is a bit simpler.
808  LockFreeReadXctAccess* lock_free_read_set = current_xct.get_lock_free_read_set();
809  const uint32_t lock_free_read_set_size = current_xct.get_lock_free_read_set_size();
810  for (uint32_t i = 0; i < lock_free_read_set_size; ++i) {
811  const LockFreeReadXctAccess& access = lock_free_read_set[i];
812  ASSERT_ND(!access.owner_id_address_->needs_track_moved());
813  if (access.observed_owner_id_ != access.owner_id_address_->xct_id_) {
814  DLOG(WARNING) << *context << " lock free read set changed by other transaction. will abort";
815  return false;
816  }
817  }
818 
819  if (!precommit_xct_verify_pointer_set(context)) {
820  return false;
821  } else if (!precommit_xct_verify_page_version_set(context)) {
822  return false;
823  } else {
824  return true;
825  }
826 }
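The read-write variant adds two details worth isolating: owner IDs are prefetched in batches, and read-set entries already covered by a write-set entry (related_write_) are skipped. The sketch below shows both under stated assumptions: ToyRwReadAccess, verify_readwrite and kToyPrefetchBatch are hypothetical, an XID is flattened to one uint64_t so a plain comparison stands in for XctId::store_max(), and prefetching uses the GCC/Clang builtin __builtin_prefetch instead of foedus::assorted::prefetch_cacheline().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical batch size; FOEDUS uses its own kReadsetPrefetchBatch.
constexpr std::size_t kToyPrefetchBatch = 8;

// Hypothetical read-set entry with the XID flattened to one integer.
struct ToyRwReadAccess {
  uint64_t observed;       // XID seen during execution
  const uint64_t* live;    // where the record's current XID lives
  bool has_related_write;  // true if a write-set entry covers the same record
};

// Verifies the reads that no write-set entry covers and raises *max_xid to the
// largest XID verified here. Returns false if any such read was clobbered.
bool verify_readwrite(const std::vector<ToyRwReadAccess>& read_set, uint64_t* max_xid) {
  const std::size_t n = read_set.size();
  for (std::size_t i = 0; i < n; ++i) {
    // At the start of each batch, ask the CPU to start loading the next few XIDs.
    if (i % kToyPrefetchBatch == 0) {
      for (std::size_t j = i; j < i + kToyPrefetchBatch && j < n; ++j) {
        if (!read_set[j].has_related_write) {
          __builtin_prefetch(read_set[j].live);  // GCC/Clang builtin: a hint only
        }
      }
    }
    const ToyRwReadAccess& access = read_set[i];
    if (access.has_related_write) {
      continue;  // already verified while its write lock was taken
    }
    if (access.observed != *access.live) {
      return false;
    }
    if (access.observed > *max_xid) {
      *max_xid = access.observed;
    }
  }
  return true;
}
```

Prefetching is only a hint: removing it changes timing, never the verification result. Prefetching a whole batch up front lets the cache misses for several records overlap instead of being paid one at a time.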

bool foedus::xct::XctManagerPimpl::precommit_xct_verify_track_read (thread::Thread *context, ReadXctAccess *entry)

Used by the verification methods to track a moved record.

Definition at line 458 of file xct_manager_pimpl.cpp.

References ASSERT_ND, engine_, foedus::thread::Thread::get_global_volatile_page_resolver(), foedus::Engine::get_storage_manager(), foedus::xct::RwLockableXctId::needs_track_moved(), foedus::xct::TrackMovedRecordResult::new_owner_address_, foedus::xct::RecordXctAccess::owner_id_address_, foedus::xct::ReadXctAccess::related_write_, foedus::xct::RecordXctAccess::set_owner_id_resolve_lock_id(), foedus::xct::RecordXctAccess::storage_id_, and foedus::storage::StorageManager::track_moved_record().

Referenced by precommit_xct_verify_readonly(), and precommit_xct_verify_readwrite().

459  {
460  ASSERT_ND(entry->owner_id_address_->needs_track_moved());
461  ASSERT_ND(entry->related_write_ == nullptr); // if there is, lock() should have updated it.
462  storage::StorageManager* st = engine_->get_storage_manager();
463  TrackMovedRecordResult result = st->track_moved_record(
464  entry->storage_id_,
465  entry->owner_id_address_,
466  entry->related_write_);
467  if (result.new_owner_address_ == nullptr) {
468  // failed to track down. if entry->related_write_ is null, this always happens when
469  // the record is now in next layer. in this case, retry the whole transaction.
470  return false;
471  }
472 
473  const auto& resolver = context->get_global_volatile_page_resolver();
474  entry->set_owner_id_resolve_lock_id(resolver, result.new_owner_address_);
475  return true;
476 }
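StorageManager::track_moved_record() is storage-specific and its implementation is not shown on this page. To illustrate only the control flow of precommit_xct_verify_track_read() (resolve the new location, abort if it cannot be found, otherwise repoint the read-set entry), the sketch below swaps in a simple forwarding-pointer chain. ToySlot, ToyReadEntry, toy_track_moved and toy_verify_track_read are all hypothetical; this is not how FOEDUS locates moved records.

```cpp
#include <cassert>

// Hypothetical record slot. When a record migrates, the old slot is marked
// moved and, in this sketch, points at the record's next location.
struct ToySlot {
  bool moved;
  ToySlot* forward;  // nullptr: the new location cannot be found this way
};

// Hypothetical read-set entry that only remembers which slot it read.
struct ToyReadEntry {
  ToySlot* owner;
};

// Follows forwarding pointers to the live slot; nullptr if the chain breaks.
ToySlot* toy_track_moved(ToySlot* slot) {
  while (slot != nullptr && slot->moved) {
    slot = slot->forward;
  }
  return slot;
}

// Same control flow as precommit_xct_verify_track_read(): abort (false) if the
// record cannot be tracked, otherwise repoint the entry at the new location.
bool toy_verify_track_read(ToyReadEntry* entry) {
  ToySlot* found = toy_track_moved(entry->owner);
  if (found == nullptr) {
    return false;
  }
  entry->owner = found;
  return true;
}
```

As in the listing, the read-set entry is left untouched on failure, because the caller aborts the whole transaction and retries it rather than looping.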

void foedus::xct::XctManagerPimpl::release_and_clear_all_current_locks (thread::Thread *context)

Releases all acquired locks and clears the current lock list; used on commit and abort.

Definition at line 997 of file xct_manager_pimpl.cpp.

References foedus::xct::CurrentLockList::clear_entries(), foedus::thread::Thread::cll_release_all_locks(), foedus::xct::Xct::get_current_lock_list(), and foedus::thread::Thread::get_current_xct().

Referenced by abort_xct(), and precommit_xct().

997  {
998  context->cll_release_all_locks();
999  CurrentLockList* cll = context->get_current_xct().get_current_lock_list();
1000  cll->clear_entries();
1001 }
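release_and_clear_all_current_locks() performs two separate steps: it releases every lock the thread holds, then empties the lock list so the next transaction starts clean. A minimal sketch of that two-step shape, assuming a hypothetical ToyLockList that tracks plain std::mutex pointers (FOEDUS's CurrentLockList keeps its own lock entries and is released through Thread::cll_release_all_locks()):

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical per-transaction lock list holding plain std::mutex pointers.
class ToyLockList {
 public:
  void lock_and_track(std::mutex* m) {
    m->lock();
    held_.push_back(m);
  }
  // Step 1: release every held lock. Step 2: forget the entries, so the
  // next transaction on this thread starts with an empty list.
  void release_and_clear_all() {
    for (auto it = held_.rbegin(); it != held_.rend(); ++it) {
      (*it)->unlock();  // this sketch releases in reverse acquisition order
    }
    held_.clear();
  }
  std::size_t size() const { return held_.size(); }

 private:
  std::vector<std::mutex*> held_;
};
```

Releasing in reverse acquisition order is a choice made in this sketch; the listing above does not specify an order.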


void foedus::xct::XctManagerPimpl::resume_accepting_xct ()

Make sure you call this after pause_accepting_xct().

Definition at line 330 of file xct_manager_pimpl.cpp.

References control_block_, and foedus::xct::XctManagerControlBlock::new_transaction_paused_.

Referenced by foedus::xct::XctManager::resume_accepting_xct().

330  {
331  control_block_->new_transaction_paused_ = false;
332 }

void foedus::xct::XctManagerPimpl::set_requested_global_epoch (Epoch request)

Definition at line 241 of file xct_manager_pimpl.cpp.

References control_block_, get_requested_global_epoch(), foedus::xct::XctManagerControlBlock::requested_global_epoch_, and foedus::Epoch::value().

Referenced by advance_current_global_epoch(), wait_for_commit(), and wait_for_current_global_epoch().

241  {
242  // set request value, atomically
243  while (true) {
244  Epoch already_requested = get_requested_global_epoch();
245  if (already_requested >= request) {
246  break;
247  }
248  Epoch::EpochInteger cmp = already_requested.value();
249  if (control_block_->requested_global_epoch_.compare_exchange_strong(cmp, request.value())) {
250  break;
251  }
252  }
253 }
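set_requested_global_epoch() is an atomic "store max": it retries a compare-and-swap until either the stored request is already at least as large or its own CAS succeeds, so concurrent requests can only raise the value. A minimal sketch with std::atomic, assuming plain unsigned comparison (FOEDUS's Epoch comparison handles wrap-around); toy_store_max is a hypothetical name, and it uses compare_exchange_weak, which reloads the expected value on failure, instead of re-reading through a getter as the listing does.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Raises *target to at least `request` and never lowers it, even when several
// threads call this concurrently. Assumes plain unsigned comparison.
void toy_store_max(std::atomic<uint32_t>* target, uint32_t request) {
  uint32_t current = target->load();
  while (current < request) {
    // On failure, compare_exchange_weak writes the latest value into `current`,
    // and the loop condition re-checks whether a raise is still needed.
    if (target->compare_exchange_weak(current, request)) {
      break;
    }
  }
}
```

The weak form may fail spuriously, which is harmless inside a retry loop and can be cheaper on some architectures.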

ErrorStack foedus::xct::XctManagerPimpl::uninitialize_once ()
override virtual

Implements foedus::DefaultInitializable.

Definition at line 104 of file xct_manager_pimpl.cpp.

References CHECK_ERROR, control_block_, foedus::ErrorStackBatch::emprace_back(), engine_, foedus::xct::XctManagerControlBlock::epoch_chime_terminate_requested_, epoch_chime_thread_, foedus::xct::XctManagerControlBlock::epoch_chime_wakeup_, ERROR_STACK, foedus::Engine::get_cache_manager(), foedus::Engine::get_storage_manager(), foedus::storage::StorageManager::is_initialized(), foedus::Engine::is_master(), foedus::kErrorCodeDepedentModuleUnavailableUninit, foedus::soc::SharedPolling::signal(), foedus::cache::CacheManager::stop_cleaner(), SUMMARIZE_ERROR_BATCH, and foedus::xct::XctManagerControlBlock::uninitialize().

104  {
105  LOG(INFO) << "Uninitializing XctManager..";
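106  ErrorStackBatch batch;
107  if (!engine_->get_storage_manager()->is_initialized()) {
108  batch.emprace_back(ERROR_STACK(kErrorCodeDepedentModuleUnavailableUninit));
109  }
110  // See CacheManager's comments for why we have to stop the cleaner here
111  CHECK_ERROR(engine_->get_cache_manager()->stop_cleaner());
112  if (engine_->is_master()) {
113  if (epoch_chime_thread_.joinable()) {
114  {
115  control_block_->epoch_chime_terminate_requested_ = true;
116  control_block_->epoch_chime_wakeup_.signal();
117  }
118  epoch_chime_thread_.join();
119  }
120  control_block_->uninitialize();
121  }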
122  return SUMMARIZE_ERROR_BATCH(batch);
123 }
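uninitialize_once() stops the epoch chime thread in a fixed order: set the terminate flag, signal the wakeup object, then join. The sketch below reproduces that order with std::condition_variable instead of soc::SharedPolling. ToyEpochChime, its interval parameter and its starting epoch of 1 are assumptions for illustration; the flag is set under the same mutex the thread waits on, so the request cannot slip in between the thread's check and its wait.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Hypothetical epoch "chime": a background thread that bumps an epoch counter
// every `interval`, or sooner when woken, until asked to terminate.
class ToyEpochChime {
 public:
  explicit ToyEpochChime(std::chrono::milliseconds interval) : interval_(interval) {
    thread_ = std::thread([this] { run(); });  // all other members are initialized by now
  }
  ~ToyEpochChime() { stop(); }

  void wakeup() { cv_.notify_all(); }  // cut the current wait short

  // Shutdown order: set the terminate flag, signal, then join.
  void stop() {
    if (!thread_.joinable()) {
      return;
    }
    {
      std::lock_guard<std::mutex> guard(mutex_);
      terminate_requested_ = true;  // set under the mutex the chime thread waits on
    }
    cv_.notify_all();
    thread_.join();
  }

  uint32_t epoch() const { return epoch_.load(); }
  bool running() const { return thread_.joinable(); }

 private:
  void run() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (!terminate_requested_) {
      // Returns on timeout, wakeup(), stop(), or spuriously.
      cv_.wait_for(lock, interval_);
      if (!terminate_requested_) {
        epoch_.fetch_add(1);
      }
    }
  }

  std::chrono::milliseconds interval_;
  std::mutex mutex_;
  std::condition_variable cv_;
  bool terminate_requested_ = false;  // guarded by mutex_
  std::atomic<uint32_t> epoch_{1};
  std::thread thread_;
};
```

In this sketch wakeup() loosely plays the role of wakeup_epoch_chime_thread(): it ends the current wait early so the next advance happens sooner.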

ErrorCode foedus::xct::XctManagerPimpl::wait_for_commit (Epoch commit_epoch, int64_t wait_microseconds)

Definition at line 290 of file xct_manager_pimpl.cpp.

References engine_, get_current_global_epoch(), foedus::Engine::get_log_manager(), foedus::Epoch::one_more(), set_requested_global_epoch(), foedus::log::LogManager::wait_until_durable(), and wakeup_epoch_chime_thread().

Referenced by foedus::xct::XctManager::wait_for_commit().

290  {
291  // to durably commit transactions in commit_epoch, the current global epoch should be
292  // commit_epoch + 2 or more. (current-1 is grace epoch. current-2 is the latest loggable epoch)
293  Epoch target_epoch = commit_epoch.one_more().one_more();
294  if (target_epoch > get_current_global_epoch()) {
295  set_requested_global_epoch(target_epoch);
296  wakeup_epoch_chime_thread();
297  }
298 
299  return engine_->get_log_manager()->wait_until_durable(commit_epoch, wait_microseconds);
300 }
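wait_for_commit() turns a commit epoch into a target global epoch (commit_epoch + 2, per the comment in the listing), asks the chime thread to reach it, and then blocks on the log manager until commit_epoch is durable. A minimal sketch of the arithmetic and of a timed durability wait follows. toy_target_epoch and ToyDurableEpoch are hypothetical; epochs are plain unsigned integers without wrap-around, the wait returns a bool rather than FOEDUS's ErrorCode, and a negative wait_microseconds means "no timeout", the same convention wait_for_current_global_epoch() uses.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Target global epoch for a commit epoch, following the listing's comment:
// current-1 is the grace epoch and current-2 the latest loggable epoch.
// Assumption: plain addition, no wrap-around.
uint32_t toy_target_epoch(uint32_t commit_epoch) { return commit_epoch + 2; }

// Hypothetical holder of the durable epoch that callers can wait on.
class ToyDurableEpoch {
 public:
  // Called by the (hypothetical) logger when more epochs become durable.
  void publish(uint32_t durable) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      if (durable > durable_) {
        durable_ = durable;
      }
    }
    cv_.notify_all();
  }

  // true once commit_epoch is durable; false if the timeout expires first.
  // A negative wait_microseconds waits without a timeout.
  bool wait_until_durable(uint32_t commit_epoch, int64_t wait_microseconds) {
    std::unique_lock<std::mutex> lock(mutex_);
    auto reached = [&] { return durable_ >= commit_epoch; };
    if (wait_microseconds < 0) {
      cv_.wait(lock, reached);
      return true;
    }
    return cv_.wait_for(lock, std::chrono::microseconds(wait_microseconds), reached);
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  uint32_t durable_ = 0;
};
```

For example, a transaction committed in epoch 5 needs the global epoch to reach 7 before epoch 5 can become durable.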

void foedus::xct::XctManagerPimpl::wait_for_current_global_epoch (Epoch target_epoch, int64_t wait_microseconds)

Definition at line 273 of file xct_manager_pimpl.cpp.

References foedus::soc::SharedPolling::acquire_ticket(), control_block_, foedus::xct::XctManagerControlBlock::current_global_epoch_advanced_, get_current_global_epoch(), set_requested_global_epoch(), foedus::soc::SharedPolling::timedwait(), and foedus::soc::SharedPolling::wait().

Referenced by foedus::xct::XctManager::wait_for_current_global_epoch().

273  {
274  // this method doesn't aggressively wake up the epoch-advance thread. it just waits.
275  set_requested_global_epoch(target_epoch);
276  while (get_current_global_epoch() < target_epoch) {
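277  uint64_t demand = control_block_->current_global_epoch_advanced_.acquire_ticket();
278  if (get_current_global_epoch() < target_epoch) {
279  if (wait_microseconds < 0) {
280  control_block_->current_global_epoch_advanced_.wait(demand);
281  } else {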
282  control_block_->current_global_epoch_advanced_.timedwait(demand, wait_microseconds);
283  return;
284  }
285  }
286  }
287 }
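wait_for_current_global_epoch() takes a ticket from current_global_epoch_advanced_ before re-checking the epoch, then waits on that ticket. Taking the ticket first prevents a lost wakeup: if the epoch advances between the re-check and the wait, the signal is newer than the ticket and the wait returns at once. A minimal sketch of the pattern with a mutex and a condition variable follows. ToyPolling and toy_wait_for_epoch are hypothetical, the advancing thread is assumed to update the epoch before calling signal(), and unlike soc::SharedPolling the sketch does no spin-then-poll.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical ticket-based broadcast. A ticket is the number of signals seen so far;
// wait(ticket) returns as soon as at least one signal newer than the ticket exists.
class ToyPolling {
 public:
  uint64_t acquire_ticket() {
    std::lock_guard<std::mutex> guard(mutex_);
    return signals_;
  }
  void signal() {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      ++signals_;
    }
    cv_.notify_all();
  }
  void wait(uint64_t ticket) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [&] { return signals_ > ticket; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  uint64_t signals_ = 0;
};

// The untimed branch of wait_for_current_global_epoch(): ticket first, then re-check,
// then wait. Assuming the advancer stores the new epoch before signal(), an advance
// landing between the re-check and wait() has already moved the signal count past
// `demand`, so wait() does not block on it.
void toy_wait_for_epoch(const std::atomic<uint32_t>& current, uint32_t target,
                        ToyPolling* advanced) {
  while (current.load() < target) {
    uint64_t demand = advanced->acquire_ticket();
    if (current.load() < target) {
      advanced->wait(demand);
    }
  }
}
```

The outer loop re-checks the epoch after every wakeup, because one signal only means "the epoch moved", not "the epoch reached the target".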

void foedus::xct::XctManagerPimpl::wait_until_resume_accepting_xct (thread::Thread *context)

Definition at line 334 of file xct_manager_pimpl.cpp.

References control_block_, and foedus::xct::XctManagerControlBlock::new_transaction_paused_.

Referenced by begin_xct().

334  {
335  LOG(INFO) << *context << " realized that new transactions are not accepted now."
336  << " waits until it is allowed to start a new transaction";
337  // no complex thing here. just sleep-check. this happens VERY infrequently.
338  while (control_block_->new_transaction_paused_.load()) {
339  std::this_thread::sleep_for(std::chrono::milliseconds(20));
340  }
341 }
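pause_accepting_xct(), resume_accepting_xct() and wait_until_resume_accepting_xct() form a simple gate on new_transaction_paused_: while the flag is set, begin_xct() sleep-polls every 20 ms. Because pausing is rare, plain polling is simpler than a condition variable here. A minimal sketch of that gate, where ToyXctGate and its methods are hypothetical names:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical gate mirroring new_transaction_paused_: pause() closes it,
// resume() reopens it, and callers about to begin a transaction sleep-poll
// in 20 ms steps until it is open again.
class ToyXctGate {
 public:
  void pause() { paused_.store(true); }
  void resume() { paused_.store(false); }
  bool paused() const { return paused_.load(); }
  void wait_until_resumed() const {
    while (paused_.load()) {
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }
  }

 private:
  std::atomic<bool> paused_{false};
};
```

A waiter may oversleep by up to one polling interval after resume(), a latency cost that is acceptable only because pauses are infrequent.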

void foedus::xct::XctManagerPimpl::wakeup_epoch_chime_thread ()

Definition at line 237 of file xct_manager_pimpl.cpp.

References control_block_, foedus::xct::XctManagerControlBlock::epoch_chime_wakeup_, and foedus::soc::SharedPolling::signal().

Referenced by advance_current_global_epoch(), and wait_for_commit().

237  {
238  control_block_->epoch_chime_wakeup_.signal(); // hurrrrry up!
239 }

Member Data Documentation

std::thread foedus::xct::XctManagerPimpl::epoch_chime_thread_

This thread keeps advancing the current_global_epoch_.

Launched only in the master engine.

Definition at line 231 of file xct_manager_pimpl.hpp.

Referenced by initialize_once(), and uninitialize_once().


The documentation for this class was generated from the following files: