Skip to content

Instantly share code, notes, and snippets.

@MrCroxx
Created July 9, 2025 07:53
Show Gist options
  • Select an option

  • Save MrCroxx/8fb65b25f288c6dca8e89121ebcef860 to your computer and use it in GitHub Desktop.

Select an option

Save MrCroxx/8fb65b25f288c6dca8e89121ebcef860 to your computer and use it in GitHub Desktop.
// Copyright 2025 foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
fs::File,
future::Future,
mem::ManuallyDrop,
os::{
fd::{FromRawFd, RawFd},
unix::fs::FileExt,
},
pin::Pin,
sync::{mpsc, Arc},
task::{Context, Poll},
};
use io_uring::{opcode, squeue::Entry as Sqe, types::Fd, IoUring};
use tokio::sync::oneshot;
use crate::storage::noop;
/// Caller-provided memory described by a raw pointer and a length, lent to an I/O engine for
/// the duration of one request.
///
/// The type has no `Drop`: it never frees the memory it points at. Whoever creates it stays
/// responsible for the allocation.
#[derive(Debug)]
pub struct RawIoBuffer {
    ptr: *mut u8,
    len: usize,
}

// SAFETY: a RawIoBuffer is just a (pointer, length) pair. The engines move it to a blocking
// task or to an io_uring shard thread, which requires `Send`. The `from_raw_parts` contract
// obliges the creator to give up all other access to the memory until the buffer is recovered
// via `into_raw_parts`, so handing it to another thread cannot introduce a data race.
unsafe impl Send for RawIoBuffer {}

impl RawIoBuffer {
    /// Splits the buffer back into its pointer and length.
    pub fn into_raw_parts(self) -> (*mut u8, usize) {
        (self.ptr, self.len)
    }

