20 #include <glog/logging.h>
72 int violation_count = 0;
105 : engine_(engine), meta_() {
114 buffer_ =
reinterpret_cast<char*
>(buffer_memory.
get_block());
138 LOG(INFO) <<
"Thread-" << meta_.
thread_id_ <<
" waiting for space to write logs..";
150 LOG(INFO) <<
"Thread-" << meta_.
thread_id_ <<
" moving head to durable: " << *
this;
155 LOG(WARNING) <<
"Thread-" << meta_.
thread_id_ <<
" logger is getting behind. sleeping "
156 <<
" for a while.." << *
this;
159 std::this_thread::sleep_for(std::chrono::milliseconds(20));
166 void ThreadLogBuffer::fillup_tail() {
173 FillerLogType *filler =
reinterpret_cast<FillerLogType*
>(buffer_ + meta_.
offset_tail_);
174 filler->populate(len);
180 void ThreadLogBuffer::on_new_epoch_observed(Epoch commit_epoch) {
185 VLOG(0) <<
"Thread-" << meta_.
thread_id_ <<
" is writing out the first log entry in epoch-"
188 DVLOG(0) <<
"Before: " << *
this;
193 if (last_mark.offset_end_ != 0) {
194 LOG(INFO) <<
"Interesting. Thread-" << meta_.
thread_id_ <<
"'s last epoch marker was"
195 <<
" already populated by the logger. This thread was probably idle for a while.";
202 LOG(INFO) <<
"Thread-" << meta_.
thread_id_ <<
" has to wait until the logger catches up."
203 <<
" If this often happens, you should increase the number of loggers.";
206 std::this_thread::sleep_for(std::chrono::milliseconds(10));
211 VLOG(0) <<
"Thread-" << meta_.
thread_id_ <<
" still waiting until the logger catches up...";
222 DVLOG(0) <<
"After: " << *
this;
226 void ThreadLogBuffer::crash_stale_commit_epoch(Epoch commit_epoch) {
228 LOG(FATAL) <<
"Received a log-publication request with commit_epoch=" << commit_epoch
229 <<
", which is older than the last epoch=" <<
get_last_epoch() <<
", this is a BUG!"
230 << std::endl <<
" Buffer=" << *
this;
244 }
else if (target.
new_epoch_ == written_epoch) {
253 VLOG(0) <<
"This guy seems sleeping for a while.." << *
this;
266 VLOG(0) <<
"Okay, the logger populated the offset_end on behalf." << *
this;
269 LOG(INFO) <<
"Interesting. The thread has now awaken and added a new mark. offset_end"
270 <<
" should be now populated. "<< *
this;
281 VLOG(0) <<
"Advancing oldest mark index. before=" << *
this;
288 LOG(FATAL) <<
"meta_.oldest_mark_index_ out of range. we must have waited.";
291 VLOG(0) <<
"Advanced oldest mark index. after=" << *
this;
295 VLOG(1) <<
"Heh, this guy is still sleeping.." << *
this;
311 }
else if (target.
new_epoch_ == written_epoch) {
326 DVLOG(0) <<
"Logger has written out all logs in epoch-" << written_epoch <<
". " << *
this;
336 o <<
"<ThreadLogBuffer>";
337 o <<
"<thread_id_>" << v.
thread_id_ <<
"</thread_id_>";
338 o <<
"<buffer_size_>" << v.
buffer_size_ <<
"</buffer_size_>";
339 o <<
"<offset_head_>" << v.
offset_head_ <<
"</offset_head_>";
342 o <<
"<offset_tail_>" << v.
offset_tail_ <<
"</offset_tail_>";
343 o <<
"<thread_epoch_marks"
358 o <<
"</thread_epoch_marks>";
359 o <<
"</ThreadLogBuffer>";
364 o <<
"<ThreadEpockMark "
uint64_t head_to_tail_distance() const __attribute__((always_inline))
void wakeup_loggers()
Wake up loggers if they are sleeping.
NumaCoreMemory * get_core_memory(foedus::thread::ThreadId id) const
A thread-local log buffer.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
Epoch get_initial_current_epoch() const
void wait_for_space(uint16_t required_space)
Called when we have to wait till offset_head_ advances so that we can put new logs.
uint64_t offset_end_
Where the new epoch ends.
Brings error stacktrace information as return value of functions.
Epoch old_epoch_
The value of new_epoch_ of the previous mark.
Epoch new_epoch_
The epoch of log entries this mark represents.
Declares common log types used in all packages.
const EngineOptions & get_options() const
Repository of memories dynamically acquired within one CPU core (thread).
savepoint::SavepointManager * get_savepoint_manager() const
See Savepoint Manager.
Epoch get_last_epoch() const
AlignedMemorySlice get_log_buffer_memory() const
log::LogManager * get_log_manager() const
See Log Manager.
static void advance(uint64_t buffer_size, uint64_t *target, uint64_t amount) __attribute__((always_inline))
Addition operator, considering wrapping around.
OffsetRange get_logs_to_write(Epoch written_epoch)
Returns begin/end offsets of logs in the given epoch.
Database engine object that holds all resources and provides APIs.
uint64_t offset_begin_
Where the new epoch starts.
uint64_t get_size() const
A slice of foedus::memory::AlignedMemory.
A thread-buffer's epoch marker, which indicates where a thread switched an epoch. ...
void assert_consistent() const
Only for Debug-assertion.
std::ostream & operator<<(std::ostream &o, const LogHeader &v)
ErrorStack initialize_once() override
NumaNodeMemory * get_local_memory() const
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
foedus::fs::DeviceEmulationOptions emulation_
Settings to emulate slower logging device.
const ErrorStack kRetOk
Normal return value for no-error case.
void on_log_written(Epoch written_epoch)
Called when the logger wrote out all logs in the given epoch, advancing oldest_mark_index_.
Convenient way of writing hex integers to stream.
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
Epoch get_initial_durable_epoch() const
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
bool null_device_
[Experiments] as if we write out to /dev/null.
ErrorStack uninitialize_once() override
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x' is not a simple variable.
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
void memory_fence_acq_rel()
Equivalent to std::atomic_thread_fence(std::memory_order_acq_rel).