20 #include <glog/logging.h>
47 : engine_(engine), meta_logger_(nullptr), control_block_(nullptr) {}
55 <<
", #NUMA-nodes=" <<
static_cast<int>(
groups_) <<
", #total_threads=" << total_threads;
61 if (total_loggers == 0 || total_loggers % groups_ != 0 || total_threads % total_loggers != 0
62 || total_loggers > total_threads) {
71 const uint16_t cores_per_logger = total_threads / total_loggers;
79 node * loggers_per_node_ + j,
106 std::vector< thread::ThreadId > assigned_thread_ids;
107 for (
auto k = 0; k < cores_per_logger; ++k) {
120 node * loggers_per_node_ + j,
124 assigned_thread_ids);
132 std::vector<std::thread> init_threads;
135 init_threads.push_back(std::thread([logger]() {
139 LOG(INFO) <<
"Launched threads to initialize loggers in node-" << node <<
". waiting..";
140 for (
auto& init_thread : init_threads) {
143 LOG(INFO) <<
"All loggers in node-" << node <<
" were initialized!";
150 LOG(INFO) <<
"Uninitializing LogManager..";
181 Epoch min_durable_epoch;
184 min_durable_epoch.
store_min(logger.get_durable_epoch());
189 VLOG(0) <<
"durable_global_epoch_ not advanced";
194 <<
" to " << min_durable_epoch;
198 VLOG(0) <<
"oh, I lost the race.";
217 DVLOG(1) <<
"Already durable. commit_epoch=" << commit_epoch <<
", durable_global_epoch_="
222 if (wait_microseconds == 0) {
223 DVLOG(1) <<
"Conditional check: commit_epoch=" << commit_epoch <<
", durable_global_epoch_="
228 std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
229 std::chrono::high_resolution_clock::time_point until
230 = now + std::chrono::microseconds(wait_microseconds);
234 logger.wakeup_for_durable_epoch(commit_epoch);
237 VLOG(0) <<
"Synchronously waiting for commit_epoch " << commit_epoch;
238 if (wait_microseconds <= 0) {
249 if (std::chrono::high_resolution_clock::now() >= until) {
250 LOG(WARNING) <<
"Timeout occurs. wait_microseconds=" << wait_microseconds;
282 logger.copy_logger_state(new_savepoint);
std::string convert_folder_path_pattern(int node, int logger) const
converts folder_path_pattern_ into a string with the given IDs.
soc::SharedPolling durable_global_epoch_advanced_
Fired (broadcast) whenever durable_global_epoch_ is advanced.
std::vector< Logger * > loggers_
Local log writers.
std::vector< uint64_t > oldest_log_files_offset_begin_
Indicates the inclusive beginning of active region in the oldest log file.
std::atomic< Epoch::EpochInteger > durable_global_epoch_
The durable epoch of the entire engine.
void set_engine(Engine *engine)
uint8_t ThreadLocalOrdinal
Typedef for a local ID of Thread (core), which is NOT unique across NUMA nodes.
ErrorStack uninitialize_once() override
void emprace_back(ErrorStack &&error_stack)
If the given ErrorStack is an error, this method adds it to the end of this batch.
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
GlobalMemoryAnchors * get_global_memory_anchors()
log::LogManagerControlBlock * log_manager_memory_
Tiny memory for log manager.
Typedefs of ID types used in thread package.
log::LoggerControlBlock ** logger_memories_
Status and synchronization mechanism for loggers on this node.
std::vector< uint64_t > current_log_files_offset_durable_
Indicates the exclusive end of durable region in the current log file.
void wait(uint64_t demanded_ticket, uint64_t polling_spins=kDefaultPollingSpins, uint64_t max_interval_us=kDefaultPollingMaxIntervalUs) const
Unconditionally wait for signal.
0x0005 : "GENERAL: A dependent module is not initialized yet. This implies a wrong initialization ord...
ErrorStack uninitialize() override final
Typical implementation of Initializable::uninitialize() that provides uninitialize-once semantics...
Brings error stacktrace information as return value of functions.
The information we maintain in savepoint manager and externalize to a file.
virtual void attach(CONTROL_BLOCK *control_block)
Attaches to the given shared memory.
Same as GlobalMemoryAnchors except this is for node_memories_.
ErrorStack take_savepoint(Epoch new_global_durable_epoch)
Atomically and durably takes a savepoint for the given epoch advancement.
0x0008 : "GENERAL: Timeout." .
Definitions of IDs in this package and a few related constant values.
NodeMemoryAnchors * get_node_memory_anchors(SocId node)
log::MetaLogControlBlock * meta_logger_memory_
Tiny memory for metadata logger.
Typedefs of ID types used in log package.
LogManagerControlBlock * control_block_
#define CHECK_OUTOFMEMORY(ptr)
This macro checks if ptr is nullptr, and if so exits with kErrorCodeOutofmemory error stack...
const EngineOptions & get_options() const
ThreadLocalOrdinal thread_count_per_group_
Number of Thread in each ThreadGroup.
#define COERCE_ERROR(x)
This macro calls x and aborts if encounters an error.
bool is_master() const
Returns if this engine object is a master instance.
savepoint::SavepointManager * get_savepoint_manager() const
See Savepoint Manager.
0x0501 : "LOG : The number of loggers per node must be a submultiple of the number of cores in the...
uint64_t acquire_ticket() const
Gives the ticket to pass as demanded_ticket to wait() or timedwait().
Epoch get_durable_global_epoch() const
Batches zero or more ErrorStack objects to represent in one ErrorStack.
A view of Logger object for other SOCs and master engine.
bool is_initialized() const override
Returns whether the object has been already initialized or not.
Analogue of boost::filesystem::path.
MetaLogger * meta_logger_
Metadata log writer.
A log writer that writes out buffered logs to stable storages.
MetaLogBuffer meta_buffer_
Metadata log buffer.
ErrorStack initialize() override final
Typical implementation of Initializable::initialize() that provides initialize-once semantics...
#define SPINLOCK_WHILE(x)
A macro to busy-wait (spinlock) with occasional pause.
Database engine object that holds all resources and provides APIs.
std::vector< LoggerRef > logger_refs_
All log writers.
thread::ThreadGroupId groups_
Auto-lock scope object for SharedMutex.
bool create_directories(const Path &p, bool sync=false)
Recursive mkdir (mkdirs).
bool exists(const Path &p)
Returns if the file exists.
uint16_t group_count_
Number of ThreadGroup in the engine.
Repository of all shared memory in one FOEDUS instance.
ThreadId compose_thread_id(ThreadGroupId node, ThreadLocalOrdinal local_core)
Returns a globally unique ID of Thread (core) for the given node and ordinal in the node...
#define SUMMARIZE_ERROR_BATCH(x)
This macro calls ErrorStackBatch::summarize() with automatically provided parameters.
ErrorStack refresh_global_durable_epoch()
uint16_t SocId
Represents an ID of an SOC, or NUMA node.
thread::ThreadOptions thread_
uint16_t loggers_per_node_
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
ErrorStack initialize_once() override
void uninitialize_and_delete_all(std::vector< T * > *vec)
A convenience method to uninitialize and delete all Initializable objects in a vector, storing all errors in this batch.
const ErrorStack kRetOk
Normal return value for no-error case.
void copy_logger_states(savepoint::Savepoint *new_savepoint)
soc::SocManager * get_soc_manager() const
See SOC and IPC.
std::vector< log::LogFileOrdinal > oldest_log_files_
Ordinal of the oldest active log file in each logger.
Atomic fence methods and load/store with fences that work for both C++11/non-C++11 code...
soc::SharedMutex durable_global_epoch_savepoint_mutex_
To be removed: serializes the threads that take a savepoint to advance durable_global_epoch_.
ThreadId get_total_thread_count() const
Epoch get_initial_durable_epoch() const
void announce_new_durable_global_epoch(Epoch new_epoch)
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
soc::SocId get_soc_id() const
If this is a child instance, returns its SOC ID (NUMA node).
0x0006 : "GENERAL: A dependent module is already uninitialized. This implies a wrong uninitialization...
ErrorCode wait_until_durable(Epoch commit_epoch, int64_t wait_microseconds)
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
uint16_t LoggerId
Typedef for an ID of Logger.
bool is_initialized() const override
Returns whether the object has been already initialized or not.
uint8_t ThreadGroupId
Typedef for an ID of ThreadGroup (NUMA node).
std::vector< log::LogFileOrdinal > current_log_files_
Indicates the log file each logger is currently appending to.
ErrorCode
Enum of error codes defined in error_code.xmacro.
bool timedwait(uint64_t demanded_ticket, uint64_t timeout_microsec, uint64_t polling_spins=kDefaultPollingSpins, uint64_t max_interval_us=kDefaultPollingMaxIntervalUs) const
Wait for signal up to the given timeout.
thread::ThreadPool * get_thread_pool() const
See Thread and Thread-Group.
void signal()
Signal it to let waiters exit.
uint16_t loggers_per_node_
Number of loggers per NUMA node.
EpochInteger value() const
Returns the raw integer representation.
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
void store_min(const Epoch &other)
Kind of std::min(this, other).