chore: migrate to object_store

Move to the more modern and maintained object_store
library to access generic buckets.

We should now also be able to stream files, and can follow
up by implementing paginated results for listings if we want.

Fixes #1.
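
A possible follow-up for the paginated listings mentioned above. This is only a
sketch against object_store's streaming list API; the list_page helper and its
page_size / page_index parameters are hypothetical and not part of this commit.

use {
    futures::StreamExt,
    object_store::{ObjectMeta, ObjectStore, path::Path},
};

// Hypothetical helper (not in this commit): fetch one "page" of object
// metadata from a listing instead of materializing the whole result set.
async fn list_page(
    store: &dyn ObjectStore,
    prefix: &Path,
    page_size: usize,  // assumed page size, e.g. 1000
    page_index: usize, // 0-based page to return
) -> object_store::Result<Vec<ObjectMeta>> {
    // `list` already yields results lazily, so paging is just chunking the
    // stream; skipping to a later page still walks the earlier entries,
    // which is acceptable for a first cut.
    let mut pages = store
        .list(Some(prefix))
        .chunks(page_size)
        .skip(page_index);

    let page = pages.next().await.unwrap_or_default();

    // Surface the first per-entry error in the page, if any.
    page.into_iter().collect()
}
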
Matteo Settenvini 2025-08-03 20:47:12 +02:00
parent 996be0f6df
commit 5d6648ecba
Signed by: matteo
GPG key ID: 1C1B12600D81DE05
5 changed files with 381 additions and 453 deletions

Cargo.lock (generated): 643 changes; file diff suppressed because it is too large.

@@ -21,14 +21,14 @@ edition = "2024"
 [dependencies]
 anyhow = "1.0"
+bytes = "1.10"
+futures = "0.3"
 human-size = "0.4"
 lazy_static = "1.4"
 log = "0.4"
 rocket = "0.5"
 rocket_dyn_templates = { version = "0.2.0", features = ["tera"] }
-rust-s3 = { version = "0.35", default-features = false, features = [
-    "tokio-rustls-tls",
-] }
+object_store = { version = "0.12", features = ["aws"] }
 serde = "1.0"
 tempfile = "3.20"
@@ -38,7 +38,7 @@ futures = "0.3"
 libc = "0.2"
 regex = "1.11"
 reqwest = "0.12"
-rstest = "0.25"
+rstest = "0.26"
 scraper = "0.23"
 test-log = "0.2"
 testcontainers = "0.24"


@@ -6,16 +6,19 @@ mod sizes;
 use {
     anyhow::Result,
+    bytes::Bytes,
+    futures::{StreamExt, stream::BoxStream},
     lazy_static::lazy_static,
+    object_store::path::Path,
     rocket::{
-        State,
+        Request, State,
         fairing::AdHoc,
         figment::{
             Profile,
             providers::{Env, Format as _, Toml},
         },
         http::uri::Origin,
-        response::{Redirect, Responder},
+        response::{self, Redirect, Responder, stream::ByteStream},
         serde::Serialize,
     },
     rocket_dyn_templates::{Template, context},
@@ -23,17 +26,22 @@ use {
     std::path::PathBuf,
 };
 
-#[derive(Responder)]
 enum FileView {
-    #[response(content_type = "text/html")]
     Folder(Template),
-    #[response(content_type = "application/octet-stream")]
-    File(Vec<u8>),
+    File(ByteStream<BoxStream<'static, Bytes>>),
     Redirect(Redirect),
 }
 
+impl<'r> Responder<'r, 'r> for FileView {
+    fn respond_to(self, req: &'r Request<'_>) -> response::Result<'r> {
+        match self {
+            Self::Folder(template) => template.respond_to(req),
+            Self::File(stream) => stream.respond_to(req),
+            Self::Redirect(redirect) => redirect.respond_to(req),
+        }
+    }
+}
+
 #[derive(Serialize)]
 struct FileViewItem {
     path: String,
@@ -68,7 +76,7 @@ async fn index(
        We try first to retrieve list an object as a file. If we fail,
        we fallback to retrieving the equivalent folder.
     */
-    if let Ok(result) = s3_serve_file(&path, &state).await {
+    if let Ok(result) = s3_serve_file(path.clone(), &state).await {
         Ok(result)
     } else {
         // We need to redirect to a path ending with a slash as
@@ -91,28 +99,41 @@ async fn index(
     }
 }
 
-async fn s3_serve_file(path: &PathBuf, settings: &Settings) -> Result<FileView, Error> {
+async fn s3_serve_file(path: PathBuf, settings: &Settings) -> Result<FileView, Error> {
     let is_root_prefix = path.as_os_str().is_empty();
     if is_root_prefix {
-        return Err(Error::NotFound("Root prefix is not a file".into()));
+        return Err(Error::UnknownError("root prefix is not a file".into()));
     }
 
-    // FIXME: this can be big, we should use streaming,
-    // not loading in memory!
-    let response = settings
+    let s3_path = object_store::path::Path::from(path.to_str().unwrap());
+
+    let object_stream = settings
         .s3_bucket
-        .get_object(format!("{}", path.display()))
+        .get(&s3_path)
         .await
-        .map_err(|_| Error::UnknownError("Unable to connect to S3 bucket".into()))?;
+        .map_err(|e| match e {
+            object_store::Error::NotFound { path: _, source: _ } => {
+                Error::UnknownError(e.to_string())
+            }
+            _ => Error::UnknownError(e.to_string()),
+        })?
+        .into_stream();
 
-    match response.status_code() {
-        200 | 204 => {
-            let bytes = response.bytes().to_vec();
-            Ok(FileView::File(bytes))
-        }
-        404 => Err(Error::NotFound("Object not found".into())),
-        _ => Err(Error::UnknownError("Unknown S3 error".into())),
-    }
+    let stream = object_stream
+        .map(move |chunk| match chunk {
+            Ok(bytes) => bytes,
+            Err(err) => {
+                log::error!("connection error while reading {}: {}", path.display(), err);
+                Bytes::new() // Forces end of stream
+            }
+        })
+        .boxed();
+
+    // TODO: unfortunately Rocket does not have a ByteStream with a Result per chunk,
+    // meaning that if there is a failure in the middle of the stream, the best we can do is...
+    // nothing? Panic? All options are bad.
+
+    Ok(FileView::File(ByteStream::from(stream)))
 }
 
 async fn s3_fileview(path: &PathBuf, settings: &Settings) -> Result<Vec<FileViewItem>, Error> {
@@ -122,59 +143,37 @@ async fn s3_fileview(path: &PathBuf, settings: &Settings) -> Result<Vec<FileViewItem>, Error> {
        - files will be under the 'contents' property
     */
 
-    let parent = path.parent();
-    let s3_folder_path = match parent {
-        Some(_) => format!("{}/", path.display()),
-        None => "".into(),
-    };
+    let s3_folder_path = path.to_str().map(|p| Path::from(p));
 
     let s3_objects = settings
         .s3_bucket
-        .list(s3_folder_path, Some("/".into()))
+        .list_with_delimiter(s3_folder_path.as_ref())
         .await
-        .map_err(|_| Error::NotFound("Object not found".into()))?;
+        .map_err(|_| Error::NotFound("object not found".into()))?;
 
-    let objects = s3_objects
-        .iter()
-        .flat_map(|list| -> Vec<Option<FileViewItem>> {
-            let prefix = if let Some(p) = &list.prefix {
-                p.as_str()
-            } else {
-                ""
-            };
-
-            let folders = list.common_prefixes.iter().flatten().map(|dir| {
-                let path = dir.prefix.strip_prefix(&prefix);
-                path.map(|path| FileViewItem {
-                    path: path.to_owned(),
-                    size_bytes: 0,
-                    size: "[DIR]".to_owned(),
-                    last_modification: String::default(),
-                })
-            });
-
-            let files = list.contents.iter().map(|obj| {
-                let path = obj.key.strip_prefix(&prefix);
-                path.map(|path| FileViewItem {
-                    path: path.to_owned(),
-                    size_bytes: obj.size,
-                    size: sizes::bytes_to_human(obj.size),
-                    last_modification: obj.last_modified.clone(),
-                })
-            });
-
-            folders.chain(files).collect()
-        })
-        .flatten()
-        .collect();
-
-    Ok(objects)
+    let folders = s3_objects.common_prefixes.into_iter().map(|dir| {
+        let dirname = dir.parts().last().unwrap();
+        FileViewItem {
+            path: dirname.as_ref().into(),
+            size_bytes: 0,
+            size: "[DIR]".to_owned(),
+            last_modification: String::default(),
+        }
+    });
+
+    let files = s3_objects.objects.into_iter().map(|obj| FileViewItem {
+        path: obj.location.filename().unwrap().into(),
+        size_bytes: obj.size,
+        size: sizes::bytes_to_human(obj.size),
+        last_modification: obj.last_modified.to_rfc3339(),
+    });
+
+    Ok(folders.chain(files).collect())
 }
 
 lazy_static! {
     // Workaround for https://github.com/SergioBenitez/Rocket/issues/1792
     static ref EMPTY_DIR: tempfile::TempDir = tempfile::tempdir()
-        .expect("Unable to create an empty temporary folder, is the whole FS read-only?");
+        .expect("unable to create an empty temporary folder, is the whole FS read-only?");
 }
 
 #[rocket::launch]


@@ -1,16 +1,20 @@
 // SPDX-FileCopyrightText: © Matteo Settenvini <matteo.settenvini@montecristosoftware.eu>
 // SPDX-License-Identifier: EUPL-1.2
 
-use {anyhow::anyhow, rocket::serde::Deserialize, serde::de::Error};
+use {
+    object_store::{ObjectStore, aws},
+    rocket::serde::Deserialize,
+    serde::de::Error,
+};
 
 #[derive(Deserialize)]
 #[serde(crate = "rocket::serde")]
 pub struct Settings {
     #[serde(deserialize_with = "deserialize_s3_bucket")]
-    pub s3_bucket: Box<s3::Bucket>,
+    pub s3_bucket: Box<dyn ObjectStore>,
 }
 
-fn deserialize_s3_bucket<'de, D>(deserializer: D) -> Result<Box<s3::Bucket>, D::Error>
+fn deserialize_s3_bucket<'de, D>(deserializer: D) -> Result<Box<dyn ObjectStore>, D::Error>
 where
     D: serde::Deserializer<'de>,
 {
@@ -31,37 +35,26 @@ pub struct S3Config {
     pub secret_access_key: String,
 }
 
-impl TryInto<Box<s3::Bucket>> for S3Config {
+impl TryInto<Box<dyn ObjectStore>> for S3Config {
     type Error = anyhow::Error;
 
-    fn try_into(self) -> Result<Box<s3::Bucket>, Self::Error> {
-        let region = s3::Region::Custom {
-            region: self.region,
-            endpoint: self.endpoint,
-        };
-
-        let credentials = s3::creds::Credentials::new(
-            Some(&self.access_key_id),
-            Some(&self.secret_access_key),
-            None,
-            None,
-            None,
-        )?;
+    fn try_into(self) -> Result<Box<dyn ObjectStore>, Self::Error> {
+        // TODO: support object stores other than AWS
+        let object_store = aws::AmazonS3Builder::new()
+            .with_region(self.region)
+            .with_endpoint(&self.endpoint)
+            .with_bucket_name(&self.name)
+            .with_access_key_id(self.access_key_id)
+            .with_secret_access_key(self.secret_access_key)
+            .with_virtual_hosted_style_request(!self.path_style)
+            .build()?;
 
         log::info!(
             "Serving contents from bucket {} at {}",
-            &self.name,
-            region.endpoint()
+            self.endpoint,
+            self.name,
         );
 
-        let bucket = s3::Bucket::new(&self.name, region, credentials).map_err(|e| anyhow!(e));
-        if self.path_style {
-            bucket.map(|mut b| {
-                b.set_path_style();
-                b
-            })
-        } else {
-            bucket
-        }
+        Ok(Box::new(object_store))
     }
 }


@@ -5,6 +5,7 @@ mod minio;
 use {
     anyhow::{Result, anyhow},
+    object_store::ObjectStore,
     reqwest::Url,
     std::{ptr::null_mut, str::FromStr},
     testcontainers::{ContainerAsync, runners::AsyncRunner},
@@ -13,7 +14,7 @@ use {
 
 pub struct Test {
     pub base_url: Url,
-    pub bucket: Box<s3::Bucket>,
+    pub bucket: Box<dyn ObjectStore>,
    pub serves3: tokio::process::Child,
     pub _minio: ContainerAsync<minio::MinIO>,
 }