18 #ifndef FOEDUS_SNAPSHOT_LOG_MAPPER_IMPL_HPP_
19 #define FOEDUS_SNAPSHOT_LOG_MAPPER_IMPL_HPP_
81 return std::string(
"LogMapper-") + std::to_string(
id_);
90 kBucketSize = 1 << 16,
91 kBucketMaxCount = (kBucketSize - 16) / 4,
97 kBucketHashListMaxCount = 1 << 12,
104 kSendBufferSize = 1 << 20,
110 struct Bucket final {
111 inline bool is_full() const
ALWAYS_INLINE {
return counts_ >= kBucketMaxCount; }
118 Bucket* next_bucket_;
132 struct BucketHashList {
134 uint16_t bucket_counts_;
138 BucketHashList* hashlist_next_;
148 struct PartitionSortEntry {
150 partition_ = partition;
151 position_ = position;
155 return partition_ < other.partition_;
158 return partition_ <= other.partition_;
161 return partition_ == other.partition_;
164 return partition_ != other.partition_;
167 return partition_ > other.partition_;
170 return partition_ >= other.partition_;
180 uint64_t size_inbuf_aligned_;
181 uint64_t size_infile_aligned_;
183 uint64_t next_infile_;
184 uint64_t buf_infile_aligned_;
186 uint64_t end_inbuf_aligned_;
187 uint64_t end_infile_;
188 bool more_in_the_file_;
193 uint64_t to_infile(uint64_t inbuf)
const {
return inbuf + buf_infile_aligned_; }
197 memory::AlignedMemory io_buffer_;
200 memory::AlignedMemory buckets_memory_;
211 memory::AlignedMemory tmp_memory_;
214 memory::AlignedMemory presort_buffer_;
216 memory::AlignedMemory presort_ouputs_;
218 memory::AlignedMemory presort_reordered_;
224 memory::AlignedMemorySlice tmp_send_buffer_slice_;
230 memory::AlignedMemorySlice tmp_position_array_slice_;
236 memory::AlignedMemorySlice tmp_sort_array_slice_;
242 memory::AlignedMemorySlice tmp_hashlist_buffer_slice_;
249 memory::AlignedMemorySlice tmp_partition_array_slice_;
252 uint32_t buckets_allocated_count_;
258 uint32_t hashlist_allocated_count_;
261 uint64_t processed_log_count_;
272 BucketHashList* storage_hashlists_[256];
277 ErrorStack handle_process_buffer(
const fs::DirectIoFile &file, IoBufStatus*
status);
306 void flush_all_buckets();
312 void flush_bucket(
const BucketHashList& hashlist);
318 void send_bucket_partition_general(
319 const Bucket* bucket,
323 void send_bucket_partition_presort(
328 void send_bucket_partition_buffer(
329 const Bucket* bucket,
331 const char* send_buffer,
334 uint32_t shortest_key_length,
335 uint32_t longest_key_length);
340 void clear_storage_buckets();
347 inline BucketHashList* find_storage_hashlist(
storage::StorageId storage_id) ALWAYS_INLINE {
348 uint8_t index =
static_cast<uint8_t
>(storage_id);
349 BucketHashList* hashlist = storage_hashlists_[index];
350 if (
UNLIKELY(hashlist ==
nullptr)) {
353 while (
UNLIKELY(hashlist->storage_id_ != storage_id)) {
354 hashlist = hashlist->hashlist_next_;
355 if (hashlist ==
nullptr) {
364 void add_storage_hashlist(BucketHashList* new_hashlist);
366 void report_completion(
double elapsed_sec);
372 #endif // FOEDUS_SNAPSHOT_LOG_MAPPER_IMPL_HPP_
thread::ThreadGroupId PartitionId
As partition=NUMA node, this is just a synonym of foedus::thread::ThreadGroupId.
Forward declarations of classes in filesystem package.
Definitions of IDs in this package and a few related constant values.
Forward declarations of classes in log manager package.
bool operator>(const Path &lhs, const Path &rhs)
LogMapper(Engine *engine, uint16_t local_ordinal)
uint32_t StorageId
Unique ID for storage.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
bool operator==(const Path &lhs, const Path &rhs)
Typedefs of ID types used in thread package.
uint32_t BufferPosition
Represents a position in some buffer.
Forward declarations of classes in root package.
Brings error stacktrace information as return value of functions.
Forward declarations of classes in snapshot manager package.
FileStatus status(const Path &p)
Returns the status of the file.
ErrorStack uninitialize_once() override
Typedefs of ID types used in log package.
A log mapper, which reads log files from one logger and sends them to corresponding log reducers...
friend std::ostream & operator<<(std::ostream &o, const LogMapper &v)
std::string to_string() const override
Expects "LogReducer-x", "LogMapper-y" etc.
uint32_t LogFileOrdinal
Ordinal of log files (eg "log.0", "log.1").
bool operator!=(const Path &lhs, const Path &rhs)
bool operator<=(const Path &lhs, const Path &rhs)
Database engine object that holds all resources and provides APIs.
bool operator>=(const Path &lhs, const Path &rhs)
bool operator<(const Path &lhs, const Path &rhs)
ErrorStack initialize_once() override
const uint16_t id_
Unique ID of this mapper or reducer.
StorageType
Type of the storage, such as hash.
#define STATIC_SIZE_CHECK(desired, actual)
Base class for LogMapper and LogReducer to share common code.
#define UNLIKELY(x)
Hints that x is highly likely false.
Forward declarations of classes in thread package.
ErrorStack handle_process() override
Implements the specific logics in derived class.
#define ALWAYS_INLINE
A function suffix to hint that the function should always be inlined.