libfoedus-core
FOEDUS Core Library
xct_manager_pimpl.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
18 #ifndef FOEDUS_XCT_XCT_MANAGER_PIMPL_HPP_
19 #define FOEDUS_XCT_XCT_MANAGER_PIMPL_HPP_
20 #include <atomic>
21 #include <thread>
22 
23 #include "foedus/epoch.hpp"
24 #include "foedus/fwd.hpp"
25 #include "foedus/initializable.hpp"
29 #include "foedus/thread/fwd.hpp"
31 #include "foedus/xct/fwd.hpp"
32 #include "foedus/xct/retrospective_lock_list.hpp" // to inline CurrentLockListIteratorForWriteSet
33 #include "foedus/xct/xct_access.hpp" // same above. iterator must be fast...
34 #include "foedus/xct/xct_id.hpp"
35 
36 namespace foedus {
37 namespace xct {
40  // This is backed by shared memory. It is never instantiated; it is only accessed via reinterpret_cast.
41  XctManagerControlBlock() = delete;
42  ~XctManagerControlBlock() = delete;
43 
44  void initialize() {
48  }
49  void uninitialize() {
50  }
51 
62  std::atomic<Epoch::EpochInteger> current_global_epoch_;
69  std::atomic<Epoch::EpochInteger> requested_global_epoch_;
70 
73 
78 
90  std::atomic<bool> new_transaction_paused_;
91 };
92 
// Pimpl object of XctManager.
// NOTE(review): this listing omits several declaration lines of the class
// (the embedded line numbers are not contiguous), so only the members that
// are visible here are documented.
100 class XctManagerPimpl final : public DefaultInitializable {
101  public:
102  XctManagerPimpl() = delete;
     // Stores the given engine pointer in engine_; nothing else is done here.
103  explicit XctManagerPimpl(Engine* engine) : engine_(engine) {}
104  ErrorStack initialize_once() override;
105  ErrorStack uninitialize_once() override;
106 
     // NOTE(review): the two closing braces below end inline definitions whose
     // signatures and bodies are not present in this listing.
109  }
112  }
     // Reads current_global_epoch_ from the control block with
     // memory_order_relaxed and wraps the value in an Epoch.
     // Presumably the body of get_current_global_epoch_weak() -- confirm
     // against the full header, since the signature line is missing here.
114  return Epoch(control_block_->current_global_epoch_.load(std::memory_order_relaxed));
115  }
116 
     // User transactions related methods.
117  ErrorCode begin_xct(thread::Thread* context, IsolationLevel isolation_level);
     // This is the guts of the commit protocol.
121  ErrorCode precommit_xct(thread::Thread* context, Epoch *commit_epoch);
123 
124  ErrorCode wait_for_commit(Epoch commit_epoch, int64_t wait_microseconds);
125  void set_requested_global_epoch(Epoch request);
127  void wait_for_current_global_epoch(Epoch target_epoch, int64_t wait_microseconds);
129 
     // precommit_xct() if the transaction is read-only.
136  ErrorCode precommit_xct_readonly(thread::Thread* context, Epoch *commit_epoch);
     // precommit_xct() if the transaction is read-write.
142  ErrorCode precommit_xct_readwrite(thread::Thread* context, Epoch *commit_epoch);
143 
     // Phase 1 of precommit_xct().
158  ErrorCode precommit_xct_lock(thread::Thread* context, XctId* max_xct_id);
     // Phase 2 of precommit_xct() for the read-only case.
175  bool precommit_xct_verify_readonly(thread::Thread* context, Epoch *commit_epoch);
     // Phase 2 of precommit_xct() for the read-write case.
185  bool precommit_xct_verify_readwrite(thread::Thread* context, XctId* max_xct_id);
     // Phase 3 of precommit_xct().
199  void precommit_xct_apply(thread::Thread* context, XctId max_xct_id, Epoch *commit_epoch);
206 
     // Main routine for epoch_chime_thread_.
213  void handle_epoch_chime();
216  bool is_stop_requested() const;
217 
     // Pauses all begin_xct() calls until resume_accepting_xct() is called.
219  void pause_accepting_xct();
     // Make sure you call this after pause_accepting_xct().
221  void resume_accepting_xct();
223 
     // Engine pointer received by the constructor; const, so never reseated.
