libfoedus-core (FOEDUS Core Library)
xct_manager_pimpl.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */

#include <glog/logging.h>

#include <algorithm>
#include <chrono>
#include <thread>
#include <vector>

#include "foedus/assert_nd.hpp"
#include "foedus/engine.hpp"
// (several other foedus/... includes are elided in this listing)
#include "foedus/thread/thread.hpp"
#include "foedus/xct/xct.hpp"
#include "foedus/xct/xct_id.hpp"

namespace foedus {
namespace xct {
// XctManager methods defined here to enable inlining
Epoch XctManager::get_current_global_epoch() const {
  return pimpl_->get_current_global_epoch();
}
Epoch XctManager::get_current_global_epoch_weak() const {
  return pimpl_->get_current_global_epoch_weak();
}
Epoch XctManager::get_current_grace_epoch() const {
  return pimpl_->get_current_global_epoch().one_less();
}
Epoch XctManager::get_current_grace_epoch_weak() const {
  return pimpl_->get_current_global_epoch_weak().one_less();
}

ErrorCode XctManager::wait_for_commit(Epoch commit_epoch, int64_t wait_microseconds) {
  return pimpl_->wait_for_commit(commit_epoch, wait_microseconds);
}

ErrorCode XctManager::begin_xct(thread::Thread* context, IsolationLevel isolation_level) {
  return pimpl_->begin_xct(context, isolation_level);
}

ErrorCode XctManager::precommit_xct(thread::Thread* context, Epoch* commit_epoch) {
  return pimpl_->precommit_xct(context, commit_epoch);
}
ErrorCode XctManager::abort_xct(thread::Thread* context) { return pimpl_->abort_xct(context); }

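// ----------------------------------------------------------------------------
// Illustration (editorial sketch, not from the original file): typical
// caller-side use of the XctManager API above. 'xct_manager' and 'context' are
// assumed to be obtained from the engine; kSerializable is one of the
// IsolationLevel values.
//
//   Epoch commit_epoch;
//   CHECK_ERROR_CODE(xct_manager->begin_xct(context, kSerializable));
//   // ... reads/writes through storages ...
//   ErrorCode ret = xct_manager->precommit_xct(context, &commit_epoch);
//   if (ret == kErrorCodeXctRaceAbort) { /* typically retry from begin_xct */ }
//   CHECK_ERROR_CODE(xct_manager->wait_for_commit(commit_epoch, -1));
// ----------------------------------------------------------------------------
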
ErrorStack XctManagerPimpl::initialize_once() {
  LOG(INFO) << "Initializing XctManager..";
  if (!engine_->get_log_manager()->is_initialized()) {
    return ERROR_STACK(kErrorCodeDepedentModuleUnavailableInit);
  }
  control_block_ = engine_->get_soc_manager()->get_shared_memory_repo()
    ->get_global_memory_anchors()->xct_manager_memory_;

  if (engine_->is_master()) {
    // (control-block initialization, including restoring current_global_epoch_
    //  from the savepoint, elided in this listing)
    ASSERT_ND(get_current_global_epoch().is_valid());
    epoch_chime_thread_ = std::move(std::thread(&XctManagerPimpl::handle_epoch_chime, this));
  }
  return kRetOk;
}

ErrorStack XctManagerPimpl::uninitialize_once() {
  LOG(INFO) << "Uninitializing XctManager..";
  ErrorStackBatch batch;
  if (!engine_->get_log_manager()->is_initialized()) {
    batch.emprace_back(ERROR_STACK(kErrorCodeDepedentModuleUnavailableUninit));
  }
  // See CacheManager's comments for why we have to stop the cleaner here
  batch.emprace_back(engine_->get_cache_manager()->stop_cleaner());
  if (engine_->is_master()) {
    if (epoch_chime_thread_.joinable()) {
      {
        // (sets epoch_chime_terminate_requested_ and signals epoch_chime_wakeup_;
        //  exact lines elided in this listing)
      }
      epoch_chime_thread_.join();
    }
    // (control-block teardown elided in this listing)
  }
  return SUMMARIZE_ERROR_BATCH(batch);
}

// (a short helper function is elided in this listing)

void XctManagerPimpl::handle_epoch_chime() {
  LOG(INFO) << "epoch_chime_thread started.";

  // Wait until all the other initializations are done.
  SPINLOCK_WHILE(!is_stop_requested() && !is_initialized()) {
    assorted::memory_fence_acquire();
  }
  uint64_t interval_microsec = engine_->get_options().xct_.epoch_advance_interval_ms_ * 1000ULL;
  LOG(INFO) << "epoch_chime_thread now starts processing. interval_microsec=" << interval_microsec;
  while (!is_stop_requested()) {
    {
      uint64_t demand = control_block_->epoch_chime_wakeup_.acquire_ticket();
      if (is_stop_requested()) {
        break;
      }
      if (get_requested_global_epoch() <= get_current_global_epoch()) {  // otherwise no sleep
        bool signaled = control_block_->epoch_chime_wakeup_.timedwait(
          demand,
          interval_microsec,
          soc::kDefaultPollingSpins,
          interval_microsec);
        VLOG(1) << "epoch_chime_thread. woke up with " << (signaled ? "signal" : "timeout");
      }
    }
    if (is_stop_requested()) {
      break;
    }
    VLOG(1) << "epoch_chime_thread. current_global_epoch_=" << get_current_global_epoch();
    ASSERT_ND(get_current_global_epoch().is_valid());

    // Before advancing the epoch, we have to make sure there is no thread that might commit
    // with the previous epoch.
    Epoch grace_epoch = get_current_global_epoch().one_less();
    handle_epoch_chime_wait_grace_period(grace_epoch);
    if (is_stop_requested()) {
      break;
    }

    {
      // soc::SharedMutexScope scope(control_block_->current_global_epoch_advanced_.get_mutex());
      // There is only one thread (this) that might update current_global_epoch_, so
      // no mutex needed. just set it and put fence.
      control_block_->current_global_epoch_ = get_current_global_epoch().one_more().value();
      assorted::memory_fence_release();
      control_block_->current_global_epoch_advanced_.signal();
    }
    engine_->get_log_manager()->wakeup_loggers();
  }
  LOG(INFO) << "epoch_chime_thread ended.";
}

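// Worked timeline of one chime cycle above (illustration): suppose
// current_global_epoch_ = E.
//   1. grace_epoch = E-1; wait until no worker can still be committing in E-1
//      (handle_epoch_chime_wait_grace_period below).
//   2. publish current_global_epoch_ = E+1 with a release fence, then signal
//      current_global_epoch_advanced_.
//   3. wake up the loggers, which can now durably flush epochs up to E-1.
// Each cycle normally takes epoch_advance_interval_ms_ unless explicitly woken.
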
void XctManagerPimpl::handle_epoch_chime_wait_grace_period(Epoch grace_epoch) {
  ASSERT_ND(grace_epoch.one_more() == get_current_global_epoch());

  VLOG(1) << "XctManager waiting until all worker threads exit grace_epoch:" << grace_epoch;
  debugging::StopWatch watch;

  // In most cases, all workers have already switched to the current epoch long before.
  // Quickly check it, then really wait if there is a suspicious one.
  thread::ThreadPool* pool = engine_->get_thread_pool();
  uint16_t nodes = engine_->get_soc_count();
  for (uint16_t node = 0; node < nodes; ++node) {
    // Check all threads' in-commit epoch now.
    // We might add a background thread in each SOC to avoid checking too many threads
    // in the master engine, but anyway this happens only once in tens of milliseconds.
    thread::ThreadGroupRef* group = pool->get_group_ref(node);

    // @spinlock until the long-running transaction exits.
    const uint32_t kSpins = 1 << 12;  // some number of spins first, then sleep.
    uint32_t spins = 0;
    SPINLOCK_WHILE(true) {
      Epoch min_epoch = group->get_min_in_commit_epoch();
      if (!min_epoch.is_valid() || min_epoch > grace_epoch) {
        break;
      }
      ++spins;
      if (spins == 1U) {
        // first spin.
        VLOG(0) << "Interesting, node-" << node << " has some thread that is still running "
          << "epoch-" << grace_epoch << ". we have to wait before advancing epoch. min_epoch="
          << min_epoch;
      } else if (spins >= kSpins) {
        if (spins == kSpins) {
          LOG(INFO) << "node-" << node << " has some thread that is running "
            << "epoch-" << grace_epoch << " for a long time. we are still waiting before advancing"
            << " epoch. min_epoch=" << min_epoch;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
    }
  }

  watch.stop();
  VLOG(1) << "grace_epoch-" << grace_epoch << " guaranteed. took " << watch.elapsed_ns() << "ns";
  if (watch.elapsed_ms() > 10) {
    LOG(INFO) << "Very interesting, grace_epoch-" << grace_epoch << " took "
      << watch.elapsed_ms() << "ms to be guaranteed. Most likely there was a long-running xct";
  }
}

void XctManagerPimpl::wakeup_epoch_chime_thread() {
  control_block_->epoch_chime_wakeup_.signal();  // hurrrrry up!
}

void XctManagerPimpl::set_requested_global_epoch(Epoch request) {
  // set the request value, atomically
  while (true) {
    Epoch already_requested = get_requested_global_epoch();
    if (already_requested >= request) {
      break;
    }
    Epoch::EpochInteger cmp = already_requested.value();
    if (control_block_->requested_global_epoch_.compare_exchange_strong(cmp, request.value())) {
      break;
    }
  }
}
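
// Note (illustration): the loop above is an atomic "monotonic max" update;
// requested_global_epoch_ only ever grows. Simplified to a plain integer
// (the real Epoch comparison is wrap-around aware), it is equivalent to:
//
//   void store_max(std::atomic<uint32_t>* target, uint32_t value) {
//     uint32_t cur = target->load();
//     while (cur < value && !target->compare_exchange_strong(cur, value)) {
//       // compare_exchange_strong refreshes 'cur' on failure; retry.
//     }
//   }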

void XctManagerPimpl::advance_current_global_epoch() {
  Epoch now = get_current_global_epoch();
  Epoch request = now.one_more();
  LOG(INFO) << "Requesting to immediately advance epoch. request=" << request << "...";
  set_requested_global_epoch(request);
  while (get_current_global_epoch() < request) {
    wakeup_epoch_chime_thread();
    {
      uint64_t demand = control_block_->current_global_epoch_advanced_.acquire_ticket();
      if (get_current_global_epoch() < request) {
        control_block_->current_global_epoch_advanced_.wait(demand);
      }
    }
  }

  LOG(INFO) << "epoch advanced. current_global_epoch_=" << get_current_global_epoch();
}

void XctManagerPimpl::wait_for_current_global_epoch(Epoch target_epoch, int64_t wait_microseconds) {
  // this method doesn't aggressively wake up the epoch-advance thread. it just waits.
  set_requested_global_epoch(target_epoch);
  while (get_current_global_epoch() < target_epoch) {
    uint64_t demand = control_block_->current_global_epoch_advanced_.acquire_ticket();
    if (get_current_global_epoch() < target_epoch) {
      if (wait_microseconds < 0) {
        control_block_->current_global_epoch_advanced_.wait(demand);
      } else {
        control_block_->current_global_epoch_advanced_.timedwait(demand, wait_microseconds);
        return;
      }
    }
  }
}

ErrorCode XctManagerPimpl::wait_for_commit(Epoch commit_epoch, int64_t wait_microseconds) {
  // To durably commit transactions in commit_epoch, the current global epoch should be
  // commit_epoch + 2 or more. (current-1 is the grace epoch; current-2 is the latest loggable ep.)
  Epoch target_epoch = commit_epoch.one_more().one_more();
  if (target_epoch > get_current_global_epoch()) {
    set_requested_global_epoch(target_epoch);
    wakeup_epoch_chime_thread();
  }

  return engine_->get_log_manager()->wait_until_durable(commit_epoch, wait_microseconds);
}
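
// Worked example of the arithmetic above (illustration): for commit_epoch = 10,
// target_epoch = 12. Once the global epoch reaches 12, epoch 11 is the grace
// epoch and epoch 10 is closed, so the loggers can durably flush all logs of
// epoch 10, which is what wait_until_durable() then confirms.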

ErrorCode XctManagerPimpl::begin_xct(thread::Thread* context, IsolationLevel isolation_level) {
  Xct& current_xct = context->get_current_xct();
  if (current_xct.is_active()) {
    return kErrorCodeXctAlreadyRunning;
  }
  if (UNLIKELY(control_block_->new_transaction_paused_.load())) {
    wait_until_resume_accepting_xct(context);
  }
  DVLOG(1) << *context << " Began new transaction."
    << " RLL size=" << current_xct.get_retrospective_lock_list()->get_last_active_entry();
  current_xct.activate(isolation_level);
  ASSERT_ND(current_xct.get_mcs_block_current() == 0);
  ASSERT_ND(current_xct.get_pointer_set_size() == 0);
  ASSERT_ND(current_xct.get_page_version_set_size() == 0);
  ASSERT_ND(current_xct.get_read_set_size() == 0);
  ASSERT_ND(current_xct.get_write_set_size() == 0);
  ASSERT_ND(current_xct.get_lock_free_write_set_size() == 0);
  return kErrorCodeOk;
}

void XctManagerPimpl::pause_accepting_xct() {
  control_block_->new_transaction_paused_.store(true);
}
void XctManagerPimpl::resume_accepting_xct() {
  control_block_->new_transaction_paused_.store(false);
}

void XctManagerPimpl::wait_until_resume_accepting_xct(thread::Thread* context) {
  LOG(INFO) << *context << " realized that new transactions are not accepted now."
    << " waits until it is allowed to start a new transaction";
  // no complex thing here. just sleep-check. this happens VERY infrequently.
  while (control_block_->new_transaction_paused_.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
  }
}

ErrorCode XctManagerPimpl::precommit_xct(thread::Thread* context, Epoch* commit_epoch) {
  Xct& current_xct = context->get_current_xct();
  if (!current_xct.is_active()) {
    return kErrorCodeXctNoXct;
  }
  ASSERT_ND(current_xct.assert_related_read_write());

  ErrorCode result;
  bool read_only = context->get_current_xct().is_read_only();
  if (read_only) {
    result = precommit_xct_readonly(context, commit_epoch);
  } else {
    result = precommit_xct_readwrite(context, commit_epoch);
  }

  ASSERT_ND(current_xct.assert_related_read_write());
  if (result != kErrorCodeOk) {
    ErrorCode abort_ret = abort_xct(context);
    ASSERT_ND(abort_ret == kErrorCodeOk);
    DVLOG(1) << *context << " Aborting because of contention";
  } else {
    release_and_clear_all_current_locks(context);
    current_xct.deactivate();
  }
  ASSERT_ND(current_xct.get_current_lock_list()->is_empty());
  return result;
}
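
// Outline (for reference; summarized from the phase comments below): the
// read-write path follows a three-phase OCC commit protocol:
//   Phase 1  precommit_xct_lock()     - sort the write set, lock all write targets.
//   Phase 2  precommit_xct_verify_*() - validate read set, pointer set, page versions.
//   Phase 3  precommit_xct_apply()    - issue the new XctId, apply logs, publish TIDs.
// All acquired locks are released afterwards in release_and_clear_all_current_locks().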

ErrorCode XctManagerPimpl::precommit_xct_readonly(thread::Thread* context, Epoch* commit_epoch) {
  DVLOG(1) << *context << " Committing read_only";
  ASSERT_ND(context->get_thread_log_buffer().get_offset_committed()
    == context->get_thread_log_buffer().get_offset_tail());
  *commit_epoch = Epoch();
  assorted::memory_fence_acquire();  // this is enough for the read-only case
  if (precommit_xct_verify_readonly(context, commit_epoch)) {
    return kErrorCodeOk;
  } else {
    return kErrorCodeXctRaceAbort;
  }
}
383 
385  DVLOG(1) << *context << " Committing read-write";
386  XctId max_xct_id;
387  max_xct_id.set(Epoch::kEpochInitialDurable, 1); // TODO(Hideaki) not quite..
388  ErrorCode lock_ret = precommit_xct_lock(context, &max_xct_id); // Phase 1
389  if (lock_ret != kErrorCodeOk) {
390  return lock_ret;
391  }
392 
393  // BEFORE the first fence, update the in commit epoch for epoch chime.
394  // see InCommitEpochGuard class comments for why we need to do this.
395  Epoch conservative_epoch = get_current_global_epoch_weak();
396  InCommitEpochGuard guard(context->get_in_commit_epoch_address(), conservative_epoch);
397 
399 
400  *commit_epoch = get_current_global_epoch_weak(); // serialization point!
401  DVLOG(1) << *context << " Acquired read-write commit epoch " << *commit_epoch;
402 
404  bool verified = precommit_xct_verify_readwrite(context, &max_xct_id); // phase 2
405 #ifndef NDEBUG
406  {
407  WriteXctAccess* write_set = context->get_current_xct().get_write_set();
408  uint32_t write_set_size = context->get_current_xct().get_write_set_size();
409  for (uint32_t i = 0; i < write_set_size; ++i) {
410  ASSERT_ND(write_set[i].owner_id_address_->is_keylocked());
411  ASSERT_ND(!write_set[i].owner_id_address_->needs_track_moved());
412  ASSERT_ND(write_set[i].log_entry_);
413  ASSERT_ND(write_set[i].payload_address_);
414  }
415  }
416 #endif // NDEBUG
417  if (verified) {
418  precommit_xct_apply(context, max_xct_id, commit_epoch); // phase 3. this does NOT unlock
419  // announce log AFTER (with fence) apply, because apply sets xct_order in the logs.
423  } else {
424  context->get_thread_log_buffer().publish_committed_log(*commit_epoch);
425  }
426  return kErrorCodeOk;
427  }
428  return kErrorCodeXctRaceAbort;
429 }
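
// Note on the serialization point above (illustration): the commit epoch is
// read between two acq_rel fences, after all write locks are taken and before
// read-set verification. Combined with InCommitEpochGuard, this guarantees the
// epoch chime never declares a grace epoch complete while a transaction might
// still commit in it.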

bool XctManagerPimpl::precommit_xct_lock_track_write(
  thread::Thread* context, WriteXctAccess* entry) {
  ASSERT_ND(entry->owner_id_address_->needs_track_moved());
  storage::StorageManager* st = engine_->get_storage_manager();
  TrackMovedRecordResult result = st->track_moved_record(
    entry->storage_id_,
    entry->owner_id_address_,
    entry);
  if (result.new_owner_address_ == nullptr) {
    // failed to track down even with the write set. this is quite a rare event.
    // in that case, retry the whole transaction.
    DLOG(INFO) << "Failed to track moved record even with write set";
    return false;
  }
  const auto& resolver = context->get_global_volatile_page_resolver();
  if (entry->related_read_) {
    // also apply the result to the related read access so that we don't have to track twice.
    ASSERT_ND(entry->related_read_->related_write_ == entry);
    ASSERT_ND(entry->related_read_->owner_id_address_ == entry->owner_id_address_);
    entry->related_read_->owner_id_address_ = result.new_owner_address_;
  }
  entry->set_owner_id_resolve_lock_id(resolver, result.new_owner_address_);
  entry->payload_address_ = result.new_payload_address_;
  return true;
}

bool XctManagerPimpl::precommit_xct_verify_track_read(
  thread::Thread* context, ReadXctAccess* entry) {
  ASSERT_ND(entry->owner_id_address_->needs_track_moved());
  ASSERT_ND(entry->related_write_ == nullptr);  // if there is, lock() should have updated it.
  storage::StorageManager* st = engine_->get_storage_manager();
  TrackMovedRecordResult result = st->track_moved_record(
    entry->storage_id_,
    entry->owner_id_address_,
    entry->related_write_);
  if (result.new_owner_address_ == nullptr) {
    // failed to track down. if entry->related_write_ is null, this always happens when
    // the record is now in the next layer. in this case, retry the whole transaction.
    return false;
  }

  const auto& resolver = context->get_global_volatile_page_resolver();
  entry->set_owner_id_resolve_lock_id(resolver, result.new_owner_address_);
  return true;
}

void XctManagerPimpl::precommit_xct_sort_access(thread::Thread* context) {
  Xct& current_xct = context->get_current_xct();
  WriteXctAccess* write_set = current_xct.get_write_set();
  uint32_t write_set_size = current_xct.get_write_set_size();

  ASSERT_ND(current_xct.assert_related_read_write());
  std::sort(write_set, write_set + write_set_size, WriteXctAccess::compare);
  // after the sorting, the related-link from read-set to write-set is now broken.
  // we fix it by following the back-link from write-set to read-set.
  for (uint32_t i = 0; i < write_set_size; ++i) {
    WriteXctAccess* entry = write_set + i;
    if (entry->related_read_) {
      ASSERT_ND(entry->related_read_->owner_id_address_ == entry->owner_id_address_);
      entry->related_read_->related_write_ = entry;
    }
    entry->ordinal_ = i;
  }
  ASSERT_ND(current_xct.assert_related_read_write());

#ifndef NDEBUG
  ASSERT_ND(current_xct.assert_related_read_write());
  // check that write sets are now sorted.
  // when the address is the same, they must be ordered by the order they were created.
  // eg: repeated overwrites to one record should be applied in the order the user issued them.
  for (uint32_t i = 1; i < write_set_size; ++i) {
    ASSERT_ND(write_set[i - 1].ordinal_ != write_set[i].ordinal_);
    if (write_set[i].owner_lock_id_ == write_set[i - 1].owner_lock_id_) {
      ASSERT_ND(write_set[i - 1].ordinal_ < write_set[i].ordinal_);
    } else {
      ASSERT_ND(write_set[i - 1].owner_lock_id_ < write_set[i].owner_lock_id_);
    }
  }
#endif  // NDEBUG
}
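
// Why sort into canonical order (illustration): taking locks in a single
// global order (UniversalLockId) makes lock acquisition deadlock-free. E.g.,
// two transactions both writing records A and B always lock min(A, B) first,
// so neither can hold one lock while waiting for the other in reverse order.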

ErrorCode XctManagerPimpl::precommit_xct_lock_batch_track_moved(thread::Thread* context) {
  Xct& current_xct = context->get_current_xct();
  WriteXctAccess* write_set = current_xct.get_write_set();
  uint32_t write_set_size = current_xct.get_write_set_size();
  uint32_t moved_count = 0;
  for (uint32_t i = 0; i < write_set_size; ++i) {
    WriteXctAccess* entry = write_set + i;
    auto* rec = entry->owner_id_address_;
    if (UNLIKELY(rec->needs_track_moved())) {
      if (!precommit_xct_lock_track_write(context, entry)) {
        DLOG(INFO) << "Failed to track moved record?? this must be very rare";
        return kErrorCodeXctRaceAbort;
      }
      ASSERT_ND(entry->owner_id_address_ != rec);
      ++moved_count;
    }
  }
  DVLOG(1) << "Tracked " << moved_count << " moved records in precommit_xct_lock.";
  if (moved_count > 100U) {
    LOG(INFO) << "Tracked " << moved_count << " moved records in precommit_xct_lock."
      << " That's a lot. maybe this is a batch-loading transaction?";
  }
  return kErrorCodeOk;
}

ErrorCode XctManagerPimpl::precommit_xct_lock(thread::Thread* context, XctId* max_xct_id) {
  Xct& current_xct = context->get_current_xct();
  WriteXctAccess* write_set = current_xct.get_write_set();
  uint32_t write_set_size = current_xct.get_write_set_size();
  CurrentLockList* cll = current_xct.get_current_lock_list();
  const bool force_canonical
    = engine_->get_options().xct_.force_canonical_xlocks_in_precommit_;
  DVLOG(1) << *context << " #write_sets=" << write_set_size << ", addr=" << write_set;

#ifndef NDEBUG
  // Initially, write-sets must be ordered by the insertion order.
  for (uint32_t i = 0; i < write_set_size; ++i) {
    ASSERT_ND(write_set[i].ordinal_ == i);
    // Because of RLL, the write-set might or might not already have a corresponding lock
  }
#endif  // NDEBUG

moved_retry:
  CHECK_ERROR_CODE(precommit_xct_lock_batch_track_moved(context));
  precommit_xct_sort_access(context);

  // TODO(Hideaki) Because of how the new locks work, I'm not sure the prefetch still helps.
  // Previously it did, but the gain was not significant either, so let's disable it for now
  // and revisit this later.
  // we have to access the owner_id's pointed address. let's prefetch them in parallel
  // for (uint32_t i = 0; i < write_set_size; ++i) {
  //   assorted::prefetch_cacheline(write_set[i].owner_id_address_);
  // }

  // Create entries in CLL for all write sets. At this point they are not locked yet.
  cll->batch_insert_write_placeholders(write_set, write_set_size);

  ASSERT_ND(current_xct.assert_related_read_write());
  // Note: one alternative is to sequentially iterate over write-set and CLL,
  // both of which are sorted. It will be faster, but probably not that different
  // unless we have a large number of locks. For now binary_search each time.

  // Both write-set and CLL are sorted in canonical order. Simply iterate over in order.
  // This is way faster than invoking cll->binary_search() for each write-set entry.
  // Remember one thing, tho: the write-set might have multiple entries for one record!
  for (CurrentLockListIteratorForWriteSet it(write_set, cll, write_set_size);
       it.is_valid();
       it.next_writes()) {
    // for multiple writes on one record, only the first one (write_cur_pos_) takes the lock
    WriteXctAccess* entry = write_set + it.write_cur_pos_;

    LockListPosition lock_pos = it.cll_pos_;
    ASSERT_ND(lock_pos != kLockListPositionInvalid);  // we have put placeholders for all!
    LockEntry* lock_entry = cll->get_array() + lock_pos;
    ASSERT_ND(lock_entry->lock_ == entry->owner_id_address_);
    ASSERT_ND(lock_entry->preferred_mode_ == kWriteLock);
    if (lock_entry->taken_mode_ == kWriteLock) {
      DVLOG(2) << "Yay, already taken. Probably Thanks to RLL?";
    } else {
      // We need to take or upgrade the lock.
      // This might return kErrorCodeXctRaceAbort when we are not in canonical mode and
      // we could not immediately acquire the lock.
      const LockListPosition last_locked_pos = cll->get_last_locked_entry();
      if (force_canonical &&
        (last_locked_pos != kLockListPositionInvalid && last_locked_pos >= lock_pos)) {
        // We are not in canonical mode. Let's aggressively restore canonical mode.
        DVLOG(0) << "Aggressively releasing locks to restore canonical mode in precommit";
        context->cll_release_all_locks_after(lock_entry->universal_lock_id_ - 1U);
#ifndef NDEBUG
        const LockListPosition new_pos = cll->get_last_locked_entry();
        ASSERT_ND(new_pos == kLockListPositionInvalid || new_pos < lock_pos);
#endif  // NDEBUG
      }
      CHECK_ERROR_CODE(context->cll_try_or_acquire_single_lock(lock_pos));
    }

    if (UNLIKELY(entry->owner_id_address_->needs_track_moved())) {
      // Because we invoked precommit_xct_lock_batch_track_moved beforehand,
      // this happens only when a concurrent thread again split some of the overlapping pages.
      // Though rare, it happens. In that case redo the procedure.
      DLOG(INFO) << "Someone has split the page and moved our record after we checked. Retry..";
      goto moved_retry;
    }

    ASSERT_ND(!entry->owner_id_address_->is_moved());
    ASSERT_ND(!entry->owner_id_address_->is_next_layer());
    max_xct_id->store_max(entry->owner_id_address_->xct_id_);

    // If we have to abort, we should abort early to not waste time.
    // Thus, we check related read sets right here.
    // For other writes of the same record, too.
    for (uint32_t rec = it.write_cur_pos_; rec < it.write_next_pos_; ++rec) {
      WriteXctAccess* r = write_set + rec;
      ASSERT_ND(r->owner_id_address_ == entry->owner_id_address_);
      if (r->related_read_) {
        if (r->related_read_->observed_owner_id_ != r->owner_id_address_->xct_id_) {
          return kErrorCodeXctRaceAbort;
        }
      }
    }
  }

  DVLOG(1) << *context << " locked write set";
#ifndef NDEBUG
  for (uint32_t i = 0; i < write_set_size; ++i) {
    ASSERT_ND(write_set[i].owner_id_address_->is_keylocked());
  }
#endif  // NDEBUG
  return kErrorCodeOk;
}
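
// Note (illustration): CurrentLockListIteratorForWriteSet above advances the
// sorted write-set and the sorted CLL in lockstep, like the merge step of a
// merge sort. Each write-set entry thus finds its lock entry in O(1) amortized
// instead of a binary_search per entry.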

const uint16_t kReadsetPrefetchBatch = 16;

bool XctManagerPimpl::precommit_xct_verify_readonly(thread::Thread* context, Epoch* commit_epoch) {
  Xct& current_xct = context->get_current_xct();
  ReadXctAccess* read_set = current_xct.get_read_set();
  const uint32_t read_set_size = current_xct.get_read_set_size();
  storage::StorageManager* st = engine_->get_storage_manager();
  for (uint32_t i = 0; i < read_set_size; ++i) {
    ASSERT_ND(read_set[i].related_write_ == nullptr);
    // let's prefetch owner_id in parallel
    if (i % kReadsetPrefetchBatch == 0) {
      for (uint32_t j = i; j < i + kReadsetPrefetchBatch && j < read_set_size; ++j) {
        assorted::prefetch_cacheline(read_set[j].owner_id_address_);
      }
    }

    ReadXctAccess& access = read_set[i];
    DVLOG(2) << *context << " Verifying " << st->get_name(access.storage_id_)
      << ":" << access.owner_id_address_ << ". observed_xid=" << access.observed_owner_id_
      << ", now_xid=" << access.owner_id_address_->xct_id_;
    if (UNLIKELY(access.observed_owner_id_.is_being_written())) {
      // safety net. we are observing this case.
      // hm, this should be checked and retried during the transaction.
      // probably there still is some code that forgets to do so.
      // At least it is safe to abort here, so keep it this way for now.
      DLOG(WARNING) << *context << "?? this should have been checked. being_written! will abort";
      return false;
    }

    // Implementation Note: we do verify the versions whether we took a lock on this record or not.
    // In a sentence, this is the simplest and most reliable while the wasted cost is not that much.

    // In a paragraph.. We could check whether it's locked, and in \e some cases skip verification,
    // but think of case 1) read A without read-lock, later take a write-lock on A due to RLL.
    // also case 2) read A without read-lock, then read A again but this time with read-lock.
    // Yes, we could rule these cases out by checking something for each read/write,
    // but that's fragile. too much complexity for little. we just verify always. period.
    if (UNLIKELY(access.owner_id_address_->needs_track_moved())) {
      if (!precommit_xct_verify_track_read(context, &access)) {
        return false;
      }
    }
    if (access.observed_owner_id_ != access.owner_id_address_->xct_id_) {
      DLOG(WARNING) << *context << " read set changed by another transaction. will abort";
      // read clobbered
      return false;
    }

    // Remembers the highest epoch observed.
    commit_epoch->store_max(access.observed_owner_id_.get_epoch());
  }

  // Check the lock-free read-set, which is a bit simpler.
  LockFreeReadXctAccess* lock_free_read_set = current_xct.get_lock_free_read_set();
  const uint32_t lock_free_read_set_size = current_xct.get_lock_free_read_set_size();
  for (uint32_t i = 0; i < lock_free_read_set_size; ++i) {
    const LockFreeReadXctAccess& access = lock_free_read_set[i];
    if (access.observed_owner_id_ != access.owner_id_address_->xct_id_) {
      DLOG(WARNING) << *context << " lock-free read set changed by another transaction. will abort";
      return false;
    }

    commit_epoch->store_max(access.observed_owner_id_.get_epoch());
  }

  DVLOG(1) << *context << " Read-only highest epoch observed: " << *commit_epoch;
  if (!commit_epoch->is_valid()) {
    DVLOG(1) << *context
      << " Read-only highest epoch was empty. The transaction has no read set??";
    // In this case, set the already-durable epoch. We don't have to use the atomic version
    // because it's just conservatively telling how long it should wait.
    *commit_epoch = engine_->get_log_manager()->get_durable_global_epoch_weak();
  }

  // Check Page Pointer/Version
  if (!precommit_xct_verify_pointer_set(context)) {
    return false;
  } else if (!precommit_xct_verify_page_version_set(context)) {
    return false;
  } else {
    return true;
  }
}
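
// Illustration: this is classic OCC read validation. A read passes only if the
// XID observed at read time still equals the record's current XID, e.g.
// observed_xid=(epoch 5, ordinal 3) vs now_xid=(epoch 5, ordinal 3). Any
// concurrently committed write changes the XID and forces a race-abort.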

bool XctManagerPimpl::precommit_xct_verify_readwrite(thread::Thread* context, XctId* max_xct_id) {
  Xct& current_xct = context->get_current_xct();
  ReadXctAccess* read_set = current_xct.get_read_set();
  const uint32_t read_set_size = current_xct.get_read_set_size();
  for (uint32_t i = 0; i < read_set_size; ++i) {
    // let's prefetch owner_id in parallel
    if (i % kReadsetPrefetchBatch == 0) {
      for (uint32_t j = i; j < i + kReadsetPrefetchBatch && j < read_set_size; ++j) {
        if (read_set[j].related_write_ == nullptr) {
          assorted::prefetch_cacheline(read_set[j].owner_id_address_);
        }
      }
    }
    // Has the owning transaction changed?
    // We don't check ordinal here because there is no chance we are racing with ourselves.
    ReadXctAccess& access = read_set[i];
    if (UNLIKELY(access.observed_owner_id_.is_being_written())) {
      // same as above.
      DLOG(WARNING) << *context << "?? this should have been checked. being_written! will abort";
      return false;
    }
    storage::StorageManager* st = engine_->get_storage_manager();
    if (access.related_write_) {
      // we already checked this in lock()
      DVLOG(3) << *context << " skipped read-sets that are already checked";
      continue;
    }
    DVLOG(2) << *context << " Verifying " << st->get_name(access.storage_id_)
      << ":" << access.owner_id_address_ << ". observed_xid=" << access.observed_owner_id_
      << ", now_xid=" << access.owner_id_address_->xct_id_;
    // As noted in precommit_xct_verify_readonly, we verify the read-set whether it's locked or not.

    // the read-set also has to track moved records.
    // however, unlike write-set locks, we don't have to do a retry-loop.
    // if the rare event (yet another concurrent split) happens, we just abort the transaction.
    if (UNLIKELY(access.owner_id_address_->needs_track_moved())) {
      if (!precommit_xct_verify_track_read(context, &access)) {
        return false;
      }
    }

    if (access.observed_owner_id_ != access.owner_id_address_->xct_id_) {
      DVLOG(1) << *context << " read set changed by another transaction. will abort";
      // same as read_only
      return false;
    }

    /*
    // Hideaki[2016Feb] I think I remember why I kept this here. When we didn't have the
    // "being_written" flag, we did need it even after splitting XID/TID.
    // But, now that we have it in XID, it's surely safe without this.
    // Still.. in case I miss something, I leave it here commented out.

    // Hideaki: Umm, after several changes, I'm now not sure if we still need this check.
    // As far as the XID hasn't changed, do we care whether others locked it or not?
    // Thanks to the separation of XID and Lock-word, I think this is now unnecessary.
    // If it's our own lock, we haven't applied our changes yet, so safe. If it's another's lock,
    // why do we care as far as the XID hasn't changed? Everyone updates XID -> unlocks in this
    // order. Anyways, we are in the course of a bigger change, so revisit it later.
    if (access.owner_id_address_->is_keylocked()) {
      DVLOG(2) << *context
        << " read set contained a locked record. was it myself who locked it?";
      LockListPosition my_lock_pos = cll->binary_search(access.owner_id_address_);
      if (my_lock_pos != kLockListPositionInvalid && cll->get_array()[my_lock_pos].is_locked()) {
        DVLOG(2) << *context << " okay, myself. go on.";
      } else {
        DVLOG(1) << *context << " no, not me. will abort";
        return false;
      }
    }
    */
    max_xct_id->store_max(access.observed_owner_id_);
  }

  // Check the lock-free read-set, which is a bit simpler.
  LockFreeReadXctAccess* lock_free_read_set = current_xct.get_lock_free_read_set();
  const uint32_t lock_free_read_set_size = current_xct.get_lock_free_read_set_size();
  for (uint32_t i = 0; i < lock_free_read_set_size; ++i) {
    const LockFreeReadXctAccess& access = lock_free_read_set[i];
    if (access.observed_owner_id_ != access.owner_id_address_->xct_id_) {
      DLOG(WARNING) << *context << " lock-free read set changed by another transaction. will abort";
      return false;
    }
  }

  // Check Page Pointer/Version
  if (!precommit_xct_verify_pointer_set(context)) {
    return false;
  } else if (!precommit_xct_verify_page_version_set(context)) {
    return false;
  } else {
    return true;
  }
}

bool XctManagerPimpl::precommit_xct_verify_pointer_set(thread::Thread* context) {
  const Xct& current_xct = context->get_current_xct();
  const PointerAccess* pointer_set = current_xct.get_pointer_set();
  const uint32_t pointer_set_size = current_xct.get_pointer_set_size();
  for (uint32_t i = 0; i < pointer_set_size; ++i) {
    // let's prefetch address_ in parallel
    if (i % kReadsetPrefetchBatch == 0) {
      for (uint32_t j = i; j < i + kReadsetPrefetchBatch && j < pointer_set_size; ++j) {
        assorted::prefetch_cacheline(pointer_set[j].address_);
      }
    }
    const PointerAccess& access = pointer_set[i];
    if (access.address_->word != access.observed_.word) {
      DLOG(WARNING) << *context << " volatile ptr is changed by another transaction. will abort";
      return false;
    }
  }
  return true;
}

bool XctManagerPimpl::precommit_xct_verify_page_version_set(thread::Thread* context) {
  const Xct& current_xct = context->get_current_xct();
  const PageVersionAccess* page_version_set = current_xct.get_page_version_set();
  const uint32_t page_version_set_size = current_xct.get_page_version_set_size();
  for (uint32_t i = 0; i < page_version_set_size; ++i) {
    // let's prefetch address_ in parallel
    if (i % kReadsetPrefetchBatch == 0) {
      for (uint32_t j = i; j < i + kReadsetPrefetchBatch && j < page_version_set_size; ++j) {
        assorted::prefetch_cacheline(page_version_set[j].address_);
      }
    }
    const PageVersionAccess& access = page_version_set[i];
    if (access.address_->status_ != access.observed_) {
      DLOG(WARNING) << *context << " page version is changed by another transaction. will abort."
        " observed=" << access.observed_ << ", now=" << access.address_->status_;
      return false;
    }
  }
  return true;
}

void XctManagerPimpl::precommit_xct_apply(
  thread::Thread* context,
  XctId max_xct_id,
  Epoch *commit_epoch) {
  Xct& current_xct = context->get_current_xct();
  WriteXctAccess* write_set = current_xct.get_write_set();
  uint32_t write_set_size = current_xct.get_write_set_size();
  LockFreeWriteXctAccess* lock_free_write_set = current_xct.get_lock_free_write_set();
  uint32_t lock_free_write_set_size = current_xct.get_lock_free_write_set_size();
  DVLOG(1) << *context << " applying.. write_set_size=" << write_set_size
    << ", lock_free_write_set_size=" << lock_free_write_set_size;

  current_xct.issue_next_id(max_xct_id, commit_epoch);
  XctId new_xct_id = current_xct.get_id();
  ASSERT_ND(new_xct_id.get_epoch() == *commit_epoch);
  ASSERT_ND(new_xct_id.get_ordinal() > 0);
  new_xct_id.clear_status_bits();
  XctId new_deleted_xct_id = new_xct_id;
  new_deleted_xct_id.set_deleted();  // used if the record after apply is in deleted state.

  DVLOG(1) << *context << " generated new xct id=" << new_xct_id;
  for (uint32_t i = 0; i < write_set_size; ++i) {
    WriteXctAccess& write = write_set[i];
    DVLOG(2) << *context << " Applying "
      << engine_->get_storage_manager()->get_name(write.storage_id_)
      << ":" << write.owner_id_address_;
    ASSERT_ND(write.owner_id_address_->is_keylocked());

    // We must be careful on the memory order of unlock and data write.
    // We must write data first (invoke_apply), then unlock.
    // Otherwise the correctness is not guaranteed.
    ASSERT_ND(write.log_entry_);
    write.log_entry_->header_.set_xct_id(new_xct_id);
    ASSERT_ND(new_xct_id.get_epoch().is_valid());
    if (i > 0 && write.owner_id_address_ == write_set[i - 1].owner_id_address_) {
      // the previous one has already set being_written and kept the lock
      ASSERT_ND(write.owner_id_address_->xct_id_.is_being_written());
    } else {
      write.owner_id_address_->xct_id_.set_being_written();
      assorted::memory_fence_release();
    }
    log::invoke_apply_record(
      write.log_entry_,
      context,
      write.storage_id_,
      write.owner_id_address_,
      write.payload_address_);
    ASSERT_ND(
      write.owner_id_address_->xct_id_.before(new_xct_id));  // ordered correctly?
    if (i < write_set_size - 1 &&
      write.owner_id_address_ == write_set[i + 1].owner_id_address_) {
      DVLOG(0) << *context << " Multiple write sets on record "
        << engine_->get_storage_manager()->get_name(write_set[i].storage_id_)
        << ":" << write_set[i].owner_id_address_ << ". Unlock at the last one of the write sets";
      // keep the lock for the next write set
    } else {
      // For this reason, we put memory_fence_release() between data and owner_id writes.
      assorted::memory_fence_release();
      if (write.owner_id_address_->xct_id_.is_deleted()) {
        // preserve the delete-flag set by delete operations (so, the operation should be a delete)
        ASSERT_ND(
          write.log_entry_->header_.get_type() == log::kLogCodeHashDelete
          || write.log_entry_->header_.get_type() == log::kLogCodeMasstreeDelete);
        write.owner_id_address_->xct_id_ = new_deleted_xct_id;
      } else {
        ASSERT_ND(
          write.log_entry_->header_.get_type() != log::kLogCodeHashDelete
          && write.log_entry_->header_.get_type() != log::kLogCodeMasstreeDelete);
        write.owner_id_address_->xct_id_ = new_xct_id;
      }
      // This method does NOT unlock. we do it at the end, all locks together.
      // This is another difference from SILO.
    }
  }
  // the lock-free write-set doesn't have to worry about locks or ordering.
  for (uint32_t i = 0; i < lock_free_write_set_size; ++i) {
    LockFreeWriteXctAccess& write = lock_free_write_set[i];
    DVLOG(2) << *context << " Applying Lock-Free write "
      << engine_->get_storage_manager()->get_name(write.storage_id_);
    write.log_entry_->header_.set_xct_id(new_xct_id);
    log::invoke_apply_record(write.log_entry_, context, write.storage_id_, nullptr, nullptr);
  }
  DVLOG(1) << *context << " applied and unlocked write set";
}
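
// Memory-ordering sketch of the apply loop above (illustration, simplified to
// bare atomics):
//
//   apply_log_to_record();                                  // data write
//   std::atomic_thread_fence(std::memory_order_release);    // fence
//   record_tid = new_xct_id;                                // publish XID
//
// A reader that observes the new XID is therefore guaranteed to also observe
// the new data, which is why the release fence must sit between the two writes.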

ErrorCode XctManagerPimpl::abort_xct(thread::Thread* context) {
  Xct& current_xct = context->get_current_xct();
  if (!current_xct.is_active()) {
    return kErrorCodeXctNoXct;
  }
  DVLOG(1) << *context << " Aborted transaction in thread-" << context->get_thread_id();

  // The abort was probably due to a read-set verification failure, either in lock()'s
  // 'related_read' check or in verify(). To handle both cases at once, we re-check
  // all read-sets here and make violating ones hotter.
  // This might sound like a bit of wasted effort, but anyway aborts must be reasonably
  // infrequent (otherwise we are screwed!) so this doesn't add too much.
  // Rather, we want to make sure we surely make them hotter.
  // Actually, I spent half a day to figure out why a page doesn't become hot despite
  // lots of aborts! (A: lock()'s related_read check forgot to make it hotter!)
  ReadXctAccess* read_set = current_xct.get_read_set();
  const uint32_t read_set_size = current_xct.get_read_set_size();
  for (uint32_t i = 0; i < read_set_size; ++i) {
    ReadXctAccess& access = read_set[i];
    if (access.observed_owner_id_ != access.owner_id_address_->xct_id_) {
      access.owner_id_address_->hotter(context);
    }
  }

  // When we abort, whether in precommit or via the user's explicit abort, we construct the RLL.
  // Aborts may happen due to try-failures in reads, so we now put this here, not in precommit.
  // One drawback is that we can't check the "cause" of the abort.
  // We should probably construct the RLL only in the case of race-abort.
  // So, we might revisit this choice.
  if (current_xct.is_enable_rll_for_this_xct()) {
    const uint32_t threshold = current_xct.get_rll_threshold_for_this_xct();
    current_xct.get_retrospective_lock_list()->construct(context, threshold);
  } else {
    current_xct.get_retrospective_lock_list()->clear_entries();
  }

  release_and_clear_all_current_locks(context);
  current_xct.deactivate();
  context->get_thread_log_buffer().discard_current_xct_log();
  return kErrorCodeOk;
}
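
// Note: constructing the RLL (Retrospective Lock List) on abort lets the next
// run of the same transaction take the locks it conflicted on up front, in
// canonical order; precommit_xct_lock() above then finds them already taken
// ("Yay, already taken. Probably Thanks to RLL?").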

void XctManagerPimpl::release_and_clear_all_current_locks(thread::Thread* context) {
  context->cll_release_all_locks();
  CurrentLockList* cll = context->get_current_xct().get_current_lock_list();
  cll->clear_entries();
}

}  // namespace xct
}  // namespace foedus