feat(cleaners): add main async runner

Adds a Runner that drives a set of cleaners, each of
which runs in parallel with the others.
This commit is contained in:
Matteo Settenvini 2025-01-25 02:19:48 +01:00
parent cc1a66726e
commit 5507a1dd21
Signed by: matteo
GPG key ID: 1C1B12600D81DE05
8 changed files with 510 additions and 6 deletions

View file

@ -1,8 +1,23 @@
// SPDX-FileCopyrightText: Matteo Settenvini <matteo.settenvini@montecristosoftware.eu>
// SPDX-License-Identifier: EUPL-1.2
use std::path::PathBuf;
use clap::Parser;
/// A tool to clean up sysroots for Linux embedded devices to save storage space.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
pub struct Args {}
pub struct Args {
// NOTE(review): with clap's derive the `///` comments on the fields below are
// presumably what ends up in the `--help` output, so their wording is
// user-facing text rather than plain documentation — confirm before rewording.
/// Simulate operations without carrying them out
#[arg(short = 'n', long, default_value_t = false)]
pub dry_run: bool,
// When set, files are relocated under this directory instead of being deleted.
/// Instead of simply removing files, move them to the
/// given location, preserving their relative folder structure
#[arg(long)]
pub split_to: Option<PathBuf>,
// No `#[arg(...)]` attribute here: presumably parsed as a positional
// argument — confirm. `main` changes the working directory to this path
// before the cleaners run.
/// The location of the sysroot to clean up
pub sysroot_location: PathBuf,
}

173
src/cleaners.rs Normal file
View file

@ -0,0 +1,173 @@
// SPDX-FileCopyrightText: Matteo Settenvini <matteo.settenvini@montecristosoftware.eu>
// SPDX-License-Identifier: EUPL-1.2
mod dso;
use crate::{
args::Args,
decision::{Action, Decision},
};
use anyhow::Result;
use async_trait::async_trait;
use dso::Dso;
use nix::libc::EXDEV;
use std::{
collections::HashMap,
io,
path::{Path, PathBuf},
};
use tokio::{
sync::{broadcast, mpsc},
task::JoinSet,
};
use walkdir::WalkDir;
/// Contract implemented by every clean-up pass driven by `Runner`.
#[async_trait]
pub trait Cleaner {
/// Runs the cleaner until it has nothing more to process.
///
/// * `files` - this cleaner's own subscription to the broadcast stream of
///   candidate paths published by the runner.
/// * `decisions` - channel on which the cleaner reports its verdict about a
///   path; the runner hands every cleaner a clone of the same sender.
///
/// An `Err` returned from here is logged by `Runner::run`; it does not
/// abort the other cleaners.
async fn run(
&mut self,
files: broadcast::Receiver<PathBuf>,
decisions: mpsc::Sender<Decision>,
) -> Result<()>;
}
/// Callback invoked with the path of each file that has to go. It is built by
/// `Runner::new_removal_fn` and, depending on the command line, deletes the
/// file, moves it elsewhere, or only logs what would happen (dry run).
type RemovalFn = Box<dyn Fn(PathBuf) -> io::Result<()>>;
/// Drives all registered cleaners over the file tree and applies the
/// decisions they report.
pub struct Runner {
/// Clean-up passes; `run` spawns each of them as its own task.
cleaners: Vec<Box<dyn Cleaner + Send>>,
/// What "removing" a file means for this invocation (delete, move, or dry-run log).
removal_fn: RemovalFn,
}
impl Runner {
/// Builds a runner configured from the parsed command line.
///
/// The removal strategy (delete, move, or dry-run logging) is derived from
/// `args`; the set of cleaners currently consists of the `Dso` cleaner only.
pub fn new(args: Args) -> Self {
    Self {
        cleaners: vec![Box::new(Dso::new())],
        removal_fn: Self::new_removal_fn(&args),
    }
}
pub async fn run(self) -> Result<()> {
let input_tx = broadcast::Sender::new(100);
let (output_tx, output_rx) = mpsc::channel(100);
let mut tasks = JoinSet::new();
// Processors
for mut cleaner in self.cleaners {
let input_rx = input_tx.subscribe();
let output_tx_clone = output_tx.clone();
tasks.spawn(async move { cleaner.run(input_rx, output_tx_clone).await });
}
drop(output_tx);
// Producer of inputs (note that this needs to happen
// after all channels have been created)
tasks.spawn(Self::input_producer(input_tx));
// Output consumer
Self::output_consumer(self.removal_fn, output_rx).await;
while let Some(task) = tasks.join_next().await {
if let Err(err) = task? {
log::error!("{}", err);
}
}
Ok(())
}
/// Walks the current directory recursively and publishes every entry that is
/// not a directory on `input_tx`.
///
/// Entries that cannot be accessed are reported with a warning and skipped.
///
/// # Errors
///
/// Fails only when a path cannot be sent, which happens when no receiver is
/// subscribed to `input_tx` any more.
async fn input_producer(input_tx: broadcast::Sender<PathBuf>) -> Result<()> {
    /// Number of queued paths above which the producer pauses.
    const HIGH_WATER_MARK: usize = 75;
    /// How long to wait before checking the queue again.
    const BACKOFF: std::time::Duration = std::time::Duration::from_millis(100);

    for entry in WalkDir::new(".") {
        match entry {
            Ok(e) if !e.file_type().is_dir() => {
                // Keep waiting for as long as the queue is (nearly) full. A
                // single sleep is not enough: sending anyway would overrun
                // the broadcast buffer and make slow receivers miss paths.
                // TODO: FIXME: make this better, e.g. use an exponential
                // backoff; polling at a fixed interval is a quick hack.
                while input_tx.len() >= HIGH_WATER_MARK {
                    tokio::time::sleep(BACKOFF).await;
                }
                input_tx.send(e.into_path())?;
            }
            Ok(_) => continue,
            Err(err) => log::warn!("unable to access path: {}", err),
        }
    }

    // we handle errors by warning the user, otherwise we always succeed
    Ok(())
}
/// Gathers the decisions of all cleaners and, once the channel is closed,
/// hands every file whose final verdict is `Action::Remove` to `removal_fn`.
///
/// The first decision received for a path is recorded as is; afterwards a
/// `Keep` overrides whatever was recorded, while a later `Remove` changes
/// nothing. Failures of `removal_fn` are logged and do not stop the loop.
async fn output_consumer(removal_fn: RemovalFn, mut output_rx: mpsc::Receiver<Decision>) {
    use std::collections::hash_map::Entry;

    let mut verdicts = HashMap::new();
    while let Some(decision) = output_rx.recv().await {
        match verdicts.entry(decision.path) {
            Entry::Occupied(mut known) => {
                if decision.action == Action::Keep {
                    known.insert(Action::Keep);
                }
            }
            Entry::Vacant(slot) => {
                slot.insert(decision.action);
            }
        }
    }

    let doomed = verdicts
        .into_iter()
        .filter(|(_, action)| *action == Action::Remove);
    for (file, _) in doomed {
        if let Err(err) = removal_fn(file) {
            log::error!("{}", err);
        }
    }
}
/// Selects the callback used to get rid of a file, based on the command line.
///
/// | `split_to` | `dry_run` | effect                                         |
/// |------------|-----------|------------------------------------------------|
/// | set        | yes       | log the move that would happen                 |
/// | set        | no        | move the file below the `split_to` directory   |
/// | unset      | yes       | log the removal that would happen              |
/// | unset      | no        | delete the file                                |
fn new_removal_fn(args: &Args) -> RemovalFn {
    match (args.split_to.clone(), args.dry_run) {
        (Some(dest), true) => Box::new(move |path: PathBuf| {
            log::info!(
                "(dry-run) would move {} to {}",
                path.display(),
                dest.display()
            );
            Ok(())
        }),
        (Some(dest), false) => Box::new(move |path: PathBuf| {
            log::info!("moving {} to {}", path.display(), dest.display());
            Self::move_preserve(&path, &dest)
        }),
        (None, true) => Box::new(|path: PathBuf| {
            log::info!("(dry-run) would remove {}", path.display());
            Ok(())
        }),
        (None, false) => Box::new(|path: PathBuf| {
            log::info!("removing {}", path.display());
            std::fs::remove_file(&path)
        }),
    }
}
/// Moves `src` to `dest/src`, creating the missing parent directories so the
/// relative folder structure of `src` is reproduced below `dest`.
///
/// A plain rename is attempted first; if it fails because source and target
/// live on different filesystems (`EXDEV`), the file is copied and the
/// original is removed afterwards.
///
/// # Panics
///
/// Panics if `src` is not a relative path.
///
/// # Errors
///
/// Returns any I/O error raised while creating directories, renaming,
/// copying, or removing the original.
fn move_preserve(src: &Path, dest: &Path) -> io::Result<()> {
    assert!(src.is_relative());

    let target = dest.join(src);
    if let Some(dir) = target.parent() {
        std::fs::create_dir_all(dir)?;
    }

    let renamed = std::fs::rename(src, &target);
    let crosses_filesystems =
        matches!(&renamed, Err(err) if err.raw_os_error() == Some(EXDEV));
    if !crosses_filesystems {
        // Either the rename worked or it failed for an unrelated reason.
        return renamed;
    }

    log::trace!(
        "different filesystems, falling back to copying {} to {}",
        src.display(),
        target.display()
    );
    std::fs::copy(src, target)?;
    std::fs::remove_file(src)
}
}