224  Engine* const engine_;
226 
     // This thread keeps advancing the current_global_epoch_.
231  std::thread epoch_chime_thread_;
232 };
233 
234 
250  const WriteXctAccess* write_set,
251  const CurrentLockList* cll,
252  uint32_t write_set_size);
253 
258  void next_writes();
259  bool is_valid() const { return write_cur_pos_ < write_next_pos_; }
260 
262  const CurrentLockList* const cll_;
263  const uint32_t write_set_size_;
264 
268  uint32_t write_cur_pos_;
272  uint32_t write_next_pos_;
275 };
276 
278  const WriteXctAccess* write_set,
279  const CurrentLockList* cll,
280  uint32_t write_set_size)
281  : write_set_(write_set),
282  cll_(cll),
283  write_set_size_(write_set_size) {
284  write_cur_pos_ = 0;
285  write_next_pos_ = 0;
287  cll_->assert_sorted();
288 
289  next_writes(); // set to initial record.
290 }
291 
294  ++cll_pos_;
296  return;
297  }
298  const WriteXctAccess* write = write_set_ + write_cur_pos_;
299  const UniversalLockId write_id = write->owner_lock_id_;
300 
301  // CLL must contain all entries in write-set. We are reading in-order.
302  // So, we must find a valid CLL entry that is == write_id
303  const LockEntry* l = cll_->get_entry(cll_pos_);
304  while (l->universal_lock_id_ < write_id) {
305  ASSERT_ND(cll_pos_ < cll_->get_last_active_entry());
306  ++cll_pos_;
307  l = cll_->get_entry(cll_pos_);
308  }
309 
310  ASSERT_ND(l->universal_lock_id_ == write_id);
312  while (true) {
313  ++write_next_pos_;
315  break;
316  }
317  const WriteXctAccess* next_write = write_set_ + write_next_pos_;
318  const UniversalLockId next_write_id = next_write->owner_lock_id_;
319  ASSERT_ND(write_id <= next_write_id);
320  if (write_id < next_write_id) {
321  break;
322  }
323  }
324 }
325 
326 static_assert(
328  "XctManagerControlBlock is too large.");
329 } // namespace xct
330 } // namespace foedus
331 #endif // FOEDUS_XCT_XCT_MANAGER_PIMPL_HPP_
void resume_accepting_xct()
Make sure you call this after pause_accepting_xct().
uint32_t write_next_pos_
exclusive end of write-sets of the current record in write-set.
ErrorStack uninitialize_once() override
UniversalLockId universal_lock_id_
Used to order locks in canonical order.
ErrorStack initialize_once() override
void release_and_clear_all_current_locks(thread::Thread *context)
Unlocks all acquired locks; used on commit or abort.
Epoch get_current_global_epoch_weak() const
void precommit_xct_apply(thread::Thread *context, XctId max_xct_id, Epoch *commit_epoch)
Phase 3 of precommit_xct()
ErrorCode precommit_xct_readwrite(thread::Thread *context, Epoch *commit_epoch)
precommit_xct() if the transaction is read-write
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
bool precommit_xct_verify_readwrite(thread::Thread *context, XctId *max_xct_id)
Phase 2 of precommit_xct() for read-write case.
Represents a record of write-access during a transaction.
Definition: xct_access.hpp:168
std::atomic< Epoch::EpochInteger > requested_global_epoch_
If some thread requested to immediately advance epoch, the requested epoch.
Represents one thread running on one NUMA core.
Definition: thread.hpp:48
void wait_until_resume_accepting_xct(thread::Thread *context)
Forward declarations of classes in transaction package.
bool precommit_xct_verify_readonly(thread::Thread *context, Epoch *commit_epoch)
Phase 2 of precommit_xct() for read-only case.
soc::SharedPolling epoch_chime_wakeup_
Fired to wakeup epoch_chime_thread_.
Forward declarations of classes in root package.
Persistent status part of Transaction ID.
Definition: xct_id.hpp:955
ErrorCode precommit_xct_lock_batch_track_moved(thread::Thread *context)
Subroutine of precommit_xct_lock() that tracks most of the moved records in the write-set.
Brings error stacktrace information as return value of functions.
Definition: error_stack.hpp:81
ErrorCode precommit_xct_lock(thread::Thread *context, XctId *max_xct_id)
Phase 1 of precommit_xct()
const LockListPosition kLockListPositionInvalid
Definition: xct_id.hpp:149
Represents a record of read-access during a transaction.
Definition: xct_access.hpp:139
Represents a time epoch.
Definition: epoch.hpp:61
An entry in CLL and RLL, representing a lock that is taken or will be taken.
A polling-wait mechanism that can be placed in shared memory and used from multiple processes...
Typical implementation of Initializable as a skeleton base class.
ErrorCode abort_xct(thread::Thread *context)
Shared data in XctManagerPimpl.
An iterator over CurrentLockList to find entries along with sorted write-set.
void handle_epoch_chime()
Main routine for epoch_chime_thread_.
uintptr_t UniversalLockId
Universally ordered identifier of each lock.
Definition: xct_id.hpp:134
CurrentLockListIteratorForWriteSet(const WriteXctAccess *write_set, const CurrentLockList *cll, uint32_t write_set_size)
void wait_for_current_global_epoch(Epoch target_epoch, int64_t wait_microseconds)
ErrorCode begin_xct(thread::Thread *context, IsolationLevel isolation_level)
User transactions related methods.
void next_writes()
Look for next record's write-set(s).
uint32_t write_cur_pos_
inclusive beginning of write-sets of the current record in write-set.
Definitions of IDs in this package and a few related constant values.
ErrorCode wait_for_commit(Epoch commit_epoch, int64_t wait_microseconds)
uint32_t LockListPosition
Index in a lock-list, either RLL or CLL.
Definition: xct_id.hpp:148
void precommit_xct_sort_access(thread::Thread *context)
bool precommit_xct_verify_pointer_set(thread::Thread *context)
Returns false if there is any pointer set conflict.
Database engine object that holds all resources and provides APIs.
Definition: engine.hpp:109
LockEntry * get_entry(LockListPosition pos)
LockListPosition cll_pos_
CLL entry that corresponds to the current record in write-set.
std::atomic< bool > new_transaction_paused_
If true, all new requests to begin_xct() will be paused until this becomes false. ...
ErrorCode precommit_xct(thread::Thread *context, Epoch *commit_epoch)
This is the guts of the commit protocol.
XctManagerControlBlock * control_block_
bool precommit_xct_acquire_writer_lock(thread::Thread *context, WriteXctAccess *write)
std::thread epoch_chime_thread_
This thread keeps advancing the current_global_epoch_.
bool precommit_xct_lock_track_write(thread::Thread *context, WriteXctAccess *entry)
Used from precommit_xct_lock() to track a moved record.
IsolationLevel
Specifies the level of isolation during transaction processing.
Definition: xct_id.hpp:55
bool precommit_xct_verify_page_version_set(thread::Thread *context)
Returns false if there is any page version conflict.
void handle_epoch_chime_wait_grace_period(Epoch grace_epoch)
Makes sure all worker threads will commit with an epoch larger than grace_epoch.
Pimpl object of XctManager.
bool precommit_xct_request_writer_lock(thread::Thread *context, WriteXctAccess *write)
void set_requested_global_epoch(Epoch request)
std::atomic< Epoch::EpochInteger > current_global_epoch_
The current epoch of the entire engine.
Sorted list of all locks, either read-lock or write-lock, taken in the current run.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
ErrorCode precommit_xct_readonly(thread::Thread *context, Epoch *commit_epoch)
precommit_xct() if the transaction is read-only
std::atomic< bool > epoch_chime_terminate_requested_
Protected by the mutex in epoch_chime_wakeup_.
Forward declarations of classes in thread package.
UniversalLockId owner_lock_id_
Universal Lock ID of the lock in the record.
Definition: xct_access.hpp:105
void pause_accepting_xct()
Pauses all begin_xct() calls until you call resume_accepting_xct().
void assert_sorted() const __attribute__((always_inline))
ErrorCode
Enum of error codes defined in error_code.xmacro.
Definition: error_code.hpp:85
soc::SharedPolling current_global_epoch_advanced_
Fired (broadcast) whenever current_global_epoch_ is advanced.
bool precommit_xct_try_acquire_writer_locks(thread::Thread *context)
bool precommit_xct_verify_track_read(thread::Thread *context, ReadXctAccess *entry)
Used from verification methods to track a moved record.