20 #include <glog/logging.h>
52 namespace sequential {
59 uint16_t payload_count) {
87 LOG(INFO) <<
"Uninitializing an sequential-storage " <<
get_name();
91 for (uint16_t node = 0; node < nodes; ++node) {
97 for (uint16_t local_ordinal = 0; local_ordinal < threads_per_node; ++local_ordinal) {
102 cur_pointer.
word = page->header().page_id_;
104 cur_pointer.get_offset())));
105 ASSERT_ND(node == cur_pointer.get_numa_node());
111 chunk.
push_back(cur_pointer.get_offset());
113 if (!next_pointer.is_null()) {
114 ASSERT_ND(node == next_pointer.get_numa_node());
116 next_pointer.get_offset()));
122 if (chunk.
size() > 0) {
129 for (uint16_t p = 0; p * 4 < nodes; ++p) {
130 uint16_t node = p * 4;
149 LOG(ERROR) <<
"This sequential-storage already exists: " <<
get_name();
158 control_block_->cur_truncate_epoch_tid_.xct_id_.set_epoch(initial_truncate_epoch);
163 LOG(INFO) <<
"Newly created a sequential-storage " <<
get_name();
171 if (!initial_truncate_epoch.
is_valid()) {
178 control_block_->cur_truncate_epoch_tid_.xct_id_.set_epoch(initial_truncate_epoch);
185 LOG(INFO) <<
"Loaded a sequential-storage " <<
get_name();
195 for (uint16_t p = 0; p * 4 < nodes; ++p) {
196 uint16_t node = p * 4;
225 observed = address->xct_id_;
232 const_cast< xct::RwLockableXctId* >(address)));
237 LOG(INFO) <<
"Truncating " <<
get_name() <<
" upto Epoch " << new_truncate_epoch
239 if (!new_truncate_epoch.
is_valid()) {
240 LOG(ERROR) <<
"truncate() was called with an invalid epoch";
242 }
else if (new_truncate_epoch < engine_->get_earliest_epoch()) {
243 LOG(ERROR) <<
"too-old epoch for this system. " << new_truncate_epoch;
250 <<
". Requested = " << new_truncate_epoch;
255 LOG(WARNING) <<
"Ohh? we don't prohibit it, but are you sure? Truncating up to a future"
267 <<
". Requested = " << new_truncate_epoch;
278 control_block_->cur_truncate_epoch_tid_.xct_id_.set_being_written();
284 std::memset(log_buffer, 0,
sizeof(log_buffer));
296 xct_id.
set(commit_epoch->
value(), 1);
309 LOG(INFO) <<
"Truncated";
319 LOG(INFO) <<
"Applied redo-log of truncation on sequential storage- " <<
get_name()
327 uint16_t payload_count) {
336 if (*tail_pointer != 0) {
340 if (tail ==
nullptr ||
347 if (
UNLIKELY(new_page_offset == 0)) {
348 LOG(FATAL) <<
" Unexpected error. we ran out of free page while inserting to sequential"
349 " storage after commit.";
352 new_page_pointer.
set(node, new_page_offset);
357 if (tail ==
nullptr) {
362 *head_pointer = new_page_offset;
363 *tail_pointer = new_page_offset;
366 *tail_pointer = new_page_offset;
Epoch new_truncate_epoch_
MetaLogBuffer * get_meta_buffer()
Metadata meta_
common part of the metadata.
xct::Xct & get_current_xct()
Returns the transaction that is currently running on this thread.
0x0002 : "GENERAL: Invalid parameter given" .
ErrorCode append_record(thread::Thread *context, const void *payload, uint16_t payload_count)
Append one record to this sequential storage.
memory::NumaCoreMemory * get_thread_memory() const
Returns the private memory repository of this thread.
void append_record(thread::Thread *context, xct::XctId owner_id, const void *payload, uint16_t payload_count)
Appends an already-commited record to this volatile list.
PagePoolOffset grab_free_volatile_page()
Acquires one free volatile page from local page pool.
Lock-free list of records stored in the volatile part of sequential storage.
ErrorStack load(const StorageControlBlock &snapshot_block)
void populate(StorageId storage_id, const void *payload, uint16_t payload_count) __attribute__((always_inline))
#define ERROR_STACK(e)
Instantiates ErrorStack with the given foedus::error_code, creating an error stack with the current f...
Page pool for volatile read/write store (VolatilePage) and the read-only bufferpool (SnapshotPage)...
std::atomic< Epoch::EpochInteger > cur_truncate_epoch_
The min epoch value (truncate-epoch) for all valid records in this storage.
Root package of FOEDUS (Fast Optimistic Engine for Data Unification Services).
bool can_insert_record(uint16_t payload_length) const
Returns if this page has enough room to insert a record with the given payload length.
ErrorCode grab_one(PagePoolOffset *offset)
Grab only one page.
Represents one thread running on one NUMA core.
uint32_t PagePoolOffset
Offset in PagePool that compactly represents the page address (unlike 8 bytes pointer).
Epoch get_first_record_epoch() const
Returns the epoch of the fist record in this page (undefined behavior if no record).
DualPagePointer & next_page()
const GlobalVolatilePageResolver & get_global_volatile_page_resolver() const
Returns the page resolver to convert volatile page ID to page pointer.
ErrorStack initialize_head_tail_pages()
ThreadId get_thread_id() const
ThreadLocalOrdinal decompose_numa_local_ordinal(ThreadId global_id)
Extracts local ordinal from the given globally unique ID of Thread (core).
xct::RwLockableXctId cur_truncate_epoch_tid_
Protects accesses to cur_truncate_epoch_.
void release_one(PagePoolOffset offset)
Returns only one page.
Represents a pointer to a volatile page with modification count for preventing ABA.
Represents a user transaction.
Persistent status part of Transaction ID.
SequentialPage * get_tail(const memory::LocalPageResolver &resolver, thread::ThreadId thread_id) const
Brings error stacktrace information as return value of functions.
void append_record_nosync(xct::XctId owner_id, uint16_t payload_length, const void *payload)
Appends a record to this page.
bool is_active() const
Returns whether the object is an active transaction.
Engine * engine_
Most attachable object stores an engine pointer (local engine), so we define it here.
The storage has been created and ready for use.
Definitions of IDs in this package and a few related constant values.
Represents one data page in Sequential Storage.
0x0808 : "STORAGE: Payload of the record is too long" .
ErrorCode add_to_lock_free_read_set(storage::StorageId storage_id, XctId observed_owner_id, RwLockableXctId *owner_id_address)
Add the given record to the special read-set that is not placed in usual data pages.
const EngineOptions & get_options() const
ThreadLocalOrdinal thread_count_per_group_
Number of Thread in each ThreadGroup.
A view of NumaNodeMemory for other SOCs and master engine.
void set(Epoch::EpochInteger epoch_int, uint32_t ordinal)
void get_pointer_page_and_index(uint16_t thread_id, uint16_t *page, uint16_t *index)
Calculate the page/index of the thread-private head/tail pointer.
log::ThreadLogBuffer & get_thread_log_buffer()
Returns the private log buffer for this thread.
VolatilePagePointer volatile_pointer_
ErrorStack truncate(Epoch new_truncate_epoch, Epoch *commit_epoch)
ErrorCode optimistic_read_truncate_epoch(thread::Thread *context, Epoch *out) const
SequentialStorageControlBlock * control_block_
The shared data on shared memory that has been initialized in some SOC or master engine.
log::LogManager * get_log_manager() const
See Log Manager.
ErrorCode add_to_lock_free_write_set(storage::StorageId storage_id, log::RecordLogType *log_entry)
Add the given log to the lock-free write set of this transaction.
Epoch get_current_global_epoch() const
Returns the current global epoch, the epoch a newly started transaction will be in.
0x0802 : "STORAGE: This storage already exists" .
void apply_truncate(const SequentialTruncateLogType &the_log)
NumaNodeMemoryRef * get_node_memory(foedus::thread::ThreadGroupId group) const
Epoch get_epoch() const __attribute__((always_inline))
void set(uint8_t numa_node, memory::PagePoolOffset offset)
ThreadGroupId decompose_numa_node(ThreadId global_id)
Extracts NUMA node ID from the given globally unique ID of Thread (core).
StorageId get_id() const
Returns the unique ID of this storage.
Definitions of IDs in this package and a few related constant values.
uint16_t group_count_
Number of ThreadGroup in the engine.
const memory::LocalPageResolver & get_local_volatile_page_resolver() const
Returns page resolver to convert only local page ID to page pointer.
void release(uint32_t desired_release_count, PagePoolOffsetChunk *chunk)
Returns the specified number of free pages from the chunk.
static uint16_t calculate_log_length(uint16_t payload_count) __attribute__((always_inline))
memory::PagePoolOffset * get_head_pointer(thread::ThreadId thread_id) const
ThreadId compose_thread_id(ThreadGroupId node, ThreadLocalOrdinal local_core)
Returns a globally unique ID of Thread (core) for the given node and ordinal in the node...
void push_back(PagePoolOffset pointer)
#define CHECK_ERROR_CODE(x)
This macro calls x and checks its returned error code.
thread::ThreadOptions thread_
Declares all log types used in this storage type.
To reduce the overhead of grabbing/releasing pages from pool, we pack this many pointers for each gra...
#define CHECK_ERROR(x)
This macro calls x and checks its returned value.
SequentialPage * get_head(const memory::LocalPageResolver &resolver, thread::ThreadId thread_id) const
uint16_t ThreadId
Typedef for a global ID of Thread (core), which is unique across NUMA nodes.
uint16_t get_record_count() const
Returns how many records in this page placed so far.
const ErrorStack kRetOk
Normal return value for no-error case.
Log type of TRUNCATE SEQUENTIAL STORAGE operation.
ErrorStack create(const SequentialMetadata &metadata)
ThreadGroupId get_numa_node() const
void apply_append_record(thread::Thread *context, const SequentialAppendLogType *log_entry)
Used to apply the effect of appending to volatile list.
PagePool * get_volatile_pool()
bool is_being_written() const __attribute__((always_inline))
const uint16_t kMaxPayload
Payload must be shorter than this length.
void memory_fence_acquire()
Equivalent to std::atomic_thread_fence(std::memory_order_acquire).
Log type of sequential-storage's append operation.
const LocalPageResolver & get_resolver() const
Gives an object to resolve an offset in this page pool (thus local) to an actual pointer and vice ver...
0x0A04 : "XCTION : This thread is not running any transaction." .
#define UNLIKELY(x)
Hints that x is highly likely false.
#define ASSERT_ND(x)
A warning-free wrapper macro of assert() that has no performance effect in release mode even when 'x'...
Epoch get_earliest_epoch() const
Returns the Earliest-Epoch, the minimum epoch that is valid within this engine.
#define WRAP_ERROR_CODE(x)
Same as CHECK_ERROR(x) except it receives only an error code, thus more efficient.
void memory_fence_release()
Equivalent to std::atomic_thread_fence(std::memory_order_release).
A base layout of shared data for all storage types.
uint8_t ThreadGroupId
Typedef for an ID of ThreadGroup (NUMA node).
char * reserve_new_log(uint16_t log_length) __attribute__((always_inline))
Reserves a space for a new (uncommitted) log entry at the tail.
memory::EngineMemory * get_memory_manager() const
See Memory Manager.
const uint16_t kPageSize
A constant defining the page size (in bytes) of both snapshot pages and volatile pages.
ErrorCode
Enum of error codes defined in error_code.xmacro.
memory::PagePoolOffset * get_tail_pointer(thread::ThreadId thread_id) const
SequentialMetadata meta_
metadata of this storage.
memory::PagePoolOffset pointers_[kPointersPerPage]
const StorageName & get_name() const
EpochInteger value() const
Returns the raw integer representation.
void memory_fence_acq_rel()
Equivalent to std::atomic_thread_fence(std::memory_order_acq_rel).
void initialize_volatile_page(StorageId storage_id, VolatilePagePointer page_id)
Called only when this page is initialized.