libfoedus-core
FOEDUS Core Library
retrospective_lock_list.cpp
/*
 * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details. You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * HP designates this particular file as subject to the "Classpath" exception
 * as provided by HP in the LICENSE.txt file that accompanied this code.
 */
#include "foedus/xct/retrospective_lock_list.hpp"

#include <glog/logging.h>

#include <algorithm>
#include <cstring>

#include "foedus/storage/page.hpp"
#include "foedus/thread/thread.hpp"
#include "foedus/thread/thread_pimpl.hpp"  // only for explicit template instantiation
#include "foedus/xct/xct.hpp"
#include "foedus/xct/xct_mcs_adapter_impl.hpp"  // only for explicit template instantiation
#include "foedus/xct/xct_mcs_impl.hpp"  // only for explicit template instantiation

namespace foedus {
namespace xct {

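// This file implements the two lock lists used by FOEDUS transactions:
// the RetrospectiveLockList (RLL), a sorted list of locks the transaction
// plans to take in its next run, and the CurrentLockList (CLL), a sorted
// list of locks taken (or to be taken) in the current run. Both lists are
// ordered by UniversalLockId so that locks can be acquired in canonical
// order. Positions are 1-based; entry 0 (kLockListPositionInvalid) is a
// dummy sentinel.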
RetrospectiveLockList::RetrospectiveLockList() {
  array_ = nullptr;
  capacity_ = 0;
  clear_entries();
}

RetrospectiveLockList::~RetrospectiveLockList() {
  uninit();
}

void RetrospectiveLockList::init(
  LockEntry* array,
  uint32_t capacity,
  const memory::GlobalVolatilePageResolver& resolver) {
  array_ = array;
  capacity_ = capacity;
  volatile_page_resolver_ = resolver;
  clear_entries();
}

void RetrospectiveLockList::clear_entries() {
  last_active_entry_ = kLockListPositionInvalid;
  if (array_) {
    // Dummy entry
    array_[kLockListPositionInvalid].universal_lock_id_ = 0;
    array_[kLockListPositionInvalid].lock_ = nullptr;
    array_[kLockListPositionInvalid].preferred_mode_ = kNoLock;
    array_[kLockListPositionInvalid].taken_mode_ = kNoLock;
  }
}

void RetrospectiveLockList::uninit() {
  array_ = nullptr;
  capacity_ = 0;
  clear_entries();
}

CurrentLockList::CurrentLockList() {
  array_ = nullptr;
  capacity_ = 0;
  clear_entries();
}

CurrentLockList::~CurrentLockList() {
  uninit();
}

void CurrentLockList::init(
  LockEntry* array,
  uint32_t capacity,
  const memory::GlobalVolatilePageResolver& resolver) {
  array_ = array;
  capacity_ = capacity;
  volatile_page_resolver_ = resolver;
  clear_entries();
}

void CurrentLockList::clear_entries() {
  last_active_entry_ = kLockListPositionInvalid;
  last_locked_entry_ = kLockListPositionInvalid;
  if (array_) {
    // Dummy entry
    array_[kLockListPositionInvalid].universal_lock_id_ = 0;
    array_[kLockListPositionInvalid].lock_ = nullptr;
    array_[kLockListPositionInvalid].preferred_mode_ = kNoLock;
    array_[kLockListPositionInvalid].taken_mode_ = kNoLock;
  }
}

void CurrentLockList::uninit() {
  array_ = nullptr;
  capacity_ = 0;
  clear_entries();
}

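// Debugging: XML-ish dumps of lock entries and lock lists.
// Only the first 32 entries are printed so the output stays bounded.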
std::ostream& operator<<(std::ostream& o, const LockEntry& v) {
  o << "<LockEntry>"
    << "<LockId>" << v.universal_lock_id_ << "</LockId>"
    << "<PreferredMode>" << v.preferred_mode_ << "</PreferredMode>"
    << "<TakenMode>" << v.taken_mode_ << "</TakenMode>";
  if (v.lock_) {
    o << *(v.lock_);
  } else {
    o << "<Lock>nullptr</Lock>";
  }
  o << "</LockEntry>";
  return o;
}

std::ostream& operator<<(std::ostream& o, const CurrentLockList& v) {
  o << "<CurrentLockList>"
    << "<Capacity>" << v.capacity_ << "</Capacity>"
    << "<LastActiveEntry>" << v.last_active_entry_ << "</LastActiveEntry>"
    << "<LastLockedEntry>" << v.last_locked_entry_ << "</LastLockedEntry>";
  const uint32_t kMaxShown = 32U;
  for (auto i = 1U; i <= std::min(v.last_active_entry_, kMaxShown); ++i) {
    o << v.array_[i];
  }
  o << "</CurrentLockList>";
  return o;
}

std::ostream& operator<<(std::ostream& o, const RetrospectiveLockList& v) {
  o << "<RetrospectiveLockList>"
    << "<Capacity>" << v.capacity_ << "</Capacity>"
    << "<LastActiveEntry>" << v.last_active_entry_ << "</LastActiveEntry>";
  const uint32_t kMaxShown = 32U;
  for (auto i = 1U; i <= std::min(v.last_active_entry_, kMaxShown); ++i) {
    o << v.array_[i];
  }
  o << "</RetrospectiveLockList>";
  return o;
}

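// Sanity checker shared by both lists: entry 0 must be the pristine dummy,
// and entries 1..last_active must be strictly increasing by UniversalLockId,
// with each entry's ID consistent with the page and address it points to.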
template<typename LOCK_LIST>
void lock_assert_sorted(const LOCK_LIST& list) {
  const LockEntry* array = list.get_array();
  ASSERT_ND(array[kLockListPositionInvalid].universal_lock_id_ == 0);
  ASSERT_ND(array[kLockListPositionInvalid].lock_ == nullptr);
  ASSERT_ND(array[kLockListPositionInvalid].taken_mode_ == kNoLock);
  ASSERT_ND(array[kLockListPositionInvalid].preferred_mode_ == kNoLock);
  const LockListPosition last_active_entry = list.get_last_active_entry();
  for (LockListPosition pos = 2U; pos <= last_active_entry; ++pos) {
    ASSERT_ND(array[pos - 1U].universal_lock_id_ < array[pos].universal_lock_id_);
    ASSERT_ND(array[pos].universal_lock_id_ != 0);
    ASSERT_ND(array[pos].lock_ != nullptr);
    const storage::Page* page = storage::to_page(array[pos].lock_);
    uintptr_t lock_addr = reinterpret_cast<uintptr_t>(array[pos].lock_);
    auto page_id = page->get_volatile_page_id();
    ASSERT_ND(array[pos].universal_lock_id_
      == to_universal_lock_id(page_id.get_numa_node(), page_id.get_offset(), lock_addr));
  }
}

void CurrentLockList::assert_sorted_impl() const {
  lock_assert_sorted(*this);
}
void RetrospectiveLockList::assert_sorted_impl() const {
  lock_assert_sorted(*this);
}

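// binary_search()/lower_bound() are analogous to their std:: counterparts
// and simply delegate to the shared lock_binary_search/lock_lower_bound
// templates.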
LockListPosition CurrentLockList::binary_search(UniversalLockId lock) const {
  return lock_binary_search<CurrentLockList, LockEntry>(*this, lock);
}
LockListPosition RetrospectiveLockList::binary_search(UniversalLockId lock) const {
  return lock_binary_search<RetrospectiveLockList, LockEntry>(*this, lock);
}
LockListPosition CurrentLockList::lower_bound(UniversalLockId lock) const {
  return lock_lower_bound<CurrentLockList, LockEntry>(*this, lock);
}
LockListPosition RetrospectiveLockList::lower_bound(UniversalLockId lock) const {
  return lock_lower_bound<RetrospectiveLockList, LockEntry>(*this, lock);
}

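// get_or_add_entry() keeps the CLL sorted while adding one lock:
// 1) if the ID is larger than every existing entry, append at the end;
// 2) if an entry with the same ID already exists, just strengthen its
//    preferred_mode_;
// 3) otherwise shift the tail by one slot and insert in place, which is the
//    costly case we hope to avoid.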
LockListPosition CurrentLockList::get_or_add_entry(
  UniversalLockId id,
  RwLockableXctId* lock,
  LockMode preferred_mode) {
  ASSERT_ND(id == xct_id_to_universal_lock_id(volatile_page_resolver_, lock));
  // Easy case? (lock >= the last entry)
  LockListPosition insert_pos = lower_bound(id);
  ASSERT_ND(insert_pos != kLockListPositionInvalid);

  // Larger than all existing entries? Append to the last!
  if (insert_pos > last_active_entry_) {
    ASSERT_ND(insert_pos == last_active_entry_ + 1U);
    LockListPosition new_pos = issue_new_position();
    array_[new_pos].set(id, lock, preferred_mode, kNoLock);
    ASSERT_ND(new_pos == insert_pos);
    return new_pos;
  }

  // lower_bound returns the first entry that is NOT less than. Is it equal?
  ASSERT_ND(array_[insert_pos].universal_lock_id_ >= id);
  if (array_[insert_pos].universal_lock_id_ == id) {
    // Found an existing entry!
    if (array_[insert_pos].preferred_mode_ < preferred_mode) {
      array_[insert_pos].preferred_mode_ = preferred_mode;
    }
    return insert_pos;
  }

  DVLOG(1) << "not an easy case. We need to adjust the order. This is costly!";
  ASSERT_ND(insert_pos <= last_active_entry_);  // otherwise we went into the 1st branch
  ASSERT_ND(array_[insert_pos].universal_lock_id_ > id);  // if ==, we went into the 2nd branch
  ASSERT_ND(insert_pos == 1U || array_[insert_pos - 1U].universal_lock_id_ < id);

  LockListPosition new_last_pos = issue_new_position();
  ASSERT_ND(new_last_pos > insert_pos);
  uint64_t moved_bytes = sizeof(LockEntry) * (new_last_pos - insert_pos);
  std::memmove(array_ + insert_pos + 1U, array_ + insert_pos, moved_bytes);
  DVLOG(1) << "Re-sorted. hope this won't happen often";
  array_[insert_pos].set(id, lock, preferred_mode, kNoLock);
  if (last_locked_entry_ >= insert_pos) {
    ++last_locked_entry_;
  }

  assert_sorted();
  return insert_pos;
}

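// construct() builds the RLL from the transaction's read- and write-sets for
// the next run of the transaction: reads are added only when the page is hot
// (hotness at or above the threshold) or the observed TID no longer matches
// (a likely verification failure); writes are always added. The combined
// entries are then sorted and de-duplicated in place.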
void RetrospectiveLockList::construct(thread::Thread* context, uint32_t read_lock_threshold) {
  Xct* xct = &context->get_current_xct();
  ASSERT_ND(xct->is_active());
  // We currently hold the read-set and write-set separately, so we need to sort and merge them.
  ReadXctAccess* read_set = xct->get_read_set();
  const uint32_t read_set_size = xct->get_read_set_size();
  WriteXctAccess* write_set = xct->get_write_set();
  const uint32_t write_set_size = xct->get_write_set_size();
  ASSERT_ND(capacity_ >= read_set_size + write_set_size);

  last_active_entry_ = kLockListPositionInvalid;
  if (read_set_size == 0 && write_set_size == 0) {
    return;
  }

  for (uint32_t i = 0; i < read_set_size; ++i) {
    RwLockableXctId* lock = read_set[i].owner_id_address_;
    storage::Page* page = storage::to_page(lock);
    if (page->get_header().hotness_.value_ < read_lock_threshold
      && lock->xct_id_ == read_set[i].observed_owner_id_) {
      // Not a hot page and the TID is unchanged: skip.
      // We do add it to RLL whenever we observed a verification error.
      continue;
    }

    auto pos = issue_new_position();
    ASSERT_ND(
      read_set[i].owner_lock_id_ ==
      xct_id_to_universal_lock_id(volatile_page_resolver_, lock));
    array_[pos].set(read_set[i].owner_lock_id_, lock, kReadLock, kNoLock);
  }
  DVLOG(1) << "Added " << last_active_entry_ << " to RLL for read-locks";

  // Writes are always added to RLL.
  for (uint32_t i = 0; i < write_set_size; ++i) {
    auto pos = issue_new_position();
    ASSERT_ND(
      write_set[i].owner_lock_id_ ==
      xct_id_to_universal_lock_id(volatile_page_resolver_, write_set[i].owner_id_address_));
    array_[pos].set(
      write_set[i].owner_lock_id_,
      write_set[i].owner_id_address_,
      kWriteLock,
      kNoLock);
  }

  // Now the entries are unsorted and might contain duplicates.
  // Sort them, and merge entries for the same record.
  // std::set? No joke. We can't afford heap allocation here.
  ASSERT_ND(last_active_entry_ != kLockListPositionInvalid);
  std::sort(array_ + 1U, array_ + last_active_entry_ + 1U);
  LockListPosition prev_pos = 1U;
  uint32_t merged_count = 0U;
  for (LockListPosition pos = 2U; pos <= last_active_entry_; ++pos) {
    ASSERT_ND(prev_pos < pos);
    ASSERT_ND(array_[prev_pos].universal_lock_id_ <= array_[pos].universal_lock_id_);
    if (array_[prev_pos].universal_lock_id_ == array_[pos].universal_lock_id_) {
      // Merge!
      if (array_[pos].preferred_mode_ == kWriteLock) {
        array_[prev_pos].preferred_mode_ = kWriteLock;
      }
      ++merged_count;
    } else {
      // No merge. Compact the entry to its final position.
      if (prev_pos + 1U < pos) {
        std::memcpy(array_ + prev_pos + 1U, array_ + pos, sizeof(LockEntry));
      }
      ++prev_pos;
    }
  }

  // For example, if last_active_entry_ was 3 (0=Dummy, 1=A, 2=A, 3=B),
  // prev_pos becomes 2 while merged_count is 1.
  ASSERT_ND(prev_pos + merged_count == last_active_entry_);
  last_active_entry_ = prev_pos;
  assert_sorted();
}
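// batch_insert_write_placeholders() merges the (already sorted) write-set
// into the (already sorted) CLL: existing entries are upgraded to prefer a
// write-lock, new entries are appended as kNoLock placeholders, and a single
// std::sort at the end restores the order when anything was appended.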

void CurrentLockList::batch_insert_write_placeholders(
  const WriteXctAccess* write_set,
  uint32_t write_set_size) {
  // We want to avoid a full sort and minimize the number of copies/shifts.
  // Luckily, write_set is already sorted, and so is our array_. Just merge them in order.
  if (write_set_size == 0) {
    return;
  }
#ifndef NDEBUG
  for (uint32_t i = 1; i < write_set_size; ++i) {
    ASSERT_ND(write_set[i - 1].ordinal_ != write_set[i].ordinal_);
    if (write_set[i].owner_lock_id_ == write_set[i - 1].owner_lock_id_) {
      ASSERT_ND(write_set[i - 1].ordinal_ < write_set[i].ordinal_);
    } else {
      ASSERT_ND(write_set[i - 1].owner_lock_id_ < write_set[i].owner_lock_id_);
    }
  }
  assert_sorted();
#endif  // NDEBUG

  // Implementation note: I considered a few approaches to do the merge efficiently.
  // 1) insertion sort: that sounds expensive... we might be inserting several entries.
  // 2) a bit complex: a first pass to count the new entries, then a second pass that
  //  merges from the end, not the beginning, to copy/shift only what we need to.
  // 3) insert all write-sets at the end, then invoke std::sort once.
  // For now I picked 3) for simplicity. Revisit later if the CPU profile tells us something.
  if (last_active_entry_ == kLockListPositionInvalid) {
    // If the CLL is currently empty, it's even easier. Just add all write-sets.
    uint32_t added = 0;
    for (uint32_t write_pos = 0; write_pos < write_set_size; ++write_pos) {
      const WriteXctAccess* write = write_set + write_pos;
      if (write_pos > 0) {
        const WriteXctAccess* prev = write_set + write_pos - 1;
        ASSERT_ND(write->ordinal_ != prev->ordinal_);
        ASSERT_ND(write->owner_lock_id_ >= prev->owner_lock_id_);
        if (write->owner_lock_id_ == prev->owner_lock_id_) {
          ASSERT_ND(write->ordinal_ > prev->ordinal_);
          continue;
        }
      }
      ++added;
      LockEntry* new_entry = array_ + added;
      new_entry->set(write->owner_lock_id_, write->owner_id_address_, kWriteLock, kNoLock);
    }
    last_active_entry_ = added;
  } else {
    uint32_t write_pos = 0;
    uint32_t added = 0;
    for (LockListPosition pos = 1U; pos <= last_active_entry_ && write_pos < write_set_size;) {
      LockEntry* existing = array_ + pos;
      const WriteXctAccess* write = write_set + write_pos;
      UniversalLockId write_lock_id = write->owner_lock_id_;
      if (existing->universal_lock_id_ < write_lock_id) {
        ++pos;
      } else if (existing->universal_lock_id_ == write_lock_id) {
        if (existing->preferred_mode_ != kWriteLock) {
          existing->preferred_mode_ = kWriteLock;
        }
        ++write_pos;
      } else {
        // Yuppy, a new entry.
        ASSERT_ND(existing->universal_lock_id_ > write_lock_id);
        ++added;
        LockEntry* new_entry = array_ + last_active_entry_ + added;
        new_entry->set(write_lock_id, write->owner_id_address_, kWriteLock, kNoLock);
        // Be careful about duplicates in the write-set.
        // It might contain multiple writes to one record.
        for (++write_pos; write_pos < write_set_size; ++write_pos) {
          const WriteXctAccess* next_write = write_set + write_pos;
          UniversalLockId next_write_id = next_write->owner_lock_id_;
          ASSERT_ND(next_write_id >= write_lock_id);
          if (next_write_id > write_lock_id) {
            break;
          }
        }
      }
    }

    while (write_pos < write_set_size) {
      // After iterating over all existing entries, some write-set entries still remain.
      // Hence they are all after the existing entries.
      const WriteXctAccess* write = write_set + write_pos;
      UniversalLockId write_lock_id = write->owner_lock_id_;
      ASSERT_ND(last_active_entry_ == kLockListPositionInvalid
        || array_[last_active_entry_].universal_lock_id_ < write_lock_id);

      // Again, be careful about duplicates in the write-set.
      ++added;
      LockEntry* new_entry = array_ + last_active_entry_ + added;
      new_entry->set(write_lock_id, write->owner_id_address_, kWriteLock, kNoLock);
      for (++write_pos; write_pos < write_set_size; ++write_pos) {
        const WriteXctAccess* next_write = write_set + write_pos;
        UniversalLockId next_write_id = next_write->owner_lock_id_;
        ASSERT_ND(next_write_id >= write_lock_id);
        if (next_write_id > write_lock_id) {
          break;
        }
      }
    }

    if (added > 0) {
      last_active_entry_ += added;
      std::sort(array_ + 1U, array_ + 1U + last_active_entry_);
    }
  }

  // Adjusting last_locked_entry_ incrementally is not impossible, but let's just
  // recalculate. This does not happen too often.
  last_locked_entry_ = calculate_last_locked_entry();
  assert_sorted();
#ifndef NDEBUG
  for (uint32_t i = 0; i < write_set_size; ++i) {
    ASSERT_ND(binary_search(write_set[i].owner_lock_id_) != kLockListPositionInvalid);
  }
#endif  // NDEBUG
}
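// prepopulate_for_retrospective_lock_list() is the other batch-insert path,
// used at the beginning of a transaction that carries an RLL from its
// previous run: the RLL entries are copied wholesale into the empty CLL so
// the locks can later be taken in canonical order.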

void CurrentLockList::prepopulate_for_retrospective_lock_list(const RetrospectiveLockList& rll) {
  ASSERT_ND(is_empty());
  ASSERT_ND(!rll.is_empty());
  rll.assert_sorted();
  // Because we use LockEntry for both RLL and CLL, we can do this with just one memcpy.
  std::memcpy(array_ + 1U, rll.get_array() + 1U, sizeof(LockEntry) * rll.get_last_active_entry());
  last_active_entry_ = rll.get_last_active_entry();
  last_locked_entry_ = kLockListPositionInvalid;
  assert_sorted();
}

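// Debug-log-only helpers: they summarize how many locks were released or
// given up. They perform no locking work themselves.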
void CurrentLockList::release_all_after_debuglog(
  uint32_t released_read_locks,
  uint32_t released_write_locks,
  uint32_t already_released_locks,
  uint32_t canceled_async_read_locks,
  uint32_t canceled_async_write_locks) const {
  DVLOG(1) << " Unlocked " << released_read_locks << " read locks and"
    << " " << released_write_locks << " write locks."
    << " Also cancelled " << canceled_async_read_locks << " async-waiting read locks, "
    << " " << canceled_async_write_locks << " async-waiting write locks. "
    << " " << already_released_locks << " were already unlocked";
}

void CurrentLockList::giveup_all_after_debuglog(
  uint32_t givenup_read_locks,
  uint32_t givenup_write_locks,
  uint32_t givenup_upgrades,
  uint32_t already_enough_locks,
  uint32_t canceled_async_read_locks,
  uint32_t canceled_async_write_locks) const {
  DVLOG(1) << " Gave up " << givenup_read_locks << " read locks and"
    << " " << givenup_write_locks << " write locks, " << givenup_upgrades << " upgrades."
    << " Also cancelled " << canceled_async_read_locks << " async-waiting read locks, "
    << " " << canceled_async_write_locks << " async-waiting write locks. "
    << " " << already_enough_locks << " already had enough lock mode";
}

}  // namespace xct
}  // namespace foedus