    /// Wraps `len` bytes starting at `ptr`.
    ///
    /// # Safety
    ///
    /// `ptr` must be valid for reads and writes of `len` bytes (or `len` must be 0), and
    /// nothing else may access that memory until the buffer is recovered with
    /// [`RawIoBuffer::into_raw_parts`]. Engines read from and write into it from other threads
    /// and from the kernel.
    pub unsafe fn from_raw_parts(ptr: *mut u8, len: usize) -> Self {
        Self { ptr, len }
    }
}
#[derive(Debug, thiserror::Error)]
pub enum IoError {
#[error("I/O operation failed: {0}")]
Io(#[from] std::io::Error),
}
impl IoError {
fn from_raw_os_error(raw: i32) -> Self {
Self::Io(std::io::Error::from_raw_os_error(raw))
}
}
/// Result type used by every I/O engine operation.
pub type IoResult<T> = Result<T, IoError>;

/// Identifier of a device region; [`UringIoEngine`] also uses it to pick a shard.
pub type RegionId = u64;
/// Direction of a single I/O request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IoType {
    /// Fill the request's buffer with data from the device.
    Read,
    /// Write the request's buffer out to the device.
    Write,
}
/// One I/O operation submitted to an [`IoEngine`].
///
/// The engine takes over `buf` for the duration of the operation. In the engines in this file
/// it is handed back inside the [`IoResponse`] only when the operation succeeds.
pub struct IoRequest {
/// Whether to read into `buf` or write from it.
pub io_type: IoType,
/// Memory to read into (`Read`) or write from (`Write`).
pub buf: RawIoBuffer,
/// Device whose `raw_fd(region, offset)` supplies the file descriptor for this operation.
pub device: Arc<dyn Device>,
/// Region being accessed; passed to `Device::raw_fd` and used by `UringIoEngine` to pick a shard.
pub region: RegionId,
/// Passed to `Device::raw_fd` together with `region`.
/// NOTE(review): presumably also the byte offset within the returned fd — confirm against
/// the `Device` implementations.
pub offset: u64,
}
/// Successful completion of an [`IoRequest`]: the request's fields echoed back, with the
/// buffer returned to the caller.
pub struct IoResponse {
/// Same as the request's `io_type`.
pub io_type: IoType,
/// The request's buffer; after a `Read` it holds the data that was read.
pub buf: RawIoBuffer,
/// Same device the request targeted.
pub device: Arc<dyn Device>,
/// Same as the request's `region`.
pub region: RegionId,
/// Same as the request's `offset`.
pub offset: u64,
}
/// Sending half an engine uses to deliver the outcome of one submitted request.
pub type IoNotifier = oneshot::Sender<IoResult<IoResponse>>;
/// Receiving half returned by [`IoEngine::submit`]; await it to get the request's outcome.
pub type IoHandle = oneshot::Receiver<IoResult<IoResponse>>;
/// An I/O backend: accepts a request and returns a handle that resolves when it completes.
pub trait IoEngine {
    /// Starts `request` and returns a handle that resolves with the [`IoResponse`] on success
    /// or an [`IoError`] on failure.
    fn submit(&self, request: IoRequest) -> IoHandle;
}
/// Backing storage that the I/O engines read from and write to.
pub trait Device: Send + Sync + 'static {
/// Size of one region.
/// NOTE(review): unit presumably bytes — confirm; not used by the engines in this file.
fn region_size(&self) -> usize;
/// File descriptor to use for I/O at (`region`, `offset`).
///
/// The engines never close this fd (`PsyncIoEngine` wraps it in `ManuallyDrop`), so it is
/// presumably owned by the device and must stay open while I/O on it is in flight.
fn raw_fd(&self, region: RegionId, offset: u64) -> RawFd;
}
pub struct PsyncIoEngine {}
impl IoEngine for PsyncIoEngine {
fn submit(&self, request: IoRequest) -> IoHandle {
let (tx, rx) = oneshot::channel();
tokio::task::spawn_blocking(move || {
let fd = request.device.raw_fd(request.region, request.offset);
let file = ManuallyDrop::new(unsafe { File::from_raw_fd(fd) });
let res = match request.io_type {
IoType::Read => file.read_exact_at(buf, offset),
IoType::Write => file.write_all_at(buf, offset),
};
let res = res
.map(|_| IoResponse {
io_type: request.io_type,
buf: request.buf,
device: request.device,
region: request.region,
offset: request.offset,
})
.map_err(IoError::from);
let _ = tx.send(res);
});
rx
}
}
/// Per-operation state for an io_uring request. It is boxed and passed through the ring as
/// the SQE's `user_data`, so the completion can be matched back to its submitter.
struct UringIoCtx {
/// Resolves the submitter's `IoHandle` once the CQE arrives.
notifier: IoNotifier,
/// Copied from the request; echoed in the response.
io_type: IoType,
/// Raw parts of the request's `RawIoBuffer`, reassembled into the response on completion.
buf_ptr: *mut u8,
buf_len: usize,
/// Keeps the device (and presumably its fd) alive while the operation is in flight.
device: Arc<dyn Device>,
/// Copied from the request; echoed in the response.
region: RegionId,
/// Copied from the request; echoed in the response.
offset: u64,
}
struct UringIoEngineShard {
rx: mpsc::Receiver<(IoRequest, IoNotifier)>,
ring: IoUring,
io_depth: usize,
inflight: usize,
}
impl UringIoEngineShard {
fn prepare(&mut self, request: IoRequest) -> Sqe {
let fd = Fd(request.device.raw_fd(request.region, request.offset));
let (buf, len) = request.buf.into_raw_parts();
match request.io_type {
IoType::Read => opcode::Read::new(fd, buf, len as _).build(),
IoType::Write => opcode::Write::new(fd, buf, len as _).build(),
}
}
fn notify(&mut self) {}
fn run(mut self) {
loop {
'recv: while self.inflight < self.io_depth {
let (request, notifier) = match self.rx.try_recv() {
Ok((request, notifier)) => (request, notifier),
Err(mpsc::TryRecvError::Empty) => break 'recv,
Err(mpsc::TryRecvError::Disconnected) => return,
};
self.inflight += 1;
let ctx = Box::new(UringIoCtx {
notifier,
io_type: request.io_type,
buf_ptr: request.buf.ptr,
buf_len: request.buf.len,
device: request.device.clone(),
region: request.region,
offset: request.offset,
});
let data = Box::into_raw(ctx) as u64;
let sqe = self.prepare(request).user_data(data);
unsafe { self.ring.submission().push(&sqe).unwrap() }
}
if self.inflight > 0 {
self.ring.submit_and_wait(1).unwrap();
}
for cqe in self.ring.completion() {
let data = cqe.user_data();
let ctx = unsafe { Box::from_raw(data as *mut UringIoCtx) };
let buf = unsafe { RawIoBuffer::from_raw_parts(ctx.buf_ptr, ctx.buf_len) };
let response = IoResponse {
io_type: ctx.io_type,
buf,
device: ctx.device.clone(),
region: ctx.region,
offset: ctx.offset,
};
let res = cqe.result();
if res < 0 {
let err = IoError::from_raw_os_error(res);
let _ = ctx.notifier.send(Err(err));
} else {
let _ = ctx.notifier.send(Ok(response));
}
self.inflight -= 1;
}
}
}
}
pub struct UringIoEngine {
txs: Vec<mpsc::SyncSender<(IoRequest, IoNotifier)>>,
}
impl IoEngine for UringIoEngine {
fn submit(&self, request: IoRequest) -> IoHandle {
let (tx, rx) = oneshot::channel();
let shard = &self.txs[request.region as usize % self.txs.len()];
let _ = shard.send((request, tx));
rx
}
}
#[cfg(test)]
mod tests {
use super::*;
// TODO: placeholder with no assertions; neither `PsyncIoEngine` nor `UringIoEngine` is
// exercised yet.
#[test]
fn test_io() {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment