20 #include <glog/logging.h>
43 namespace sequential {
46 : engine_(parent->get_engine()), storage_id_(parent->get_storage_id()) {
67 LOG(INFO) <<
"interesting, super unlucky case. wind immediately";
133 uint32_t allocated_pages,
134 uint64_t* total_pages) {
135 SequentialPage* base =
reinterpret_cast<SequentialPage*
>(snapshot_writer->
get_page_base());
138 SequentialPage* tail_page = base + allocated_pages - 1;
140 ASSERT_ND(tail_page->next_page().snapshot_pointer_ == 0);
145 tail_page->next_page().snapshot_pointer_ = next_head_page_id;
149 *total_pages += allocated_pages;
164 uint32_t allocated_pages = 1;
165 uint64_t total_pages = 0;
182 if (allocated_pages >= max_pages) {
184 CHECK_ERROR(dump_pages(snapshot_writer,
false, allocated_pages, &total_pages));
185 cur_page = compose_new_head(snapshot_writer);
194 cur_page = next_page;
213 CHECK_ERROR(dump_pages(snapshot_writer,
true, allocated_pages, &total_pages));
224 LOG(INFO) <<
to_string() <<
" compose() done in " << stop_watch.
elapsed_ms() <<
"ms. # pages="
225 << total_pages <<
", head_page=" <<
assorted::Hex(head_page_id, 16)
226 <<
", min_epoch=" << min_epoch <<
", max_epoch=" << max_epoch;
234 std::vector<HeadPagePointer> all_head_pages;
255 all_head_pages.push_back(info_page->
pointer_);
257 VLOG(0) <<
to_string() <<
" construct_root() total head page pointers=" << all_head_pages.size();
263 uint32_t allocated_pages = 1;
266 for (uint32_t written_pointers = 0; written_pointers < all_head_pages.size();) {
267 uint16_t count_in_this_page = std::min<uint64_t>(
268 all_head_pages.size() - written_pointers,
270 cur_page->
set_pointers(&all_head_pages[written_pointers], count_in_this_page);
271 written_pointers += count_in_this_page;
272 if (written_pointers < all_head_pages.size()) {
284 ASSERT_ND(written_pointers == all_head_pages.size());
294 storage.
get_control_block()->root_page_pointer_.snapshot_pointer_ = root_of_root_page_id;
295 storage.
get_control_block()->meta_.root_snapshot_page_id_ = root_of_root_page_id;
299 <<
" total head page pointers=" << all_head_pages.size()
301 <<
". root_page_count=" << allocated_pages;
306 return std::string(
"SequentialComposer-") + std::to_string(storage_id_);
316 for (uint16_t node = 0; node < nodes; ++node) {
322 for (uint16_t local_ordinal = 0; local_ordinal < threads_per_node; ++local_ordinal) {
327 if ((*head_ptr) == 0) {
329 VLOG(0) <<
"No volatile pages for thread-" << thread_id <<
" in sequential-" << storage_id_;
344 VLOG(0) <<
"Thread-" << thread_id <<
" in sequential-" << storage_id_ <<
" keeps volatile"
357 VLOG(0) <<
"Thread-" << thread_id <<
" in sequential-" << storage_id_ <<
" dropped all"
358 <<
" volatile pages";
365 DVLOG(2) <<
"Thread-" << thread_id <<
" in sequential-" << storage_id_ <<
" dropped a"
const Page *const * root_info_pages_
Root info pages output by compose()
memory::AlignedMemory work_memory_
Working memory to be used in gleaner's construct_root().
snapshot::LogGleanerResource * gleaner_resource_
All pre-allocated resouces to help run construct_root(), such as memory buffers.
ErrorCode read_page(storage::SnapshotPagePointer page_id, void *out)
Lock-free list of records stored in the volatile part of sequential storage.
Represents a logic to compose a new version of data pages for one storage.
const SequentialAppendLogType * get_entry() const
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
bool can_insert_record(uint16_t payload_length) const
Returns if this page has enough room to insert a record with the given payload length.
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
SnapshotPagePointer page_id_
ID of the page that begins the linked list.
Epoch get_first_record_epoch() const
Returns the epoch of the fist record in this page (undefined behavior if no record).
const char * get_buffer() const
Returns the buffer memory.
DualPagePointer & next_page()
double elapsed_ms() const
Epoch valid_until_epoch_
This snapshot contains all the logs until this epoch.
Composer::DropResult drop_volatiles(const Composer::DropVolatilesArguments &args)
virtual ErrorCode wind(uint64_t next_absolute_pos)=0
Loads next data block to be consumed by the caller (composer).
Persistent status part of Transaction ID.
memory::PagePoolOffset get_page_size() const __attribute__((always_inline))
Brings error stacktrace information as return value of functions.
void append_record_nosync(xct::XctId owner_id, uint16_t payload_length, const void *payload)
Appends a record to this page.
Epoch from_epoch_
Inclusive beginning of epochs in the pointed pages.
FileStatus status(const Path &p)
Returns the status of the file.
Represents one input stream of sorted log entries.
const Metadata * get_metadata() const
Returns the metadata of this storage.
Represents one data page in Sequential Storage.
const uint16_t kRootPageMaxHeadPointers
Maximum number of head pointers in one root page.
uint32_t log_streams_count_
Number of sorted runs.
uint64_t cur_absolute_pos_
VolatilePagePointer combine_volatile_page_pointer(uint8_t numa_node, memory::PagePoolOffset offset)
const EngineOptions & get_options() const
ThreadLocalOrdinal thread_count_per_group_
Number of Thread in each ThreadGroup.
SnapshotId get_snapshot_id() const
uint64_t get_offset() const
Returns the absolute byte position of the buffer's beginning in the entire file.
uint64_t cur_relative_pos_
Represents an append/scan-only store.
uint64_t get_cur_block_abosulte_begin() const
Current storage block's beginning in absolute byte position in the file.
SnapshotPagePointer get_next_page() const
VolatilePagePointer volatile_pointer_
memory::PagePoolOffset get_offset() const
Output of one compose() call, which are then combined in construct_root().
uint64_t SnapshotPagePointer
Page ID of a snapshot page.
std::string to_string() const
const void * cur_payload_
void set_next_page(SnapshotPagePointer page)
const HeadPagePointer * get_pointers() const
Page * root_info_page_
[OUT] Returns pointers and related information that is required to construct the root page...
uint64_t stop()
Take another current time tick.
NumaNodeMemoryRef * get_node_memory(foedus::thread::ThreadGroupId group) const
SnapshotPagePointer snapshot_pointer_
ErrorStack compose(const Composer::ComposeArguments &args)
Epoch get_epoch() const __attribute__((always_inline))
0x0026 : foedus::storage::sequential::SequentialAppendLogType .
uint16_t extract_snapshot_id_from_snapshot_pointer(SnapshotPagePointer pointer)
storage::Page * get_page_base() __attribute__((always_inline))
Retrun value of drop_volatiles()
SnapshotPagePointer * new_root_page_pointer_
[OUT] Returns pointer to new root snapshot page/
uint64_t to_relative_pos(uint64_t absolute_pos) const
uint16_t get_pointer_count() const
Returns How many pointers to head pages exist in this page.
void * get_block() const
Returns the memory block.
uint8_t extract_numa_node_from_snapshot_pointer(SnapshotPagePointer pointer)
uint16_t group_count_
Number of ThreadGroup in the engine.
Represents one stable root page in Sequential Storage.
uint16_t my_partition_
if partitioned_drop_ is true, the partition this thread should drop volatile pages from ...
cache::SnapshotFileSet * previous_snapshot_files_
To read existing snapshots.
ErrorStack construct_root(const Composer::ConstructRootArguments &args)
uint64_t page_count_
In a sequential storage, all pages from the head page is guaranteed to be contiguous (that's how Sequ...
ThreadId compose_thread_id(ThreadGroupId node, ThreadLocalOrdinal local_core)
Returns a globally unique ID of Thread (core) for the given node and ordinal in the node...
void store_max(const Epoch &other)
Kind of std::max(this, other).
double elapsed_us() const
uint64_t end_absolute_pos_
void set_pointers(HeadPagePointer *pointers, uint16_t pointer_count)
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
snapshot::SortedBuffer * stream_
thread::ThreadOptions thread_
Declares all log types used in this storage type.
snapshot::SnapshotWriter * snapshot_writer_
Writes out composed pages.
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
void initialize_snapshot_page(StorageId storage_id, SnapshotPagePointer page_id)
uint16_t get_numa_node() const
snapshot::SnapshotWriter * snapshot_writer_
Writes out composed pages.
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
uint16_t get_record_count() const
Returns how many records in this page placed so far.
uint64_t get_cur_block_abosulte_end() const
Current storage block's end in absolute byte position in the file.
const ErrorStack kRetOk
Normal return value for no-error case.
bool partitioned_drop_
if true, one thread for each partition will invoke drop_volatiles()
PagePool * get_volatile_pool()
Convenient way of writing hex integers to stream.
ErrorCode init(snapshot::SortedBuffer *stream)
Epoch to_epoch_
Exclusive end of epochs in the pointed pages.
Log type of sequential-storage's append operation.
const LocalPageResolver & get_resolver() const
Gives an object to resolve an offset in this page pool (thus local) to an actual pointer and vice ver...
uint32_t root_info_pages_count_
Number of root info pages.
#define UNLIKELY(x)
Hints that x is highly likely false.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
uint8_t get_numa_node() const
A high-resolution stop watch.
SequentialComposer(Composer *parent)
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
ErrorCode dump_pages(memory::PagePoolOffset from_page, uint32_t count)
Write out pages that are contiguous in the main page pool.
void initialize_snapshot_page(StorageId storage_id, SnapshotPagePointer page_id)
Called only when this page is initialized.
snapshot::SortedBuffer *const * log_streams_
Sorted runs.
CONTROL_BLOCK * get_control_block() const
snapshot::Snapshot snapshot_
The new snapshot.
void drop(Engine *engine, VolatilePagePointer pointer) const
Returns (might cache) the given pointer to volatile pool.
storage::SnapshotPagePointer get_next_page_id() const
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
ErrorCode
Enum of error codes defined in error_code.xmacro.
Unlike other composers, this one doesn't need merge sort.
Arguments for drop_volatiles()
Writes out one snapshot file for all data pages in one reducer.
uint64_t get_buffer_size() const
Returns the size of buffer memory.
void store_min(const Epoch &other)
Kind of std::min(this, other).
Arguments for construct_root()