40
src/cleaners/dso.rs Normal file
View file

@ -0,0 +1,40 @@
// SPDX-FileCopyrightText: Matteo Settenvini <matteo.settenvini@montecristosoftware.eu>
// SPDX-License-Identifier: EUPL-1.2
use super::Cleaner;
use crate::decision::{Action, Decision};
use anyhow::Result;
use async_trait::async_trait;
use std::path::PathBuf;
use tokio::sync::{broadcast, mpsc};
/// Cleaner for dynamic shared objects: meant to clean up unused shared
/// libraries and to warn about broken dependencies as well.
///
/// NOTE(review): the `Cleaner::run` implementation in this file does not
/// inspect the files it receives; it reports `Action::Remove` for every path.
/// Presumably a placeholder — confirm before running without `--dry-run`.
pub struct Dso {}
impl Dso {
pub fn new() -> Self {
Self {}
}
}
#[async_trait]
impl Cleaner for Dso {
async fn run(
&mut self,
mut files: broadcast::Receiver<PathBuf>,
decisions: mpsc::Sender<Decision>,
) -> Result<()> {
while let Ok(file) = files.recv().await {
// TODO: handle Lagged?
decisions
.send(Decision {
path: file,
action: Action::Remove,
})
.await?;
}
Ok(())
}
}

16
src/decision.rs Normal file
View file

@ -0,0 +1,16 @@
// SPDX-FileCopyrightText: Matteo Settenvini <matteo.settenvini@montecristosoftware.eu>
// SPDX-License-Identifier: EUPL-1.2
use std::path::PathBuf;
/// What should happen to a file according to a cleaner.
///
/// The type is a plain two-state flag, so it is cheap to copy and can be
/// printed for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    /// The file has to stay where it is.
    Keep,
    /// The file can be removed.
    Remove,
}
/// The verdict of a cleaner about a single file, sent to the runner over the
/// decisions channel.
pub struct Decision {
/// File the verdict applies to.
pub path: PathBuf,
/// What the cleaner wants to happen to the file.
pub action: Action,
// Not enabled yet: an optional explanation for the verdict.
//pub reason: Option<String>,
}

View file

@ -2,16 +2,24 @@
// SPDX-License-Identifier: EUPL-1.2
mod args;
mod cleaners;
mod decision;
use anyhow::Result;
use args::Args;
use clap::Parser as _;
use cleaners::Runner;
use env_logger::Env;
fn main() -> Result<()> {
let logging_env = Env::default().default_filter_or("info");
// Program entry point: sets up logging, parses the command line, enters the
// sysroot directory and runs all cleaners through `Runner`.
#[tokio::main]
async fn main() -> Result<()> {
// The log filter is read from the `LOG_LEVEL` environment variable, with
// "warn" used when the variable is not set.
let logging_env = Env::default().filter_or("LOG_LEVEL", "warn");
env_logger::Builder::from_env(logging_env).init();
// NOTE(review): the next line looks like the pre-change counterpart of the
// `let args = ...` line right after it (residue of the diff rendering) —
// confirm which of the two is meant to stay.
let _args = Args::try_parse()?;
let args = Args::try_parse()?;
// The runner walks "." and works with relative paths, so make the sysroot
// the current working directory first.
std::env::set_current_dir(&args.sysroot_location)?;
let runner = Runner::new(args);
runner.run().await?;
Ok(())
}