20 #include <glog/logging.h>
50 return std::string(
"LogReducer-") + std::to_string(
get_id());
79 snapshot_manager_memory_->gleaner_.cur_snapshot_;
84 const char* send_buffer,
86 uint64_t send_buffer_size)
const {
87 uint32_t real_log_count = 0;
94 while (cur < send_buffer_size) {
123 const char* send_buffer,
125 uint64_t send_buffer_size,
126 uint32_t shortest_key_length,
127 uint32_t longest_key_length) {
128 DVLOG(1) <<
"Appending a block of " << send_buffer_size <<
" bytes (" << log_count
129 <<
" entries) to " <<
to_string() <<
"'s buffer for storage-" << storage_id;
134 const uint64_t required_size = send_buffer_size +
sizeof(
FullBlockHeader);
135 uint32_t buffer_index = 0;
136 uint64_t begin_position = 0;
145 VLOG(0) <<
"Both buffers full in" <<
to_string() <<
". I'll sleep for a while..";
147 std::this_thread::sleep_for(std::chrono::milliseconds(1));
149 VLOG(0) <<
"Buffer switched in" <<
to_string() <<
" after sleep. Let's resume.";
160 == reinterpret_cast<char*>(
buffers_[1]));
164 if (!status_address->compare_exchange_strong(cur_status.
word, new_status.
word)) {
178 if (!status_address->compare_exchange_strong(cur_status.
word, new_status.
word)) {
192 char* destination =
reinterpret_cast<char*
>(buffer) + begin_position;
203 std::memcpy(destination +
sizeof(
FullBlockHeader), send_buffer, send_buffer_size);
205 DVLOG(1) <<
"memcpy of " << send_buffer_size <<
" bytes took "
206 << copy_watch.
elapsed() <<
" cycles";
215 if (!status_address->compare_exchange_strong(cur_status.
word, new_status.
word)) {
233 DVLOG(1) <<
"Completed appending a block of " << send_buffer_size <<
" bytes to " <<
to_string()
234 <<
"'s buffer for storage-" << storage_id <<
" in " << stop_watch.
elapsed() <<
" cycles";
235 ASSERT_ND(reinterpret_cast<FullBlockHeader*>(destination)->is_full_block());
236 ASSERT_ND(reinterpret_cast<FullBlockHeader*>(destination)->storage_id_ == storage_id);
237 ASSERT_ND(reinterpret_cast<FullBlockHeader*>(destination)->block_length_
LogCode
A unique identifier of all log types.
std::atomic< uint64_t > * get_buffer_status_address(uint32_t index)
BufferPosition to_buffer_position(uint64_t byte_position)
LogCodeKind
Represents the kind of log types.
Compactly represents important status informations of a reducer buffer.
uint32_t StorageId
Unique ID for storage.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
storage::Page * root_info_pages_
GlobalMemoryAnchors * get_global_memory_anchors()
Epoch valid_until_epoch_
This snapshot contains all the logs until this epoch.
std::atomic< uint32_t > current_buffer_
buffers_[current_buffer_ % 2] is the buffer mappers should append to.
uint32_t get_ordinal() const __attribute__((always_inline))
struct foedus::snapshot::ReducerBufferStatus::Components components
Same as GlobalMemoryAnchors except this is for node_memories_.
A bit-wise flag in ReducerBufferStatus's flags_.
NodeMemoryAnchors * get_node_memory_anchors(SocId node)
Declares common log types used in all packages.
bool is_valid() const __attribute__((always_inline))
const EngineOptions & get_options() const
ReducerBufferStatus get_buffer_status_atomic(uint32_t index) const
const Snapshot & get_cur_snapshot() const
void append_log_chunk(storage::StorageId storage_id, const char *send_buffer, uint32_t log_count, uint64_t send_buffer_size, uint32_t shortest_key_length, uint32_t longest_key_length)
Append the log entries of one storage in the given buffer to this reducer's buffer.
uint64_t from_buffer_position(BufferPosition buffer_position)
storage::StorageId max_storage_id_
Largest storage ID as of starting to take the snapshot.
uint64_t elapsed() const __attribute__((always_inline))
uint32_t get_current_buffer_index_atomic() const
snapshot::SnapshotOptions snapshot_
LogReducerControlBlock * control_block_
SnapshotId id_
Unique ID of this snapshot.
Database engine object that holds all resources and provides APIs.
Epoch get_epoch() const __attribute__((always_inline))
std::string to_string() const
snapshot::LogReducerControlBlock * log_reducer_memory_
Tiny control memory for LogReducer in this node.
Represents one snapshot that converts all logs from base epoch to valid_until epoch into snapshot fil...
void * log_reducer_buffers_[2]
Actual buffers for LogReducer.
const SnapshotId kNullSnapshotId
uint32_t log_reducer_buffer_mb_
The size in MB of a buffer to store log entries in reducer (partition).
storage::Page * log_reducer_root_info_pages_
This is the 'output' of the reducer in this node.
std::atomic< uint32_t > total_storage_count_
Set at the end of merge_sort().
uint64_t get_buffer_size() const
BufferPosition tail_position_
Epoch base_epoch_
This snapshot was taken on top of previous snapshot that is valid_until this epoch.
void * get_buffer(uint32_t index) const
0 is reserved as a non-existing log type.
A RDTSC-based low-overhead stop watch.
soc::SocManager * get_soc_manager() const
See SOC and IPC.
bool verify_log_chunk(storage::StorageId storage_id, const char *send_buffer, uint32_t log_count, uint64_t send_buffer_size) const
used only in debug mode
0x3001 : foedus::log::FillerLogType .
uint64_t stop() __attribute__((always_inline))
Take another current time tick.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
uint32_t get_total_storage_count() const
uint16_t id_
ID of this reducer (or numa node ID).
SharedMemoryRepo * get_shared_memory_repo()
Returns the shared memories maintained across SOCs.
0x3002 : foedus::log::EpochMarkerLogType .