fix: use a pipeline of cleaners instead of running them in parallel

Build a pipeline of transformers, so that paths marked for
keeping or removing will also be taken into account when
building the graph of ELF dependencies.

Also includes a few refactorings.
This commit is contained in:
Matteo Settenvini 2025-02-19 14:30:31 +01:00
parent d033b9d29a
commit 2d48e5bda3
Signed by: matteo
GPG Key ID: 1C1B12600D81DE05
4 changed files with 268 additions and 254 deletions

View File

@ -8,28 +8,21 @@ use crate::{
args::Args,
decision::{Action, Decision},
};
use anyhow::Result;
use anyhow::{Error, Result};
use async_trait::async_trait;
use dso::DsoCleaner;
use list::ListCleaner;
use nix::libc::EXDEV;
use std::{
collections::HashMap,
io,
path::{Path, PathBuf},
};
use tokio::{
sync::{broadcast, mpsc},
task::JoinSet,
};
use std::{collections::HashMap, io, path::Path};
use tokio::{sync::mpsc, task::JoinSet};
use walkdir::{DirEntry, WalkDir};
#[async_trait]
pub trait Cleaner {
async fn run(
&mut self,
mut files: broadcast::Receiver<PathBuf>,
decisions: mpsc::Sender<Decision>,
mut input: mpsc::Receiver<Decision>,
output: mpsc::Sender<Decision>,
) -> Result<()>;
}
@ -42,23 +35,22 @@ pub struct Runner {
}
const CHANNEL_SIZE: usize = 100;
const CHANNEL_MAX_LOAD: usize = CHANNEL_SIZE * 3 / 4;
impl Runner {
pub fn new(args: Args) -> Self {
let removal_fn = Self::new_removal_fn(&args);
let mut cleaners: Cleaners = vec![];
cleaners.push(Box::new(DsoCleaner::new(args.output_dotfile)));
if let Some(wl) = args.allowlist {
cleaners.push(Box::new(ListCleaner::new(Action::Keep, wl)));
cleaners.push(Box::new(ListCleaner::new(list::ListType::Allow, wl)));
}
if let Some(bl) = args.blocklist {
cleaners.push(Box::new(ListCleaner::new(Action::Remove, bl)));
cleaners.push(Box::new(ListCleaner::new(list::ListType::Block, bl)));
}
cleaners.push(Box::new(DsoCleaner::new(args.output_dotfile)));
Self {
cleaners,
removal_fn,
@ -66,24 +58,19 @@ impl Runner {
}
pub async fn run(self) -> Result<()> {
let input_tx = broadcast::Sender::new(CHANNEL_SIZE);
let (output_tx, output_rx) = mpsc::channel(CHANNEL_SIZE);
let mut tasks = JoinSet::new();
// Processors
for mut cleaner in self.cleaners {
let input_rx = input_tx.subscribe();
let output_tx_clone = output_tx.clone();
tasks.spawn(async move { cleaner.run(input_rx, output_tx_clone).await });
}
drop(output_tx);
let paths_producer = Self::paths_producer(&mut tasks).await;
let output = self
.cleaners
.into_iter()
.fold(paths_producer, |input, mut cleaner| {
let (tx, rx) = mpsc::channel(CHANNEL_SIZE);
tasks.spawn(async move { cleaner.run(input, tx).await });
rx
});
// Producer of inputs (note that this needs to happen
// after all channels have been created)
tasks.spawn(Self::input_producer(input_tx));
// Output consumer
Self::output_consumer(self.removal_fn, output_rx).await;
Self::final_decision(self.removal_fn, output).await;
while let Some(task) = tasks.join_next().await {
if let Err(err) = task? {
@ -93,24 +80,29 @@ impl Runner {
Ok(())
}
async fn input_producer(input_tx: broadcast::Sender<PathBuf>) -> Result<()> {
async fn paths_producer(tasks: &mut JoinSet<Result<(), Error>>) -> mpsc::Receiver<Decision> {
let (input_tx, input_rx) = mpsc::channel(CHANNEL_SIZE);
let walker = WalkDir::new(".").follow_links(false);
tasks.spawn(async move {
for entry in walker {
match entry {
Ok(e) if !Self::is_dir(&e) => {
if input_tx.len() >= CHANNEL_MAX_LOAD {
// TODO: FIXME: make this better, this is a quick hack
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
input_tx.send(e.into_path())?;
input_tx
.send(Decision {
path: e.into_path(),
action: Action::Undecided,
})
.await?;
}
Ok(_) => continue,
Err(err) => log::warn!("unable to access path: {}", err),
}
}
// we handle errors by warning the user, otherwise we always succeed
Ok(())
});
input_rx
}
fn is_dir(entry: &DirEntry) -> bool {
@ -135,22 +127,25 @@ impl Runner {
}
}
async fn output_consumer(removal_fn: RemovalFn, mut output_rx: mpsc::Receiver<Decision>) {
let mut to_remove = HashMap::new();
while let Some(decision) = output_rx.recv().await {
match to_remove.get_mut(&decision.path) {
Some(prev_action) => {
if decision.action == Action::Keep {
*prev_action = Action::Keep;
async fn final_decision(removal_fn: RemovalFn, mut output_rx: mpsc::Receiver<Decision>) {
let mut final_decisions = HashMap::new();
while let Some(input_decision) = output_rx.recv().await {
if input_decision.action == Action::Undecided {
continue;
}
match final_decisions.get_mut(&input_decision.path) {
Some(action) if *action == Action::Keep => { /* nothing to do */ }
Some(action) => {
*action = input_decision.action;
}
None => {
to_remove.insert(decision.path, decision.action);
final_decisions.insert(input_decision.path, input_decision.action);
}
}
}
for (file, action) in to_remove {
for (file, action) in final_decisions {
if action == Action::Remove {
if let Err(err) = (removal_fn)(&file) {
log::error!("{}: {}", file.display(), err);

View File

@ -3,7 +3,7 @@
use super::Cleaner;
use crate::decision::{Action, Decision};
use anyhow::Result;
use anyhow::{Context, Result};
use async_trait::async_trait;
use goblin::elf::Elf;
use memmap2::Mmap;
@ -19,10 +19,7 @@ use std::{
io::{ErrorKind, Read, Seek, Write},
path::{Path, PathBuf},
};
use tokio::sync::{
broadcast::{self, error::RecvError},
mpsc,
};
use tokio::sync::mpsc;
type InodeMap = HashMap<ino_t, HashSet<PathBuf>>;
type InodeGraph = DiGraphMap<ino_t, ()>;
@ -44,17 +41,6 @@ impl DsoCleaner {
}
}
impl Default for State {
fn default() -> Self {
let mut paths_map = InodeMap::default();
let mut graph = InodeGraph::default();
paths_map.insert(ROOT_NODE, HashSet::from([PathBuf::from("«root»")]));
graph.add_node(ROOT_NODE);
Self { paths_map, graph }
}
}
const ROOT_NODE: ino_t = 0;
const ELF_MAGIC_HEADER: &[u8; 4] = b"\x7fELF";
@ -62,76 +48,101 @@ const ELF_MAGIC_HEADER: &[u8; 4] = b"\x7fELF";
impl Cleaner for DsoCleaner {
async fn run(
&mut self,
mut files: broadcast::Receiver<PathBuf>,
decisions: mpsc::Sender<Decision>,
mut input: mpsc::Receiver<Decision>,
output: mpsc::Sender<Decision>,
) -> Result<()> {
let mut state = State::default();
loop {
match files.recv().await {
Ok(file) => {
if let Err(e) = Self::process_file(&mut state, &file) {
let mut inodes_to_keep = HashSet::new();
inodes_to_keep.insert(ROOT_NODE);
while let Some(decision) = input.recv().await {
// If we know something MUST be removed (e.g. a binary or
// a symlink) it makes sense now to avoid adding it to the graph so
// that also its dependencies will not be kept.
if decision.action != Action::Remove {
state.process_path(&decision.path).unwrap_or_else(|e| {
log::warn!(
"{}: {} (this might produce wrong results!)",
file.display(),
decision.path.display(),
e
);
});
}
// If something is "keep", add it to the list
// of nodes on the DFS stack for the graph
// we are building, so that it will be kept along
// with any dependencies.
if decision.action == Action::Keep {
let ino = nix::sys::stat::lstat(&decision.path)?.st_ino;
inodes_to_keep.insert(ino);
dbg!(&decision.path, ino);
}
Err(RecvError::Closed) => break,
e => {
e?;
}
// If something was marked as "keep" or "remove" before,
// we can immediately send it back as such, so that previous
// determinations are preserved.
//
// TODO: if the user forces the removal of a needed dependency
// of an ELF file, it is not for us to stop them, but we should
// probably warn them this is unwise?
if decision.action != Action::Undecided {
output.send(decision).await?;
}
}
if let Some(dot) = &self.output_dot {
debug_print_graph(&state, &dot)?;
state.debug_print_graph(&dot)?;
}
let mut dfs = Dfs::new(&state.graph, ROOT_NODE);
let mut dfs = Dfs::empty(&state.graph);
dfs.stack = inodes_to_keep.into_iter().collect();
while let Some(_) = dfs.next(&state.graph) {}
for path in state
.paths_map
.into_iter()
.filter_map(|(n, paths)| {
if !dfs.discovered.contains(&n) {
Some(paths)
for (inode, paths) in state.paths_map.into_iter() {
let action = if !dfs.discovered.contains(&inode) {
// The file represented by this inode was unreachable after
// conducting our DFS, hence all its associated paths
// can be removed.
Action::Remove
} else {
None
Action::Undecided
};
for path in paths {
if path.display().to_string().contains("libjsonrpccpp-server") {
dbg!(inode, &path, action);
}
output.send(Decision { path, action }).await?;
}
})
.flatten()
{
decisions
.send(Decision {
path,
action: Action::Remove,
})
.await?;
}
Ok(())
}
}
impl DsoCleaner {
fn process_file(state: &mut State, path: &Path) -> Result<()> {
log::trace!("processing {}", path.display());
let mut f = File::open(path)?;
let mut hdr = [0u8; 4];
if let Err(e) = f.read_exact(&mut hdr) {
if e.kind() != ErrorKind::UnexpectedEof {
anyhow::bail!("{}: {}", path.display(), e)
impl Default for State {
    /// Creates a state that contains only the synthetic root: `ROOT_NODE` is
    /// present in the graph and mapped to the placeholder path `«root»`.
    fn default() -> Self {
        let mut graph = InodeGraph::default();
        graph.add_node(ROOT_NODE);

        // The root is not a real file, so give it a recognisable fake path.
        let root_paths = HashSet::from([PathBuf::from("«root»")]);
        let paths_map = InodeMap::from([(ROOT_NODE, root_paths)]);

        Self { paths_map, graph }
    }
}
return Ok(()); // not ELF, ignore
};
impl State {
fn process_path(&mut self, path: &Path) -> Result<()> {
log::trace!("processing {}", path.display());
let mut f = File::open(path)?;
let is_elf = &hdr == ELF_MAGIC_HEADER;
if !is_elf {
return Ok(());
if !is_elf(&mut f).context(path.display().to_string())? {
return Ok(()); // Ignore non-ELF files
}
f.rewind()?;
@ -139,17 +150,17 @@ impl DsoCleaner {
let elf = Elf::parse(&mmap)?;
if path.is_symlink() {
Self::process_elf_symlink(state, path, &elf)
self.process_elf_symlink(path, &elf)
} else {
Self::process_elf_file(state, path, &elf)
self.process_elf_file(path, &elf)
}
}
fn process_elf_symlink(state: &mut State, path: &Path, elf: &Elf) -> Result<()> {
fn process_elf_symlink(&mut self, path: &Path, elf: &Elf) -> Result<()> {
let src = nix::sys::stat::lstat(path)?;
if !elf.is_lib {
// To be able to use DFS on the graph later, we link each executable symlink to a fake ROOT_NODE
Self::update_graph(state, "".into(), ROOT_NODE, path.to_owned(), src.st_ino);
self.update_graph("".into(), ROOT_NODE, path.to_owned(), src.st_ino);
}
let current_dir = std::env::current_dir()?;
@ -175,22 +186,22 @@ impl DsoCleaner {
path.display(),
dst_path.display()
);
Self::update_graph(state, path.into(), src.st_ino, dst_path, dst.st_ino);
self.update_graph(path.into(), src.st_ino, dst_path, dst.st_ino);
}
Ok(())
}
fn process_elf_file(state: &mut State, path: &Path, elf: &Elf) -> Result<()> {
fn process_elf_file(&mut self, path: &Path, elf: &Elf) -> Result<()> {
log::trace!("dso: adding to graph elf file '{}'", path.display());
let src = nix::sys::stat::stat(path)?;
if !elf.is_lib {
// To be able to use DFS on the graph later, we link each executable to a fake ROOT_NODE
Self::update_graph(state, "".into(), ROOT_NODE, path.to_owned(), src.st_ino);
self.update_graph("".into(), ROOT_NODE, path.to_owned(), src.st_ino);
}
let search_paths = Self::determine_lib_search_paths(path, elf)?;
let search_paths = determine_lib_search_paths(path, elf)?;
'next_lib: for &library in elf.libraries.iter() {
for lib_path in search_paths.iter() {
@ -209,7 +220,7 @@ impl DsoCleaner {
continue; // These are not the droids you are looking for.
}
Self::update_graph(state, path.into(), src.st_ino, tentative_path, dst.st_ino);
self.update_graph(path.into(), src.st_ino, tentative_path, dst.st_ino);
continue 'next_lib;
}
@ -219,6 +230,67 @@ impl DsoCleaner {
Ok(())
}
/// Records a dependency edge from `src_inode` to `dst_inode`.
///
/// Each path is first added to the set of paths known for its inode
/// (creating the set when the inode is seen for the first time), then the
/// edge is inserted into the graph.
fn update_graph(
    &mut self,
    src_path: PathBuf,
    src_inode: ino_t,
    dst_path: PathBuf,
    dst_inode: ino_t,
) {
    let endpoints = [(src_inode, src_path), (dst_inode, dst_path)];
    for (inode, path) in endpoints {
        self.paths_map.entry(inode).or_default().insert(path);
    }

    self.graph.add_edge(src_inode, dst_inode, ());
}
/// Writes the dependency graph to `output_file` in Graphviz DOT format.
///
/// Every node is labelled with its weight and the file name of one of the
/// paths recorded for it in `paths_map`; edges carry no label.
///
/// # Errors
///
/// Returns an error if `output_file` cannot be created or written to.
///
/// # Panics
///
/// Panics if a node of the graph has no entry, or an empty entry, in
/// `paths_map`.
fn debug_print_graph(&self, output_file: &Path) -> Result<()> {
    use std::ffi::OsStr;

    // Formatting the graph issues many small writes, so buffer them instead
    // of hitting the file once per fragment.
    let mut output_dot = std::io::BufWriter::new(File::create(output_file)?);
    write!(
        &mut output_dot,
        indoc::indoc! {
            "digraph {{
            rankdir=\"LR\"
            {:?}
            }}"
        },
        petgraph::dot::Dot::with_attr_getters(
            &self.graph,
            &[
                dot::Config::NodeNoLabel,
                dot::Config::EdgeNoLabel,
                dot::Config::GraphContentOnly
            ],
            &|_, _| String::new(),
            &|_, n| {
                let inode = n.id();
                // Build the panic messages lazily: they are only needed when
                // the invariant is actually broken.
                let paths = self
                    .paths_map
                    .get(&inode)
                    .unwrap_or_else(|| panic!("dso: no path map entry for inode {}", inode));
                let first_path = paths.iter().next().unwrap_or_else(|| {
                    panic!(
                        "dso: you have a path map with an empty entry for inode {}",
                        inode
                    )
                });
                format!(
                    "label = \"({}, {})\"",
                    n.weight(),
                    first_path
                        .file_name()
                        .unwrap_or(OsStr::new("🚀"))
                        .to_string_lossy()
                )
            }
        )
    )?;
    // Dropping a `BufWriter` discards write errors, so flush explicitly.
    output_dot.flush()?;

    Ok(())
}
}
fn determine_lib_search_paths(path: &Path, elf: &Elf<'_>) -> Result<Vec<String>> {
let mut search_paths = vec![];
@ -242,7 +314,7 @@ impl DsoCleaner {
search_paths.append(&mut rpaths);
}
search_paths.append(&mut Self::get_env_library_paths());
search_paths.append(&mut get_env_library_paths());
}
if elf.runpaths != vec![""] {
@ -273,65 +345,15 @@ impl DsoCleaner {
.unwrap_or_default()
}
fn update_graph(
state: &mut State,
src_path: PathBuf,
src_inode: ino_t,
dst_path: PathBuf,
dst_inode: ino_t,
) {
state
.paths_map
.entry(src_inode)
.or_default()
.insert(src_path);
state
.paths_map
.entry(dst_inode)
.or_default()
.insert(dst_path);
state.graph.add_edge(src_inode, dst_inode, ());
}
fn is_elf(f: &mut File) -> Result<bool> {
let mut hdr = [0u8; 4];
if let Err(e) = f.read_exact(&mut hdr) {
if e.kind() != ErrorKind::UnexpectedEof {
anyhow::bail!(e)
}
fn debug_print_graph(state: &State, output_file: &Path) -> Result<()> {
use std::ffi::OsStr;
return Ok(false);
};
let mut output_dot = File::create(output_file)?;
write!(
&mut output_dot,
indoc::indoc! {
"digraph {{
rankdir=\"LR\"
{:?}
}}"
},
petgraph::dot::Dot::with_attr_getters(
&state.graph,
&[
dot::Config::NodeNoLabel,
dot::Config::EdgeNoLabel,
dot::Config::GraphContentOnly
],
&|_, _| { String::new() },
&|_, n| {
let paths = state.paths_map.get(&n.id()).unwrap();
let first_path = paths.iter().next().expect(&format!(
"dso: you have a path map with an empty entry for inode {}",
n.id()
));
format!(
"label = \"({}, {})\"",
n.weight(),
&first_path
.file_name()
.unwrap_or(OsStr::new("🚀"))
.to_string_lossy()
)
}
)
)?;
Ok(())
Ok(&hdr == ELF_MAGIC_HEADER)
}

View File

@ -6,19 +6,21 @@ use crate::decision::{Action, Decision};
use anyhow::{Context, Result};
use async_trait::async_trait;
use std::path::PathBuf;
use tokio::sync::{
broadcast::{self, error::RecvError},
mpsc,
};
use tokio::sync::mpsc;
pub struct ListCleaner {
action_type: Action,
list_type: ListType,
list: PathBuf,
}
/// Kind of path list a `ListCleaner` is driven by.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ListType {
    /// Allowlist: matching paths are marked `Action::Keep`.
    Allow,
    /// Blocklist: matching paths are marked `Action::Remove`.
    Block,
}
impl ListCleaner {
pub fn new(action_type: Action, list: PathBuf) -> Self {
Self { action_type, list }
pub fn new(list_type: ListType, list: PathBuf) -> Self {
Self { list_type, list }
}
}
@ -26,8 +28,8 @@ impl ListCleaner {
impl Cleaner for ListCleaner {
async fn run(
&mut self,
mut files: broadcast::Receiver<PathBuf>,
decisions: mpsc::Sender<Decision>,
mut input: mpsc::Receiver<Decision>,
output: mpsc::Sender<Decision>,
) -> Result<()> {
let current_dir = std::env::current_dir()?;
@ -43,27 +45,20 @@ impl Cleaner for ListCleaner {
.build()
.with_context(|| format!("list: cannot build matcher from {}", self.list.display()))?;
let action_name = match self.action_type {
Action::Keep => "allow",
Action::Remove => "block",
let (action_name, action) = match self.list_type {
ListType::Allow => ("allow", Action::Keep),
ListType::Block => ("block", Action::Remove),
};
loop {
match files.recv().await {
Ok(path) => {
if matcher.matched(&path, false).is_ignore() {
log::info!("{}list: marking {}", action_name, path.display());
decisions
.send(Decision {
action: self.action_type,
path: path,
})
.await?;
}
}
Err(RecvError::Closed) => break,
Err(e) => Err(anyhow::anyhow!(e)).context("allowlist: recv error")?,
while let Some(mut decision) = input.recv().await {
if decision.action == Action::Undecided
&& matcher.matched(&decision.path, false).is_ignore()
{
log::info!("{}list: marking {}", action_name, decision.path.display());
decision.action = action;
}
output.send(decision).await?;
}
Ok(())

View File

@ -3,12 +3,14 @@
use std::path::PathBuf;
#[derive(PartialEq, Eq, Clone, Copy)]
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
/// Verdict attached to a path as it travels through the cleaner pipeline.
pub enum Action {
/// No cleaner has decided on this path yet; paths enter the pipeline in
/// this state and are ignored by the final decision step if still undecided.
Undecided,
/// The path must be kept; once recorded, the final decision step never
/// overrides it.
Keep,
/// The path is to be removed; the removal function is invoked on it once
/// all decisions have been collected.
Remove,
}
#[derive(Debug, PartialEq, Eq)]
pub struct Decision {
pub path: PathBuf,
pub action: Action,