/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
19 
20 #include <pthread.h>
21 #include <sched.h>
22 #include <glog/logging.h>
23 
24 #include <atomic>
25 #include <future>
26 #include <mutex>
27 #include <thread>
28 
29 #include "foedus/assert_nd.hpp"
30 #include "foedus/engine.hpp"
39 #include "foedus/proc/proc_id.hpp"
44 #include "foedus/thread/thread.hpp"
49 #include "foedus/xct/xct_id.hpp"
52 
53 namespace foedus {
54 namespace thread {
ThreadPimpl::ThreadPimpl(
  Engine* engine,
  Thread* holder,
  ThreadId id,
  ThreadGlobalOrdinal global_ordinal)
  : engine_(engine),
    holder_(holder),
    id_(id),
    numa_node_(decompose_numa_node(id)),
    global_ordinal_(global_ordinal),
    core_memory_(nullptr),
    node_memory_(nullptr),
    snapshot_cache_hashtable_(nullptr),
    snapshot_page_pool_(nullptr),
    log_buffer_(engine, id),
    current_xct_(engine, holder, id),
    snapshot_file_set_(engine),
    control_block_(nullptr),
    task_input_memory_(nullptr),
    task_output_memory_(nullptr),
    mcs_ww_blocks_(nullptr),
    mcs_rw_simple_blocks_(nullptr),
    mcs_rw_extended_blocks_(nullptr),
    canonical_address_(nullptr) {
}

ErrorStack ThreadPimpl::initialize_once() {
  ASSERT_ND(engine_->get_memory_manager()->is_initialized());
  soc::ThreadMemoryAnchors* anchors
    = engine_->get_soc_manager()->get_shared_memory_repo()->get_thread_memory_anchors(id_);
  control_block_ = anchors->thread_memory_;
  control_block_->initialize(id_);
  task_input_memory_ = anchors->task_input_memory_;
  task_output_memory_ = anchors->task_output_memory_;
  mcs_ww_blocks_ = anchors->mcs_ww_lock_memories_;
  mcs_rw_simple_blocks_ = anchors->mcs_rw_simple_lock_memories_;
  mcs_rw_extended_blocks_ = anchors->mcs_rw_extended_lock_memories_;
  mcs_rw_async_mappings_ = anchors->mcs_rw_async_mappings_memories_;

  core_memory_ = engine_->get_memory_manager()->get_local_memory()->get_core_memory(id_);
  node_memory_ = core_memory_->get_node_memory();
  if (engine_->get_options().cache_.snapshot_cache_enabled_) {
    snapshot_cache_hashtable_ = node_memory_->get_snapshot_cache_table();
  } else {
    snapshot_cache_hashtable_ = nullptr;
  }
  snapshot_page_pool_ = node_memory_->get_snapshot_pool();
  global_volatile_page_resolver_
    = engine_->get_memory_manager()->get_global_volatile_page_resolver();
  local_volatile_page_resolver_ = node_memory_->get_volatile_pool()->get_resolver();
  simple_mcs_rw_
    = engine_->get_options().xct_.mcs_implementation_type_
      == xct::XctOptions::kMcsImplementationTypeSimple;
  CHECK_ERROR(snapshot_file_set_.initialize());
  CHECK_ERROR(log_buffer_.initialize());
  current_xct_.initialize(
    core_memory_,
    &control_block_->mcs_block_current_,
    &control_block_->mcs_rw_async_mapping_current_);

  raw_thread_set_ = false;
  raw_thread_ = std::move(std::thread(&ThreadPimpl::handle_tasks, this));
  raw_thread_set_ = true;
  return kRetOk;
}
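
// Note on the initialization order above: every shared piece of this thread
// (control block, task input/output buffers, pre-allocated MCS lock blocks) lives
// in the shared memory segment prepared by SocManager, so that other SOCs and the
// master engine can observe this thread via ThreadRef. Only after all of them are
// wired does initialize_once() launch the worker via std::thread; the raw_thread_set_
// flag lets the launched thread spin until raw_thread_ itself is surely assigned
// (see set_thread_schedule() below).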

ErrorStack ThreadPimpl::uninitialize_once() {
  ErrorStackBatch batch;
  {
    {
      control_block_->status_ = kWaitingForTerminate;
      control_block_->wakeup_cond_.signal();
    }
    LOG(INFO) << "Thread-" << id_ << " requested to terminate";
    if (raw_thread_.joinable()) {
      raw_thread_.join();
    }
    control_block_->uninitialize();
  }
  {
    // release retired volatile pages. we do this in thread module rather than in memory module
    // because this has to happen before NumaNodeMemory of any node is uninitialized.
    for (uint16_t node = 0; node < engine_->get_soc_count(); ++node) {
      memory::PagePoolOffsetAndEpochChunk* chunk
        = core_memory_->get_retired_volatile_pool_chunk(node);
      memory::PagePool* volatile_pool
        = engine_->get_memory_manager()->get_node_memory(node)->get_volatile_pool();
      if (!chunk->empty()) {
        volatile_pool->release(chunk->size(), chunk);
      }
      ASSERT_ND(chunk->empty());
    }
  }
  batch.emprace_back(snapshot_file_set_.uninitialize());
  batch.emprace_back(log_buffer_.uninitialize());
  core_memory_ = nullptr;
  node_memory_ = nullptr;
  snapshot_cache_hashtable_ = nullptr;
  snapshot_page_pool_ = nullptr;
  return SUMMARIZE_ERROR_BATCH(batch);
}

bool ThreadPimpl::is_stop_requested() const {
  assorted::memory_fence_acquire();
  return control_block_->status_ == kWaitingForTerminate;
}

void ThreadPimpl::handle_tasks() {
  int numa_node = static_cast<int>(decompose_numa_node(id_));
  LOG(INFO) << "Thread-" << id_ << " started running on NUMA node: " << numa_node
    << " control_block address=" << control_block_;
  NumaThreadScope scope(numa_node);
  set_thread_schedule();
  ASSERT_ND(control_block_->status_ == kNotInitialized);
  control_block_->status_ = kWaitingForTask;
  while (!is_stop_requested()) {
    assorted::memory_fence_acquire();
    {
      uint64_t demand = control_block_->wakeup_cond_.acquire_ticket();
      if (is_stop_requested()) {
        break;
      }
      // these two statuses are "not urgent".
      if (control_block_->status_ == kWaitingForTask
        || control_block_->status_ == kWaitingForClientRelease) {
        VLOG(0) << "Thread-" << id_ << " sleeping...";
        control_block_->wakeup_cond_.timedwait(demand, 100000ULL, 1U << 16, 1U << 13);
      }
    }
    VLOG(0) << "Thread-" << id_ << " woke up. status=" << control_block_->status_;
    if (control_block_->status_ == kWaitingForExecution) {
      control_block_->output_len_ = 0;
      control_block_->status_ = kRunningTask;

      // Reset the default value of enable_rll_for_this_xct etc to the system-wide setting
      // for every impersonation.
      const xct::XctOptions& xct_opt = engine_->get_options().xct_;
      current_xct_.set_default_rll_for_this_xct(xct_opt.enable_retrospective_lock_list_);
      current_xct_.set_default_hot_threshold_for_this_xct(
        engine_->get_options().storage_.hot_threshold_);
      current_xct_.set_default_rll_threshold_for_this_xct(
        xct_opt.hot_threshold_for_retrospective_lock_list_);

      const proc::ProcName& proc_name = control_block_->proc_name_;
      VLOG(0) << "Thread-" << id_ << " retrieved a task: " << proc_name;
      proc::Proc proc = nullptr;
      ErrorStack result = engine_->get_proc_manager()->get_proc(proc_name, &proc);
      if (result.is_error()) {
        LOG(ERROR) << "Thread-" << id_ << " couldn't find procedure: " << proc_name;
      } else {
        uint32_t output_used = 0;
        proc::ProcArguments args = {
          engine_,
          holder_,
          task_input_memory_,
          control_block_->input_len_,
          task_output_memory_,
          soc::ThreadMemoryAnchors::kTaskOutputMemorySize,
          &output_used,
        };
        result = proc(args);
        VLOG(0) << "Thread-" << id_ << " run(task) returned. result =" << result
          << ", output_used=" << output_used;
        control_block_->output_len_ = output_used;
      }
      if (result.is_error()) {
        control_block_->proc_result_.from_error_stack(result);
      } else {
        control_block_->proc_result_.clear();
      }
      control_block_->status_ = kWaitingForClientRelease;
      {
        // Wake up the client if it's waiting.
        control_block_->task_complete_cond_.signal();
      }
      VLOG(0) << "Thread-" << id_ << " finished a task. result =" << result;
    }
  }
  control_block_->status_ = kTerminated;
  LOG(INFO) << "Thread-" << id_ << " exits";
}
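
// The impersonation protocol in handle_tasks(), as a state machine over
// control_block_->status_ (all transitions are visible through shared memory):
//   kWaitingForTask          : idle; the worker may sleep on wakeup_cond_.
//   kWaitingForExecution     : a client has set proc_name_/input; the worker picks it up.
//   kRunningTask             : the worker is invoking the procedure.
//   kWaitingForClientRelease : the result is set; task_complete_cond_ wakes the client.
//   kWaitingForTerminate     : uninitialize_once() requested shutdown.
//   kTerminated              : the worker exited the loop.
// The acquire_ticket()/timedwait() pair avoids a lost wakeup: the ticket is taken
// before re-checking the status, so a signal that arrives between the check and the
// wait is still observed.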

void ThreadPimpl::set_thread_schedule() {
  // this code totally assumes pthread. maybe ifdef to handle Windows.. later!
  SPINLOCK_WHILE(raw_thread_set_ == false) {
    // as the copy to raw_thread_ might happen after the launched thread gets here,
    // we check it and spin. This is mainly to make valgrind happy.
    continue;
  }
  pthread_t handle = static_cast<pthread_t>(raw_thread_.native_handle());
  int policy;
  sched_param param;
  int ret = ::pthread_getschedparam(handle, &policy, &param);
  if (ret) {
    LOG(FATAL) << "WTF. pthread_getschedparam() failed: error=" << assorted::os_error();
  }
  const ThreadOptions& opt = engine_->get_options().thread_;
  // output the following logs just once.
  if (id_ == 0) {
    LOG(INFO) << "The default thread policy=" << policy << ", priority=" << param.__sched_priority;
    if (opt.overwrite_thread_schedule_) {
      LOG(INFO) << "Overwriting thread policy=" << opt.thread_policy_
        << ", priority=" << opt.thread_priority_;
    }
  }
  if (opt.overwrite_thread_schedule_) {
    policy = opt.thread_policy_;
    param.__sched_priority = opt.thread_priority_;
    int priority_max = ::sched_get_priority_max(policy);
    int priority_min = ::sched_get_priority_min(policy);
    if (opt.thread_priority_ > priority_max) {
      LOG(WARNING) << "Thread priority too large. using max value: "
        << opt.thread_priority_ << "->" << priority_max;
      param.__sched_priority = priority_max;
    }
    if (opt.thread_priority_ < priority_min) {
      LOG(WARNING) << "Thread priority too small. using min value: "
        << opt.thread_priority_ << "->" << priority_min;
      param.__sched_priority = priority_min;
    }
    int ret2 = ::pthread_setschedparam(handle, policy, &param);
    if (ret2 == EPERM) {
      // this is a quite common misconfiguration, so let's output a friendly error message.
      // also, not fatal. keep running, as this only affects performance.
      LOG(WARNING) << "========= ATTENTION: Thread-scheduling Permission Error!!!! ==========\n"
        " pthread_setschedparam() failed due to permission error. This means you have\n"
        " not set an appropriate rtprio in limits.conf. You cannot set a priority higher than\n"
        " the OS allows. Configure limits.conf (eg. 'kimurhid - rtprio 99') or modify ThreadOptions.\n"
        "============================= ATTENTION ======================";
    } else if (ret2) {
      LOG(FATAL) << "WTF pthread_setschedparam() failed: error=" << assorted::os_error();
    }
  }
}
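
// For example, a deployment that wants real-time round-robin workers could set
// (a sketch; the field names are the ThreadOptions members used above):
//   options.thread_.overwrite_thread_schedule_ = true;
//   options.thread_.thread_policy_ = SCHED_RR;  // passed as-is to pthread_setschedparam()
//   options.thread_.thread_priority_ = 50;      // clamped to sched_get_priority_min/max above
// plus an rtprio entry in /etc/security/limits.conf as the warning message explains.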

ErrorCode ThreadPimpl::install_a_volatile_page(
  storage::DualPagePointer* pointer,
  storage::Page** installed_page) {
  ASSERT_ND(pointer->snapshot_pointer_ != 0);

  // copy from snapshot version
  storage::Page* snapshot_page;
  CHECK_ERROR_CODE(find_or_read_a_snapshot_page(pointer->snapshot_pointer_, &snapshot_page));
  const storage::VolatilePagePointer volatile_pointer
    = core_memory_->grab_free_volatile_page_pointer();
  const auto offset = volatile_pointer.get_offset();
  if (UNLIKELY(volatile_pointer.is_null())) {
    return kErrorCodeMemoryNoFreePages;
  }
  storage::Page* page = local_volatile_page_resolver_.resolve_offset_newpage(offset);
  std::memcpy(page, snapshot_page, storage::kPageSize);
  // We copied from a snapshot page, so the snapshot flag is on.
  ASSERT_ND(page->get_header().snapshot_);
  page->get_header().snapshot_ = false;  // now it's volatile
  page->get_header().page_id_ = volatile_pointer.word;  // and correct page ID

  *installed_page = place_a_new_volatile_page(offset, pointer);
  return kErrorCodeOk;
}

storage::Page* ThreadPimpl::place_a_new_volatile_page(
  memory::PagePoolOffset new_offset,
  storage::DualPagePointer* pointer) {
  while (true) {
    storage::VolatilePagePointer cur_pointer = pointer->volatile_pointer_;
    storage::VolatilePagePointer new_pointer;
    new_pointer.set(numa_node_, new_offset);
    // atomically install it.
    if (cur_pointer.is_null() &&
      assorted::raw_atomic_compare_exchange_strong<uint64_t>(
        &(pointer->volatile_pointer_.word),
        &(cur_pointer.word),
        new_pointer.word)) {
      // successfully installed
      return local_volatile_page_resolver_.resolve_offset_newpage(new_offset);
    } else {
      if (!cur_pointer.is_null()) {
        // someone else has installed it!
        VLOG(0) << "Interesting. Lost race to install a volatile page. ver-b. Thread-" << id_
          << ", local offset=" << new_offset << " winning=" << cur_pointer;
        core_memory_->release_free_volatile_page(new_offset);
        storage::Page* placed_page = global_volatile_page_resolver_.resolve_offset(cur_pointer);
        ASSERT_ND(placed_page->get_header().snapshot_ == false);
        return placed_page;
      } else {
        // This is probably a bug, but we might only change mod count for some reason.
        LOG(WARNING) << "Very interesting. Lost race but volatile page not installed. Thread-"
          << id_ << ", local offset=" << new_offset;
        continue;
      }
    }
  }
}
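
// Three outcomes of the compare-and-swap above:
//  1) We won the race: our page is now pointed to by the DualPagePointer; resolve and
//     return it.
//  2) We lost and cur_pointer is now non-null: another thread installed a page first.
//     We return our local page to the pool and adopt the winner's page.
//  3) We lost but cur_pointer is still null: only the modification count changed. Retry.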

// placed here to allow inlining
ErrorCode Thread::follow_page_pointer(
  storage::VolatilePageInit page_initializer,
  bool tolerate_null_pointer,
  bool will_modify,
  bool take_ptr_set_snapshot,
  storage::DualPagePointer* pointer,
  storage::Page** page,
  const storage::Page* parent,
  uint16_t index_in_parent) {
  return pimpl_->follow_page_pointer(
    page_initializer,
    tolerate_null_pointer,
    will_modify,
    take_ptr_set_snapshot,
    pointer,
    page,
    parent,
    index_in_parent);
}

ErrorCode Thread::follow_page_pointers_for_read_batch(
  uint16_t batch_size,
  storage::VolatilePageInit page_initializer,
  bool tolerate_null_pointer,
  bool take_ptr_set_snapshot,
  storage::DualPagePointer** pointers,
  storage::Page** parents,
  const uint16_t* index_in_parents,
  bool* followed_snapshots,
  storage::Page** out) {
  return pimpl_->follow_page_pointers_for_read_batch(
    batch_size,
    page_initializer,
    tolerate_null_pointer,
    take_ptr_set_snapshot,
    pointers,
    parents,
    index_in_parents,
    followed_snapshots,
    out);
}

ErrorCode Thread::follow_page_pointers_for_write_batch(
  uint16_t batch_size,
  storage::VolatilePageInit page_initializer,
  storage::DualPagePointer** pointers,
  storage::Page** parents,
  const uint16_t* index_in_parents,
  storage::Page** out) {
  return pimpl_->follow_page_pointers_for_write_batch(
    batch_size,
    page_initializer,
    pointers,
    parents,
    index_in_parents,
    out);
}

ErrorCode ThreadPimpl::follow_page_pointer(
  storage::VolatilePageInit page_initializer,
  bool tolerate_null_pointer,
  bool will_modify,
  bool take_ptr_set_snapshot,
  storage::DualPagePointer* pointer,
  storage::Page** page,
  const storage::Page* parent,
  uint16_t index_in_parent) {
  ASSERT_ND(!tolerate_null_pointer || !will_modify);

  storage::VolatilePagePointer volatile_pointer = pointer->volatile_pointer_;
  bool followed_snapshot = false;
  if (pointer->snapshot_pointer_ == 0) {
    if (volatile_pointer.is_null()) {
      // both null, so the page is not created yet.
      if (tolerate_null_pointer) {
        *page = nullptr;
      } else {
        // place an empty new page
        ASSERT_ND(page_initializer);
        // we must not install a new volatile page in a snapshot page. We must not hit this case.
        ASSERT_ND(!parent->get_header().snapshot_);
        memory::PagePoolOffset offset = core_memory_->grab_free_volatile_page();
        if (UNLIKELY(offset == 0)) {
          return kErrorCodeMemoryNoFreePages;
        }
        storage::Page* new_page = local_volatile_page_resolver_.resolve_offset_newpage(offset);
        storage::VolatilePagePointer new_page_id;
        new_page_id.set(numa_node_, offset);
        storage::VolatilePageInitArguments args = {
          holder_,
          new_page_id,
          new_page,
          parent,
          index_in_parent
        };
        page_initializer(args);
        storage::assert_valid_volatile_page(new_page, offset);
        ASSERT_ND(new_page->get_header().snapshot_ == false);

        *page = place_a_new_volatile_page(offset, pointer);
      }
    } else {
      // then we have to follow the volatile page anyway
      *page = global_volatile_page_resolver_.resolve_offset(volatile_pointer);
    }
  } else {
    // if there is a snapshot page, we have a few more choices.
    if (!volatile_pointer.is_null()) {
      // we have a volatile page, which is guaranteed to be the latest
      *page = global_volatile_page_resolver_.resolve_offset(volatile_pointer);
    } else if (will_modify) {
      // we need a volatile page. so construct it from the snapshot
      CHECK_ERROR_CODE(install_a_volatile_page(pointer, page));
    } else {
      // otherwise just use the snapshot
      CHECK_ERROR_CODE(find_or_read_a_snapshot_page(pointer->snapshot_pointer_, page));
      followed_snapshot = true;
    }
  }
  ASSERT_ND((*page) == nullptr || (followed_snapshot == (*page)->get_header().snapshot_));

  // if we follow a snapshot pointer, remember the pointer set
  if (current_xct_.get_isolation_level() == xct::kSerializable) {
    if ((*page == nullptr || followed_snapshot) && take_ptr_set_snapshot) {
      current_xct_.add_to_pointer_set(&pointer->volatile_pointer_, volatile_pointer);
    }
  }
  return kErrorCodeOk;
}
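
// Decision table of follow_page_pointer(), given (snapshot_pointer_, volatile_pointer_):
//   (0,   null): page not created yet. tolerate_null_pointer ? nullptr : initialize a
//                new volatile page and atomically install it.
//   (0,   set) : follow the volatile pointer; it is the only image of the page.
//   (set, set) : the volatile page is guaranteed to be the latest; follow it.
//   (set, null): will_modify ? copy the snapshot into a newly installed volatile page
//                (install_a_volatile_page) : read the immutable snapshot page.
// Under serializable isolation, a null or snapshot-followed pointer is recorded in the
// pointer set so that a later installation of a volatile page can be detected.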

ErrorCode ThreadPimpl::follow_page_pointers_for_read_batch(
  uint16_t batch_size,
  storage::VolatilePageInit page_initializer,
  bool tolerate_null_pointer,
  bool take_ptr_set_snapshot,
  storage::DualPagePointer** pointers,
  storage::Page** parents,
  const uint16_t* index_in_parents,
  bool* followed_snapshots,
  storage::Page** out) {
  ASSERT_ND(tolerate_null_pointer || page_initializer);
  if (batch_size == 0) {
    return kErrorCodeOk;
  } else if (UNLIKELY(batch_size > Thread::kMaxFindPagesBatch)) {
    return kErrorCodeInvalidParameter;
  }

  // this one uses a batched find method for following snapshot pages.
  // some of them might follow volatile pages, so we do it only when there is
  // at least one snapshot pointer.
  bool has_some_snapshot = false;
  const bool needs_ptr_set
    = take_ptr_set_snapshot && current_xct_.get_isolation_level() == xct::kSerializable;

  // REMINDER: Remember that it might be parents == out. We thus use tmp_out.
  storage::Page* tmp_out[Thread::kMaxFindPagesBatch];
#ifndef NDEBUG
  // fill with garbage for easier debugging
  std::memset(tmp_out, 0xDA, sizeof(tmp_out));
#endif  // NDEBUG

  // collect snapshot page IDs.
  storage::SnapshotPagePointer snapshot_page_ids[Thread::kMaxFindPagesBatch];
  for (uint16_t b = 0; b < batch_size; ++b) {
    snapshot_page_ids[b] = 0;
    storage::DualPagePointer* pointer = pointers[b];
    if (pointer == nullptr) {
      continue;
    }
    // followed_snapshots is both input and output.
    // as input, it should indicate whether the parent is snapshot or not
    ASSERT_ND(parents[b]->get_header().snapshot_ == followed_snapshots[b]);
    if (pointer->snapshot_pointer_ != 0 && pointer->volatile_pointer_.is_null()) {
      has_some_snapshot = true;
      snapshot_page_ids[b] = pointer->snapshot_pointer_;
    }
  }

  // follow them in a batch. output to tmp_out.
  if (has_some_snapshot) {
    CHECK_ERROR_CODE(find_or_read_snapshot_pages_batch(batch_size, snapshot_page_ids, tmp_out));
  }

  // handle the cases that have to follow volatile pages. we might also have to create a new page.
  for (uint16_t b = 0; b < batch_size; ++b) {
    storage::DualPagePointer* pointer = pointers[b];
    if (has_some_snapshot) {
      if (pointer == nullptr) {
        out[b] = nullptr;
        continue;
      } else if (tmp_out[b]) {
        // if we follow a snapshot pointer _from a volatile page_, remember the pointer set
        if (needs_ptr_set && !followed_snapshots[b]) {
          current_xct_.add_to_pointer_set(&pointer->volatile_pointer_, pointer->volatile_pointer_);
        }
        followed_snapshots[b] = true;
        out[b] = tmp_out[b];
        continue;
      }
      ASSERT_ND(tmp_out[b] == nullptr);
    }

    // we didn't follow a snapshot page. we must follow a volatile page, or null.
    followed_snapshots[b] = false;
    ASSERT_ND(!parents[b]->get_header().snapshot_);
    if (pointer->snapshot_pointer_ == 0) {
      if (pointer->volatile_pointer_.is_null()) {
        // both null, so the page is not created yet.
        if (tolerate_null_pointer) {
          out[b] = nullptr;
        } else {
          memory::PagePoolOffset offset = core_memory_->grab_free_volatile_page();
          if (UNLIKELY(offset == 0)) {
            return kErrorCodeMemoryNoFreePages;
          }
          storage::Page* new_page = local_volatile_page_resolver_.resolve_offset_newpage(offset);
          storage::VolatilePagePointer new_page_id;
          new_page_id.set(numa_node_, offset);
          storage::VolatilePageInitArguments args = {
            holder_,
            new_page_id,
            new_page,
            parents[b],
            index_in_parents[b]
          };
          page_initializer(args);
          storage::assert_valid_volatile_page(new_page, offset);
          ASSERT_ND(new_page->get_header().snapshot_ == false);

          out[b] = place_a_new_volatile_page(offset, pointer);
        }
      } else {
        out[b] = global_volatile_page_resolver_.resolve_offset(pointer->volatile_pointer_);
      }
    } else {
      ASSERT_ND(!pointer->volatile_pointer_.is_null());
      out[b] = global_volatile_page_resolver_.resolve_offset(pointer->volatile_pointer_);
    }
  }
  return kErrorCodeOk;
}
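
// The batched reader above does two passes: pass 1 collects the snapshot page IDs that
// actually need a cache lookup (snapshot pointer set, volatile pointer null) and resolves
// them with one find_or_read_snapshot_pages_batch() call; pass 2 handles everything else
// per entry (existing volatile pages, not-yet-created pages). tmp_out is the scratch
// output because parents may alias out, as the REMINDER comment notes.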

ErrorCode ThreadPimpl::follow_page_pointers_for_write_batch(
  uint16_t batch_size,
  storage::VolatilePageInit page_initializer,
  storage::DualPagePointer** pointers,
  storage::Page** parents,
  const uint16_t* index_in_parents,
  storage::Page** out) {
  // REMINDER: Remember that it might be parents == out. It's not an issue in this function, tho.
  // this method is not quite batched as it doesn't need to be.
  // still, fewer branches because we can assume all of them need a writable volatile page.
  for (uint16_t b = 0; b < batch_size; ++b) {
    storage::DualPagePointer* pointer = pointers[b];
    if (pointer == nullptr) {
      out[b] = nullptr;
      continue;
    }
    ASSERT_ND(!parents[b]->get_header().snapshot_);
    storage::Page** page = out + b;
    storage::VolatilePagePointer volatile_pointer = pointer->volatile_pointer_;
    if (!volatile_pointer.is_null()) {
      *page = global_volatile_page_resolver_.resolve_offset(volatile_pointer);
    } else if (pointer->snapshot_pointer_ != 0) {
      // we need a volatile page. so construct it from the snapshot
      CHECK_ERROR_CODE(install_a_volatile_page(pointer, page));
    } else {
      ASSERT_ND(page_initializer);
      // we must not install a new volatile page in a snapshot page. We must not hit this case.
      memory::PagePoolOffset offset = core_memory_->grab_free_volatile_page();
      if (UNLIKELY(offset == 0)) {
        return kErrorCodeMemoryNoFreePages;
      }
      storage::Page* new_page = local_volatile_page_resolver_.resolve_offset_newpage(offset);
      storage::VolatilePagePointer new_page_id;
      new_page_id.set(numa_node_, offset);
      storage::VolatilePageInitArguments args = {
        holder_,
        new_page_id,
        new_page,
        parents[b],
        index_in_parents[b]
      };
      page_initializer(args);
      storage::assert_valid_volatile_page(new_page, offset);
      ASSERT_ND(new_page->get_header().snapshot_ == false);

      *page = place_a_new_volatile_page(offset, pointer);
    }
    ASSERT_ND(out[b] != nullptr);
  }
  return kErrorCodeOk;
}

ErrorCode ThreadPimpl::find_or_read_a_snapshot_page(
  storage::SnapshotPagePointer page_id,
  storage::Page** out) {
  ASSERT_ND(page_id != 0);
  if (engine_->get_options().cache_.snapshot_cache_enabled_) {
    memory::PagePoolOffset offset = snapshot_cache_hashtable_->find(page_id);
    // the "find" is very efficient and wait-free, but instead it might have false
    // positives/negatives, in which case we should just install a new page. No worry about
    // duplicates thanks to the immutability of snapshot pages. it just wastes a bit of
    // CPU and memory.
    if (offset == 0 || snapshot_page_pool_->get_base()[offset].get_header().page_id_ != page_id) {
      if (offset != 0) {
        DVLOG(0) << "Interesting, this race is rare, but possible. offset=" << offset;
      }
      CHECK_ERROR_CODE(on_snapshot_cache_miss(page_id, &offset));
      ASSERT_ND(offset != 0);
      ++control_block_->stat_snapshot_cache_misses_;
      CHECK_ERROR_CODE(snapshot_cache_hashtable_->install(page_id, offset));
    } else {
      ++control_block_->stat_snapshot_cache_hits_;
    }
    ASSERT_ND(offset != 0);
    *out = snapshot_page_pool_->get_base() + offset;
  } else {
    // Snapshot cache is disabled. So far this happens only in performance experiments.
    // We use local work memory in this case.
    CHECK_ERROR_CODE(current_xct_.acquire_local_work_memory(
      storage::kPageSize,
      reinterpret_cast<void**>(out),
      storage::kPageSize));
    return read_a_snapshot_page(page_id, *out);
  }
  return kErrorCodeOk;
}
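
// The benign race above: find() may return a stale offset (false positive) or miss an
// existing entry (false negative). Either way we fall back to on_snapshot_cache_miss(),
// which reads the page from the snapshot file into a fresh buffer. Because snapshot
// pages are immutable, ending up with two cached copies of the same page is only a
// small waste of memory and CPU, never an inconsistency.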

static_assert(
  static_cast<int>(Thread::kMaxFindPagesBatch)
  <= static_cast<int>(cache::CacheHashtable::kMaxFindBatchSize),
  "Booo");

ErrorCode ThreadPimpl::find_or_read_snapshot_pages_batch(
  uint16_t batch_size,
  const storage::SnapshotPagePointer* page_ids,
  storage::Page** out) {
  ASSERT_ND(batch_size <= Thread::kMaxFindPagesBatch);
  if (batch_size == 0) {
    return kErrorCodeOk;
  } else if (UNLIKELY(batch_size > Thread::kMaxFindPagesBatch)) {
    return kErrorCodeInvalidParameter;
  }

  if (engine_->get_options().cache_.snapshot_cache_enabled_) {
    memory::PagePoolOffset offsets[Thread::kMaxFindPagesBatch];
    CHECK_ERROR_CODE(snapshot_cache_hashtable_->find_batch(batch_size, page_ids, offsets));
    for (uint16_t b = 0; b < batch_size; ++b) {
      memory::PagePoolOffset offset = offsets[b];
      storage::SnapshotPagePointer page_id = page_ids[b];
      if (page_id == 0) {
        out[b] = nullptr;
        continue;
      } else if (b > 0 && page_ids[b - 1] == page_id) {
        ASSERT_ND(offsets[b - 1] == offset);
        out[b] = out[b - 1];
        continue;
      }
      if (offset == 0 || snapshot_page_pool_->get_base()[offset].get_header().page_id_ != page_id) {
        if (offset != 0) {
          DVLOG(0) << "Interesting, this race is rare, but possible. offset=" << offset;
        }
        CHECK_ERROR_CODE(on_snapshot_cache_miss(page_id, &offset));
        ASSERT_ND(offset != 0);
        ++control_block_->stat_snapshot_cache_misses_;
        CHECK_ERROR_CODE(snapshot_cache_hashtable_->install(page_id, offset));
      } else {
        ++control_block_->stat_snapshot_cache_hits_;
      }
      ASSERT_ND(offset != 0);
      out[b] = snapshot_page_pool_->get_base() + offset;
    }
  } else {
    // Snapshot cache is disabled. Read each page into local work memory.
    for (uint16_t b = 0; b < batch_size; ++b) {
      CHECK_ERROR_CODE(current_xct_.acquire_local_work_memory(
        storage::kPageSize,
        reinterpret_cast<void**>(out + b),
        storage::kPageSize));
      CHECK_ERROR_CODE(read_a_snapshot_page(page_ids[b], out[b]));
    }
  }
  return kErrorCodeOk;
}

ErrorCode ThreadPimpl::on_snapshot_cache_miss(
  storage::SnapshotPagePointer page_id,
  memory::PagePoolOffset* pool_offset) {
  // grab a buffer page to read into.
  memory::PagePoolOffset offset = core_memory_->grab_free_snapshot_page();
  if (offset == 0) {
    // TASK(Hideaki) First, we have to make sure this doesn't happen often (cleaner's work).
    // Second, when this happens, we have to do eviction now, but probably after aborting the xct.
    LOG(ERROR) << "Could not grab a free snapshot page during cache miss. thread=" << *holder_
      << ", page_id=" << assorted::Hex(page_id);
    return kErrorCodeCacheNoFreePages;
  }

  storage::Page* new_page = snapshot_page_pool_->get_base() + offset;
  ErrorCode read_result = read_a_snapshot_page(page_id, new_page);
  if (read_result != kErrorCodeOk) {
    LOG(ERROR) << "Failed to read a snapshot page. thread=" << *holder_
      << ", page_id=" << assorted::Hex(page_id);
    core_memory_->release_free_snapshot_page(offset);
    return read_result;
  }

  *pool_offset = offset;
  return kErrorCodeOk;
}

ThreadRef ThreadPimpl::get_thread_ref(ThreadId id) {
  auto* pool_pimpl = engine_->get_thread_pool()->get_pimpl();
  return pool_pimpl->get_thread_ref(id);
}

void ThreadPimpl::collect_retired_volatile_page(storage::VolatilePagePointer ptr) {
  ASSERT_ND(is_volatile_page_retired(ptr));
  uint16_t node = ptr.get_numa_node();
  Epoch current_epoch = engine_->get_xct_manager()->get_current_global_epoch_weak();
  Epoch safe_epoch = current_epoch.one_more().one_more();
  memory::PagePoolOffsetAndEpochChunk* chunk = core_memory_->get_retired_volatile_pool_chunk(node);
  if (chunk->full()) {
    flush_retired_volatile_page(node, current_epoch, chunk);
  }
  chunk->push_back(ptr.get_offset(), safe_epoch);
}
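
// Why safe_epoch is current_epoch + 2: roughly, a transaction may still traverse the
// retired page while it runs in the epoch the retirement observed (read weakly above)
// or the next one, so the offset must not be reused until the global epoch has moved
// past both. flush_retired_volatile_page() enforces this via get_safe_offset_count(),
// which only counts offsets whose recorded epoch is strictly before the given epoch.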

void ThreadPimpl::flush_retired_volatile_page(
  uint16_t node,
  Epoch current_epoch,
  memory::PagePoolOffsetAndEpochChunk* chunk) {
  if (chunk->size() == 0) {
    return;
  }
  uint32_t safe_count = chunk->get_safe_offset_count(current_epoch);
  while (safe_count < chunk->size() / 10U) {
    LOG(WARNING) << "Thread-" << id_ << " can return only "
      << safe_count << " out of " << chunk->size()
      << " retired pages to node-" << node << " in epoch=" << current_epoch
      << ". This means the thread received too many retired pages in a short time period."
      << " Will advance an epoch to safely return the retired pages."
      << " This should be a rare event.";
    engine_->get_xct_manager()->advance_current_global_epoch();
    current_epoch = engine_->get_xct_manager()->get_current_global_epoch();
    LOG(INFO) << "okay, advanced epoch. now we should be able to return more pages";
    safe_count = chunk->get_safe_offset_count(current_epoch);
  }

  VLOG(0) << "Thread-" << id_ << " batch-returning retired volatile pages to node-" << node
    << " safe_count/count=" << safe_count << "/" << chunk->size() << ". epoch=" << current_epoch;
  memory::PagePool* volatile_pool
    = engine_->get_memory_manager()->get_node_memory(node)->get_volatile_pool();
  volatile_pool->release(safe_count, chunk);
  ASSERT_ND(!chunk->full());
}

bool ThreadPimpl::is_volatile_page_retired(storage::VolatilePagePointer ptr) {
  storage::Page* page = global_volatile_page_resolver_.resolve_offset(ptr);
  return page->get_header().page_version_.is_retired();
}

xct::McsRwSimpleBlock* Thread::get_mcs_rw_simple_blocks() {
  return pimpl_->mcs_rw_simple_blocks_;
}
xct::McsRwExtendedBlock* Thread::get_mcs_rw_extended_blocks() {
  return pimpl_->mcs_rw_extended_blocks_;
}

// Put Thread methods here to allow inlining.
void Thread::cll_release_all_locks() {
  pimpl_->cll_release_all_locks();
}
void Thread::cll_release_all_locks_after(xct::UniversalLockId address) {
  pimpl_->cll_release_all_locks_after(address);
}
void Thread::cll_giveup_all_locks_after(xct::UniversalLockId address) {
  pimpl_->cll_giveup_all_locks_after(address);
}
ErrorCode Thread::cll_try_or_acquire_single_lock(xct::LockListPosition pos) {
  return pimpl_->cll_try_or_acquire_single_lock(pos);
}
ErrorCode Thread::cll_try_or_acquire_multiple_locks(xct::LockListPosition upto_pos) {
  return pimpl_->cll_try_or_acquire_multiple_locks(upto_pos);
}

///////////////////////////////////////////////////////////////////////////
/// ThreadPimpl MCS implementations.
///////////////////////////////////////////////////////////////////////////
inline void assert_mcs_aligned(const void* address) {
  ASSERT_ND(address);
  ASSERT_ND(reinterpret_cast<uintptr_t>(address) % 4 == 0);
}

template<typename RW_BLOCK>
xct::McsImpl< ThreadPimplMcsAdaptor<RW_BLOCK>, RW_BLOCK > get_mcs_impl(ThreadPimpl* pimpl) {
  ThreadPimplMcsAdaptor<RW_BLOCK> adaptor(pimpl);
  xct::McsImpl< ThreadPimplMcsAdaptor<RW_BLOCK>, RW_BLOCK > impl(adaptor);
  return impl;
}
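
// get_mcs_impl() is the single point where the RW-lock implementation is chosen:
// ThreadPimplMcsAdaptor<RW_BLOCK> satisfies McsAdaptorConcept by mapping lock-block
// IDs to this thread's pre-allocated mcs_rw_simple_blocks_/mcs_rw_extended_blocks_
// arrays, and xct::McsImpl layers the locking algorithms on top of it. Each cll_*
// method below branches once on is_simple_mcs_rw() and then runs statically
// dispatched code; there are no virtual calls on the locking path.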

void ThreadPimpl::cll_release_all_locks_after(xct::UniversalLockId address) {
  xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  if (is_simple_mcs_rw()) {
    auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(this));
    cll->release_all_after(address, &impl);
  } else {
    auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(this));
    cll->release_all_after(address, &impl);
  }
}

void ThreadPimpl::cll_giveup_all_locks_after(xct::UniversalLockId address) {
  xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  if (is_simple_mcs_rw()) {
    auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(this));
    cll->giveup_all_after(address, &impl);
  } else {
    auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(this));
    cll->giveup_all_after(address, &impl);
  }
}

ErrorCode ThreadPimpl::cll_try_or_acquire_single_lock(xct::LockListPosition pos) {
  xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  if (is_simple_mcs_rw()) {
    auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(this));
    return cll->try_or_acquire_single_lock(pos, &impl);
  } else {
    auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(this));
    return cll->try_or_acquire_single_lock(pos, &impl);
  }
}

ErrorCode ThreadPimpl::cll_try_or_acquire_multiple_locks(xct::LockListPosition upto_pos) {
  xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  if (is_simple_mcs_rw()) {
    auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(this));
    return cll->try_or_acquire_multiple_locks(upto_pos, &impl);
  } else {
    auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(this));
    return cll->try_or_acquire_multiple_locks(upto_pos, &impl);
  }
}

void ThreadPimpl::cll_release_all_locks() {
  xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  if (is_simple_mcs_rw()) {
    auto impl(get_mcs_impl<xct::McsRwSimpleBlock>(this));
    return cll->release_all_locks(&impl);
  } else {
    auto impl(get_mcs_impl<xct::McsRwExtendedBlock>(this));
    return cll->release_all_locks(&impl);
  }
}

xct::UniversalLockId ThreadPimpl::cll_get_max_locked_id() const {
  const xct::CurrentLockList* cll = current_xct_.get_current_lock_list();
  return cll->get_max_locked_id();
}

struct ThreadPimplCllReleaseAllFunctor {
  explicit ThreadPimplCllReleaseAllFunctor(ThreadPimpl* pimpl) : pimpl_(pimpl) {}
  void operator()() const {
    pimpl_->cll_release_all_locks();
  }
  ThreadPimpl* const pimpl_;
};

ErrorCode Thread::run_nested_sysxct(xct::SysxctFunctor* functor, uint32_t max_retries) {
  return pimpl_->run_nested_sysxct(functor, max_retries);
}
ErrorCode Thread::sysxct_record_lock(
  xct::SysxctWorkspace* sysxct_workspace,
  storage::VolatilePagePointer page_id,
  xct::RwLockableXctId* lock) {
  return pimpl_->sysxct_record_lock(sysxct_workspace, page_id, lock);
}
ErrorCode Thread::sysxct_batch_record_locks(
  xct::SysxctWorkspace* sysxct_workspace,
  storage::VolatilePagePointer page_id,
  uint32_t lock_count,
  xct::RwLockableXctId** locks) {
  return pimpl_->sysxct_batch_record_locks(sysxct_workspace, page_id, lock_count, locks);
}
ErrorCode Thread::sysxct_page_lock(xct::SysxctWorkspace* sysxct_workspace, storage::Page* page) {
  return pimpl_->sysxct_page_lock(sysxct_workspace, page);
}
ErrorCode Thread::sysxct_batch_page_locks(
  xct::SysxctWorkspace* sysxct_workspace,
  uint32_t lock_count,
  storage::Page** pages) {
  return pimpl_->sysxct_batch_page_locks(sysxct_workspace, lock_count, pages);
}

ErrorCode ThreadPimpl::run_nested_sysxct(
  xct::SysxctFunctor* functor,
  uint32_t max_retries) {
  xct::SysxctWorkspace* workspace = current_xct_.get_sysxct_workspace();
  xct::UniversalLockId enclosing_max_lock_id = cll_get_max_locked_id();
  ThreadPimplCllReleaseAllFunctor release_functor(this);
  if (is_simple_mcs_rw()) {
    ThreadPimplMcsAdaptor<xct::McsRwSimpleBlock> adaptor(this);
    return xct::run_nested_sysxct_impl(
      functor,
      adaptor,
      max_retries,
      workspace,
      enclosing_max_lock_id,
      release_functor);
  } else {
    ThreadPimplMcsAdaptor<xct::McsRwExtendedBlock> adaptor(this);
    return xct::run_nested_sysxct_impl(
      functor,
      adaptor,
      max_retries,
      workspace,
      enclosing_max_lock_id,
      release_functor);
  }
}
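
// run_nested_sysxct_impl() needs the two extra arguments we pass here: the system
// transaction must respect canonical lock order with respect to the locks the
// enclosing user transaction already holds (enclosing_max_lock_id), and when it
// cannot, it may invoke release_functor to let go of the enclosing CLL locks and
// retry, up to max_retries times. That is the sole purpose of
// ThreadPimplCllReleaseAllFunctor above.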

ErrorCode ThreadPimpl::sysxct_record_lock(
  xct::SysxctWorkspace* sysxct_workspace,
  storage::VolatilePagePointer page_id,
  xct::RwLockableXctId* lock) {
  ASSERT_ND(sysxct_workspace->running_sysxct_);
  auto& sysxct_lock_list = sysxct_workspace->lock_list_;
  if (is_simple_mcs_rw()) {
    ThreadPimplMcsAdaptor<xct::McsRwSimpleBlock> adaptor(this);
    return sysxct_lock_list.request_record_lock(adaptor, page_id, lock);
  } else {
    ThreadPimplMcsAdaptor<xct::McsRwExtendedBlock> adaptor(this);
    return sysxct_lock_list.request_record_lock(adaptor, page_id, lock);
  }
}
ErrorCode ThreadPimpl::sysxct_batch_record_locks(
  xct::SysxctWorkspace* sysxct_workspace,
  storage::VolatilePagePointer page_id,
  uint32_t lock_count,
  xct::RwLockableXctId** locks) {
  ASSERT_ND(sysxct_workspace->running_sysxct_);
  auto& sysxct_lock_list = sysxct_workspace->lock_list_;
  if (is_simple_mcs_rw()) {
    ThreadPimplMcsAdaptor<xct::McsRwSimpleBlock> adaptor(this);
    return sysxct_lock_list.batch_request_record_locks(adaptor, page_id, lock_count, locks);
  } else {
    ThreadPimplMcsAdaptor<xct::McsRwExtendedBlock> adaptor(this);
    return sysxct_lock_list.batch_request_record_locks(adaptor, page_id, lock_count, locks);
  }
}
ErrorCode ThreadPimpl::sysxct_page_lock(
  xct::SysxctWorkspace* sysxct_workspace,
  storage::Page* page) {
  ASSERT_ND(sysxct_workspace->running_sysxct_);
  auto& sysxct_lock_list = sysxct_workspace->lock_list_;
  if (is_simple_mcs_rw()) {
    ThreadPimplMcsAdaptor<xct::McsRwSimpleBlock> adaptor(this);
    return sysxct_lock_list.request_page_lock(adaptor, page);
  } else {
    ThreadPimplMcsAdaptor<xct::McsRwExtendedBlock> adaptor(this);
    return sysxct_lock_list.request_page_lock(adaptor, page);
  }
}
ErrorCode ThreadPimpl::sysxct_batch_page_locks(
  xct::SysxctWorkspace* sysxct_workspace,
  uint32_t lock_count,
  storage::Page** pages) {
  ASSERT_ND(sysxct_workspace->running_sysxct_);
  auto& sysxct_lock_list = sysxct_workspace->lock_list_;
  if (is_simple_mcs_rw()) {
    ThreadPimplMcsAdaptor<xct::McsRwSimpleBlock> adaptor(this);
    return sysxct_lock_list.batch_request_page_locks(adaptor, lock_count, pages);
  } else {
    ThreadPimplMcsAdaptor<xct::McsRwExtendedBlock> adaptor(this);
    return sysxct_lock_list.batch_request_page_locks(adaptor, lock_count, pages);
  }
}

static_assert(
  sizeof(ThreadControlBlock) <= soc::ThreadMemoryAnchors::kThreadMemorySize,
  "ThreadControlBlock is too large.");
}  // namespace thread
}  // namespace foedus