chore: migrate to object_store
Move to the more modern and maintained object_store library to access generic buckets. We should now also be able to stream files, and could proceed to implement paginated listing results if we want. Fixes #1.
This commit is contained in:
parent
996be0f6df
commit
32e2a5ea4a
11 changed files with 765 additions and 558 deletions
201
src/main.rs
201
src/main.rs
|
@ -6,16 +6,19 @@ mod sizes;
|
|||
|
||||
use {
|
||||
anyhow::Result,
|
||||
bytes::Bytes,
|
||||
futures::{StreamExt, stream::BoxStream},
|
||||
lazy_static::lazy_static,
|
||||
object_store::path::Path as ObjectStorePath,
|
||||
rocket::{
|
||||
State,
|
||||
Request, State,
|
||||
fairing::AdHoc,
|
||||
figment::{
|
||||
Profile,
|
||||
providers::{Env, Format as _, Toml},
|
||||
},
|
||||
http::uri::Origin,
|
||||
response::{Redirect, Responder},
|
||||
http::ContentType,
|
||||
response::{self, Responder, stream::ByteStream},
|
||||
serde::Serialize,
|
||||
},
|
||||
rocket_dyn_templates::{Template, context},
|
||||
|
@ -23,15 +26,21 @@ use {
|
|||
std::path::PathBuf,
|
||||
};
|
||||
|
||||
#[derive(Responder)]
|
||||
enum FileView {
|
||||
#[response(content_type = "text/html")]
|
||||
Folder(Template),
|
||||
File(ByteStream<BoxStream<'static, Bytes>>),
|
||||
}
|
||||
|
||||
#[response(content_type = "application/octet-stream")]
|
||||
File(Vec<u8>),
|
||||
|
||||
Redirect(Redirect),
|
||||
impl<'r> Responder<'r, 'r> for FileView {
|
||||
fn respond_to(self, req: &'r Request<'_>) -> response::Result<'r> {
|
||||
match self {
|
||||
Self::Folder(template) => template.respond_to(req).map(|mut r| {
|
||||
r.set_header(ContentType::HTML);
|
||||
r
|
||||
}),
|
||||
Self::File(stream) => stream.respond_to(req),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
@ -47,20 +56,33 @@ enum Error {
|
|||
#[response(status = 404)]
|
||||
NotFound(String),
|
||||
|
||||
#[response(status = 400)]
|
||||
InvalidRequest(String),
|
||||
|
||||
#[response(status = 500)]
|
||||
UnknownError(String),
|
||||
}
|
||||
|
||||
#[rocket::get("/")]
|
||||
async fn index_root(state: &State<Settings>) -> Result<FileView, Error> {
|
||||
index(None, state).await
|
||||
}
|
||||
|
||||
#[rocket::get("/<path..>")]
|
||||
async fn index(
|
||||
path: PathBuf,
|
||||
uri: &Origin<'_>,
|
||||
state: &State<Settings>,
|
||||
) -> Result<FileView, Error> {
|
||||
async fn index(path: Option<PathBuf>, state: &State<Settings>) -> Result<FileView, Error> {
|
||||
let object_path = if let Some(url_path) = path.as_ref() {
|
||||
let s = url_path.to_str().ok_or(Error::InvalidRequest(
|
||||
"Path cannot be converted to UTF-8".into(),
|
||||
))?;
|
||||
|
||||
Some(ObjectStorePath::from(s))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
/*
|
||||
The way things work in S3, the following holds for us:
|
||||
- we need to use a slash as separator
|
||||
- folders need to be queried ending with a slash
|
||||
- getting the bucket address (empty prefix) will
|
||||
return an XML file with all properties; we don't
|
||||
want that.
|
||||
|
@ -68,113 +90,108 @@ async fn index(
|
|||
We first try to retrieve an object as a file. If we fail,
|
||||
we fallback to retrieving the equivalent folder.
|
||||
*/
|
||||
if let Ok(result) = s3_serve_file(&path, &state).await {
|
||||
Ok(result)
|
||||
} else {
|
||||
// We need to redirect to a path ending with a slash as
|
||||
// per comment above if we know this is not a file.
|
||||
let mut uri = uri.to_string();
|
||||
if !uri.ends_with('/') {
|
||||
uri.push('/');
|
||||
return Ok(FileView::Redirect(Redirect::permanent(uri)));
|
||||
}
|
||||
|
||||
let objects = s3_fileview(&path, &state).await?;
|
||||
if let Some(path) = &object_path
|
||||
&& object_exists(path, &state).await?
|
||||
{
|
||||
serve_object(&path, &state).await
|
||||
} else {
|
||||
let objects = file_view(object_path, &state).await?;
|
||||
|
||||
let rendered = Template::render(
|
||||
"index",
|
||||
context! {
|
||||
path: format!("{}/", path.display()),
|
||||
path: format!("{}/", path.unwrap_or("".into()).display()),
|
||||
objects
|
||||
},
|
||||
);
|
||||
|
||||
Ok(FileView::Folder(rendered))
|
||||
}
|
||||
}
|
||||
|
||||
async fn s3_serve_file(path: &PathBuf, settings: &Settings) -> Result<FileView, Error> {
|
||||
let is_root_prefix = path.as_os_str().is_empty();
|
||||
if is_root_prefix {
|
||||
return Err(Error::NotFound("Root prefix is not a file".into()));
|
||||
}
|
||||
|
||||
// FIXME: this can be big, we should use streaming,
|
||||
// not loading in memory!
|
||||
let response = settings
|
||||
.s3_bucket
|
||||
.get_object(format!("{}", path.display()))
|
||||
.await
|
||||
.map_err(|_| Error::UnknownError("Unable to connect to S3 bucket".into()))?;
|
||||
|
||||
match response.status_code() {
|
||||
200 | 204 => {
|
||||
let bytes = response.bytes().to_vec();
|
||||
Ok(FileView::File(bytes))
|
||||
}
|
||||
404 => Err(Error::NotFound("Object not found".into())),
|
||||
_ => Err(Error::UnknownError("Unknown S3 error".into())),
|
||||
async fn object_exists(s3_path: &ObjectStorePath, settings: &Settings) -> Result<bool, Error> {
|
||||
match settings.s3_bucket.head(s3_path).await {
|
||||
Ok(_metadata) => Ok(true),
|
||||
Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
|
||||
Err(e) => Err(Error::UnknownError(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn s3_fileview(path: &PathBuf, settings: &Settings) -> Result<Vec<FileViewItem>, Error> {
|
||||
async fn serve_object(s3_path: &ObjectStorePath, settings: &Settings) -> Result<FileView, Error> {
|
||||
let object_stream = settings
|
||||
.s3_bucket
|
||||
.get(&s3_path)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
object_store::Error::NotFound { path: _, source: _ } => Error::NotFound(e.to_string()),
|
||||
_ => Error::UnknownError(e.to_string()),
|
||||
})?
|
||||
.into_stream();
|
||||
|
||||
let s3_path = s3_path.clone();
|
||||
let stream = object_stream
|
||||
.map(move |chunk| match chunk {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) => {
|
||||
log::error!("connection error while reading {}: {}", s3_path, err);
|
||||
Bytes::new() // Forces end of stream
|
||||
}
|
||||
})
|
||||
.boxed();
|
||||
|
||||
// TODO: unfortunately Rocket does not have a ByteStream with a Result per chunk,
|
||||
// meaning that if there is a failure in the middle of the stream, the best we can do is...
|
||||
// nothing? Panic? All options are bad.
|
||||
|
||||
Ok(FileView::File(ByteStream::from(stream)))
|
||||
}
|
||||
|
||||
async fn file_view(
|
||||
s3_folder_path: Option<ObjectStorePath>,
|
||||
settings: &Settings,
|
||||
) -> Result<Vec<FileViewItem>, Error> {
|
||||
/*
|
||||
if listing a folder:
|
||||
- folders will be under 'common_prefixes'
|
||||
- files will be under the 'contents' property
|
||||
*/
|
||||
|
||||
let parent = path.parent();
|
||||
let s3_folder_path = match parent {
|
||||
Some(_) => format!("{}/", path.display()),
|
||||
None => "".into(),
|
||||
};
|
||||
|
||||
let s3_objects = settings
|
||||
.s3_bucket
|
||||
.list(s3_folder_path, Some("/".into()))
|
||||
.list_with_delimiter(s3_folder_path.as_ref())
|
||||
.await
|
||||
.map_err(|_| Error::NotFound("Object not found".into()))?;
|
||||
.map_err(|err| match err {
|
||||
object_store::Error::NotFound { path: _, source: _ } => {
|
||||
Error::NotFound("object not found".into())
|
||||
}
|
||||
err => Error::UnknownError(err.to_string()),
|
||||
})?;
|
||||
|
||||
let objects = s3_objects
|
||||
.iter()
|
||||
.flat_map(|list| -> Vec<Option<FileViewItem>> {
|
||||
let prefix = if let Some(p) = &list.prefix {
|
||||
p.as_str()
|
||||
} else {
|
||||
""
|
||||
};
|
||||
let folders = s3_objects.common_prefixes.into_iter().map(|dir| {
|
||||
let dirname = dir.parts().last().unwrap();
|
||||
FileViewItem {
|
||||
path: dirname.as_ref().into(),
|
||||
size_bytes: 0,
|
||||
size: "[DIR]".to_owned(),
|
||||
last_modification: String::default(),
|
||||
}
|
||||
});
|
||||
|
||||
let folders = list.common_prefixes.iter().flatten().map(|dir| {
|
||||
let path = dir.prefix.strip_prefix(&prefix);
|
||||
path.map(|path| FileViewItem {
|
||||
path: path.to_owned(),
|
||||
size_bytes: 0,
|
||||
size: "[DIR]".to_owned(),
|
||||
last_modification: String::default(),
|
||||
})
|
||||
});
|
||||
let files = s3_objects.objects.into_iter().map(|obj| FileViewItem {
|
||||
path: obj.location.filename().unwrap().into(),
|
||||
size_bytes: obj.size,
|
||||
size: sizes::bytes_to_human(obj.size),
|
||||
last_modification: obj.last_modified.to_rfc3339(),
|
||||
});
|
||||
|
||||
let files = list.contents.iter().map(|obj| {
|
||||
let path = obj.key.strip_prefix(&prefix);
|
||||
path.map(|path| FileViewItem {
|
||||
path: path.to_owned(),
|
||||
size_bytes: obj.size,
|
||||
size: sizes::bytes_to_human(obj.size),
|
||||
last_modification: obj.last_modified.clone(),
|
||||
})
|
||||
});
|
||||
|
||||
folders.chain(files).collect()
|
||||
})
|
||||
.flatten()
|
||||
.collect();
|
||||
|
||||
Ok(objects)
|
||||
Ok(folders.chain(files).collect())
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
// Workaround for https://github.com/SergioBenitez/Rocket/issues/1792
|
||||
static ref EMPTY_DIR: tempfile::TempDir = tempfile::tempdir()
|
||||
.expect("Unable to create an empty temporary folder, is the whole FS read-only?");
|
||||
.expect("unable to create an empty temporary folder, is the whole FS read-only?");
|
||||
}
|
||||
|
||||
#[rocket::launch]
|
||||
|
@ -186,7 +203,7 @@ fn rocket() -> _ {
|
|||
.select(Profile::from_env_or("SERVES3_PROFILE", "default"));
|
||||
|
||||
rocket::custom(config_figment)
|
||||
.mount("/", rocket::routes![index])
|
||||
.mount("/", rocket::routes![index_root, index])
|
||||
.attach(AdHoc::config::<Settings>())
|
||||
.attach(Template::custom(|engines| {
|
||||
engines
|
||||
|
|
|
@ -1,16 +1,20 @@
|
|||
// SPDX-FileCopyrightText: © Matteo Settenvini <matteo.settenvini@montecristosoftware.eu>
|
||||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
use {anyhow::anyhow, rocket::serde::Deserialize, serde::de::Error};
|
||||
use {
|
||||
object_store::{ObjectStore, aws},
|
||||
rocket::serde::Deserialize,
|
||||
serde::de::Error,
|
||||
};
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(crate = "rocket::serde")]
|
||||
pub struct Settings {
|
||||
#[serde(deserialize_with = "deserialize_s3_bucket")]
|
||||
pub s3_bucket: Box<s3::Bucket>,
|
||||
pub s3_bucket: Box<dyn ObjectStore>,
|
||||
}
|
||||
|
||||
fn deserialize_s3_bucket<'de, D>(deserializer: D) -> Result<Box<s3::Bucket>, D::Error>
|
||||
fn deserialize_s3_bucket<'de, D>(deserializer: D) -> Result<Box<dyn ObjectStore>, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
|
@ -31,37 +35,27 @@ pub struct S3Config {
|
|||
pub secret_access_key: String,
|
||||
}
|
||||
|
||||
impl TryInto<Box<s3::Bucket>> for S3Config {
|
||||
impl TryInto<Box<dyn ObjectStore>> for S3Config {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_into(self) -> Result<Box<s3::Bucket>, Self::Error> {
|
||||
let region = s3::Region::Custom {
|
||||
region: self.region,
|
||||
endpoint: self.endpoint,
|
||||
};
|
||||
|
||||
let credentials = s3::creds::Credentials::new(
|
||||
Some(&self.access_key_id),
|
||||
Some(&self.secret_access_key),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)?;
|
||||
fn try_into(self) -> Result<Box<dyn ObjectStore>, Self::Error> {
|
||||
// TODO: support object stores other than AWS
|
||||
let object_store = aws::AmazonS3Builder::new()
|
||||
.with_region(self.region)
|
||||
.with_endpoint(&self.endpoint)
|
||||
.with_bucket_name(&self.name)
|
||||
.with_access_key_id(self.access_key_id)
|
||||
.with_secret_access_key(self.secret_access_key)
|
||||
.with_virtual_hosted_style_request(!self.path_style)
|
||||
.with_allow_http(true)
|
||||
.build()?;
|
||||
|
||||
log::info!(
|
||||
"Serving contents from bucket {} at {}",
|
||||
&self.name,
|
||||
region.endpoint()
|
||||
self.endpoint,
|
||||
self.name,
|
||||
);
|
||||
|
||||
let bucket = s3::Bucket::new(&self.name, region, credentials).map_err(|e| anyhow!(e));
|
||||
if self.path_style {
|
||||
bucket.map(|mut b| {
|
||||
b.set_path_style();
|
||||
b
|
||||
})
|
||||
} else {
|
||||
bucket
|
||||
}
|
||||
Ok(Box::new(object_store))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue