libfoedus-core
FOEDUS Core Library
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
shared_cond.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014-2015, Hewlett-Packard Development Company, LP.
3  * This program is free software; you can redistribute it and/or modify it
4  * under the terms of the GNU General Public License as published by the Free
5  * Software Foundation; either version 2 of the License, or (at your option)
6  * any later version.
7  *
8  * This program is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11  * more details. You should have received a copy of the GNU General Public
12  * License along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * HP designates this particular file as subject to the "Classpath" exception
16  * as provided by HP in the LICENSE.txt file that accompanied this code.
17  */
19 
#include "foedus/soc/shared_cond.hpp"

#include <errno.h>
#include <time.h>
#include <sys/time.h>

#include <atomic>

#include "foedus/assert_nd.hpp"
29 
30 namespace foedus {
31 namespace soc {
32 
34  uninitialize();
35  waiters_ = 0;
36  notifiers_ = 0;
37  mutex_.initialize();
38  int attr_ret = ::pthread_condattr_init(&attr_);
39  ASSERT_ND(attr_ret == 0);
40 
41  int shared_ret = ::pthread_condattr_setpshared(&attr_, PTHREAD_PROCESS_SHARED);
42  ASSERT_ND(shared_ret == 0);
43 
44  int cond_ret = ::pthread_cond_init(&cond_, &attr_);
45  ASSERT_ND(cond_ret == 0);
46 
47  initialized_ = true;
48 }
49 
51  if (!initialized_) {
52  return;
53  }
54 
56  ASSERT_ND(waiters_ == 0);
57  // we must wait until all notifiers exit notify_all.
58  // this assumes no new notifiers are newly coming in this situation.
59  while (notifiers_ > 0) {
62  }
63 
64  mutex_.uninitialize();
65 
66  int cond_ret = ::pthread_cond_destroy(&cond_);
67  ASSERT_ND(cond_ret == 0);
68 
69  int attr_ret = ::pthread_condattr_destroy(&attr_);
70  ASSERT_ND(attr_ret == 0);
71 
72  initialized_ = false;
73 }
74 
75 
76 void SharedCond::common_assert(SharedMutexScope* scope) {
77  ASSERT_ND(initialized_);
78  ASSERT_ND(scope);
79  ASSERT_ND(scope->is_locked_by_me());
80  ASSERT_ND(scope->get_mutex() == &mutex_);
81 }
82 
// Atomically increments *address by viewing the plain uint32_t as a
// std::atomic<uint32_t>. "Ugly" because the cast relies on the two types
// having identical layout; kept as-is to work on shared-memory fields.
void ugly_atomic_inc(uint32_t* address) {
  auto* atomic_view = reinterpret_cast< std::atomic<uint32_t>* >(address);
  atomic_view->fetch_add(1U);
}
// Atomically decrements *address; counterpart of ugly_atomic_inc.
void ugly_atomic_dec(uint32_t* address) {
  auto* atomic_view = reinterpret_cast< std::atomic<uint32_t>* >(address);
  atomic_view->fetch_sub(1U);
}
89 
91  common_assert(scope);
92 
93  ugly_atomic_inc(&waiters_);
94 
95  // probably pthread_cond_wait implies a full fence, but to make sure.
96  int ret = ::pthread_cond_wait(&cond_, mutex_.get_raw_mutex());
97  ASSERT_ND(ret == 0);
98 
99  ASSERT_ND(waiters_ > 0);
100  ugly_atomic_dec(&waiters_);
101 }
102 
103 bool SharedCond::timedwait(SharedMutexScope* scope, uint64_t timeout_nanosec) {
104  common_assert(scope);
105  struct timespec timeout;
106  struct timeval now;
107  ::gettimeofday(&now, CXX11_NULLPTR);
108  timeout.tv_sec = now.tv_sec + (timeout_nanosec / 1000000000ULL);
109  timeout.tv_nsec = now.tv_usec * 1000ULL + timeout_nanosec % 1000000000ULL;
110  timeout.tv_sec += (timeout.tv_nsec) / 1000000000ULL;
111  timeout.tv_nsec %= 1000000000ULL;
112 
113  ugly_atomic_inc(&waiters_);
114 
115  int ret = ::pthread_cond_timedwait(&cond_, mutex_.get_raw_mutex(), &timeout);
116  ASSERT_ND(ret == 0 || ret == ETIMEDOUT);
117 
118  ASSERT_ND(waiters_ > 0);
119  ugly_atomic_dec(&waiters_);
120  return ret == 0;
121 }
122 
124  common_assert(scope);
125 
126  ugly_atomic_inc(&notifiers_);
127 
128  // to avoid the glibc 2.18 pthread bug in broadcast, we use signal, one by one.
129  scope->unlock();
130  while (waiters_ > 0) {
131  // int ret = ::pthread_cond_broadcast(&cond_);
132  int ret = ::pthread_cond_signal(&cond_);
133  ASSERT_ND(ret == 0);
136  }
137 
138  ugly_atomic_dec(&notifiers_);
139 }
140 
142  ugly_atomic_inc(&notifiers_);
143  // int ret = ::pthread_cond_broadcast(&cond_);
144  int ret = ::pthread_cond_signal(&cond_);
145  ASSERT_ND(ret == 0);
146  ugly_atomic_dec(&notifiers_);
147 }
148 
150  common_assert(scope);
151 
152  ugly_atomic_inc(&notifiers_);
153 
154  scope->unlock();
155  if (waiters_ > 0) {
156  int ret = ::pthread_cond_signal(&cond_);
157  ASSERT_ND(ret == 0);
158  }
159 
160  ugly_atomic_dec(&notifiers_);
161 }
162 
163 } // namespace soc
164 } // namespace foedus
void ugly_atomic_dec(uint32_t *address)
Definition: shared_cond.cpp:86
void wait(SharedMutexScope *scope)
Unconditionally wait for the event.
Definition: shared_cond.cpp:90
#define CXX11_NULLPTR
Used in public headers in place of "nullptr" of C++11.
Definition: cxx11.hpp:132
void initialize(bool recursive=false)
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Definition: assert_nd.hpp:44
pthread_mutex_t * get_raw_mutex()
void signal(SharedMutexScope *scope)
Unblock one waiter.
SharedMutex * get_mutex() const
bool timedwait(SharedMutexScope *scope, uint64_t timeout_nanosec)
Wait for the event up to the given timeout.
void broadcast(SharedMutexScope *scope)
Unblock all waiters.
Auto-lock scope object for SharedMutex.
void spinlock_yield()
Invoke _mm_pause(), x86 PAUSE instruction, or something equivalent in the env.
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
void ugly_atomic_inc(uint32_t *address)
Definition: shared_cond.cpp:83
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
void broadcast_nolock()
Unblock all waiters without a mutex held by the signaller.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Definition: assert_nd.hpp:72
void memory_fence_acq_rel()
Equivalent to std::atomic_thread_fence(std::memory_order_acq_rel).