Cookbook
Task-oriented recipes: find the thing you're trying to do, copy the snippet,
follow the link when you need the fine print. Every snippet assumes a tokio
runtime and use processkit::Command; unless shown otherwise.
- Run a command and get its output
- Inspect a failure instead of erroring
- Ask a yes/no question
- Accept non-zero exit codes as success
- Bound a run with a timeout
- Let a tool clean up on timeout
- Show a useful error message
- Feed the child's stdin
- Stream output as it arrives
- Talk to an interactive child
- Pipe commands without a shell
- Start a server and wait until it's ready
- Tear down several children as a unit
- React to whichever child exits first
- Sandbox an untrusted tool
- Keep a crash-prone service running
- Retry a flaky command
- Cancel runs on shutdown
- Measure what a run cost
- Contain a process you didn't spawn
- Test code that runs processes — without processes
- Test streaming code — without processes
- Wrap a CLI tool behind a typed API
Run a command and get its output
#![allow(unused)] fn main() { let head = Command::new("git").args(["rev-parse", "HEAD"]).run().await?; }
run() requires a zero exit and returns stdout with trailing whitespace
trimmed; a non-zero exit, spawn failure, or timeout is a typed Error. For a
one-liner without the builder: processkit::run("git", ["rev-parse", "HEAD"]).
Fine print: Running commands → consuming verbs.
Inspect a failure instead of erroring
#![allow(unused)] fn main() { let result = Command::new("git").args(["merge", "topic"]).output_string().await?; if !result.is_success() { eprintln!("merge exited {:?}: {}", result.code(), result.stderr()); } }
output_string() (and output_bytes() for raw bytes) treats the exit code as
data — Err means the run couldn't happen at all. Call
result.ensure_success()? later to convert a stored failure into the same
typed error run() would have produced.
Fine print: Running commands → results and errors.
Ask a yes/no question
#![allow(unused)] fn main() { let dirty = !Command::new("git").args(["diff", "--quiet"]).probe().await?; }
probe() maps exit 0 → true, exit 1 → false, and anything else to an
error — the git diff --quiet / grep -q convention without manual code
matching.
Accept non-zero exit codes as success
#![allow(unused)] fn main() { // `grep` exits 1 when it finds no match — not a failure for this call. let found = Command::new("grep") .args(["needle", "haystack.txt"]) .ok_codes([0, 1]) .output_string() .await?; let matched = found.code() == Some(0); // 0 = matched, 1 = no match (both "success") }
ok_codes widens what the checking verbs (run/run_unit) and
is_success/ensure_success treat as success — for tools whose non-zero exit is a
normal result (grep 1 = no match, diff 1 = differs, rsync's code families). It
does not change exit_code (always the raw code) or probe (always the 0/1
convention). An empty set is ignored, so the default stays exit 0.
Bound a run with a timeout
#![allow(unused)] fn main() { use std::time::Duration; let result = Command::new("slow-tool") .timeout(Duration::from_secs(30)) .output_string() .await?; if result.timed_out() { eprintln!("gave up after 30s; partial output: {}", result.stdout()); } }
At the deadline the whole tree is killed. On the capture verbs the timeout is
captured (timed_out(), partial output kept); on the success-checking verbs
(run, exit_code) it surfaces as Error::Timeout.
Let a tool clean up on timeout
#![allow(unused)] fn main() { use std::time::Duration; let result = Command::new("dev-server") .timeout(Duration::from_secs(30)) .timeout_grace(Duration::from_secs(5)) // SIGTERM, wait up to 5s, then SIGKILL .output_string() .await?; }
timeout_grace turns the hard deadline kill into a graceful one: SIGTERM (or the
signal from timeout_signal, with the process-control feature), up to the grace
window to exit, then SIGKILL. A signal-handling child exits early; timed_out()
stays true. Windows has no signal tier — the deadline kills atomically.
Fine print: Timeouts → graceful timeout.
Show a useful error message
#![allow(unused)] fn main() { if let Err(e) = Command::new("git").args(["merge", "topic"]).run().await { eprintln!("merge failed: {}", e.diagnostic().unwrap_or("(no output)")); } }
Error::diagnostic() picks the most explanatory captured text — stderr,
falling back to stdout (git writes CONFLICT … there) — so callers don't
re-implement the same heuristic.
Feed the child's stdin
#![allow(unused)] fn main() { use processkit::Stdin; // A string you already have: let sorted = Command::new("sort") .stdin(Stdin::from_string("banana\napple\n")) .run() .await?; // …or any async source: a reader (file, socket) or a stream of lines. let from_file = Stdin::from_reader(tokio::fs::File::open("input.txt").await?); let from_chan = Stdin::from_lines(tokio_stream::iter(vec!["one".to_owned()])); }
One-shot sources (from_reader/from_lines) feed a single run; re-running the
same Command afterwards fails loud (an Error::Io at launch, D10) instead
of silently seeing empty stdin. For a conversation, see the next recipe but one.
Fine print: Running commands → standard input.
Stream output as it arrives
#![allow(unused)] fn main() { use processkit::{StreamExt, Finished}; // StreamExt re-exported; provides `.next()` let mut run = Command::new("cargo").args(["build", "--verbose"]).start().await?; let mut lines = run.stdout_lines()?; while let Some(line) = lines.next().await { println!("build: {line}"); } let Finished { outcome, stderr, .. } = run.finish().await?; // outcome + buffered stderr }
No waiting for exit, no full-output buffering; stderr is drained in the
background so the child can't block. A timeout on the command bounds the
stream itself. Prefer a callback? .on_stdout_line(|l| …) runs one per line
while any capture verb drives the run.
Fine print: Streaming & interactive I/O.
Talk to an interactive child
#![allow(unused)] fn main() { use processkit::StreamExt; let mut run = Command::new("bc").keep_stdin_open().start().await?; let mut stdin = run.take_stdin().expect("stdin was kept open"); stdin.write_line("2 + 2").await?; stdin.finish().await?; // EOF — bc exits let mut answers = run.stdout_lines()?; while let Some(answer) = answers.next().await { println!("{answer}"); } }
keep_stdin_open() hands you an async writer instead of closing stdin at
spawn; interleave writes with reads for request/response tools. Its writer
methods return std::io::Result (idiomatic for a writer) — convert with
.map_err(processkit::Error::Io)? in a processkit::Result function, or use
Box<dyn std::error::Error>.
Fine print: Streaming & interactive I/O → interactive stdin.
Pipe commands without a shell
#![allow(unused)] fn main() { let authors = Command::new("git").args(["log", "--format=%an"]) .pipe(Command::new("sort")) .pipe(Command::new("uniq").arg("-c")) .output_string() .await?; }
Native pipes — no shell string, no quoting, no injection surface. The outcome
is pipefail: stdout comes from the last stage, the reported failure from
the first stage that didn't exit cleanly. All stages share one kill-on-drop
group. The | operator is equivalent sugar:
(a | b | c).output_string().
For a consumer that legitimately stops reading early — the | head -1 shape,
where the producer's broken-pipe death (its next write fails once the downstream
closes, or SIGPIPE where the OS delivers it) is expected — mark the producer
unchecked_in_pipe() so that death doesn't fail the chain:
#![allow(unused)] fn main() { let first = (Command::new("seq").args(["1", "1000000"]).unchecked_in_pipe() | Command::new("head").args(["-n", "1"])) .run() .await?; }
Fine print: Pipelines → unchecked stages.
Start a server and wait until it's ready
#![allow(unused)] fn main() { use std::time::Duration; let mut server = Command::new("my-server").args(["--port", "8080"]).start().await?; // Pick the probe that matches how the server announces readiness: server.wait_for_line(|l| l.contains("listening"), Duration::from_secs(10)).await?; // server.wait_for_port("127.0.0.1:8080".parse().unwrap(), Duration::from_secs(10)).await?; // server.wait_for(|| async { http_health().await }, Duration::from_secs(10)).await?; // …use the server; dropping `server` kills its whole tree. }
A probe that can't succeed fails fast with Error::NotReady and never kills
the child — you decide what happens next. No more sleep(2) and hoping.
Fine print: Streaming & interactive I/O → readiness probes.
Tear down several children as a unit
#![allow(unused)] fn main() { use processkit::ProcessGroup; let group = ProcessGroup::new()?; let _db = group.start(&Command::new("dev-db")).await?; let _api = group.start(&Command::new("dev-api")).await?; // Either: graceful — SIGTERM, bounded wait, optional SIGKILL escalation… group.shutdown().await?; // …or just drop(group): hard kill-on-drop of everything, grandchildren included. }
The group is the unit of fate: a panic or early return anywhere reaps every
member. Configure the grace window via ProcessGroupOptions.
Fine print: Process groups.
React to whichever child exits first
#![allow(unused)] fn main() { use processkit::{ProcessGroup, wait_any}; let group = ProcessGroup::new()?; let mut a = group.start(&Command::new("worker-a")).await?; let mut b = group.start(&Command::new("worker-b")).await?; let (idx, outcome) = wait_any(&mut [&mut a, &mut b]).await?; println!("worker #{idx} exited first with {outcome:?}"); // `a` and `b` are only borrowed — the loser is still usable here. }
Fine print: Streaming & interactive I/O → racing children.
Sandbox an untrusted tool
#![allow(unused)] fn main() { use processkit::{ProcessGroup, ProcessGroupOptions}; // Cap the whole tree (requires the `limits` feature; Windows Job / Linux cgroup): let group = ProcessGroup::with_options( ProcessGroupOptions::default() .memory_max(512 * 1024 * 1024) .max_processes(64) .cpu_quota(0.5), )?; let result = group .start( &Command::new("untrusted-tool") .inherit_env(["PATH"]) // allow-list: everything else is cleared .timeout(std::time::Duration::from_secs(60)), ) .await? .output_string() .await?; }
Unenforceable limits are a hard Error::ResourceLimit, never a silently
unbounded group. On Unix, add .uid(…)/.gid(…) to drop privileges (note the
cgroup-mechanism caveat in the guide).
Fine print: Process groups → resource limits · Running commands → privileges.
Keep a crash-prone service running
#![allow(unused)] fn main() { use processkit::{RestartPolicy, Supervisor}; use std::time::Duration; let outcome = Supervisor::new(Command::new("my-service")) .restart(RestartPolicy::OnCrash) .max_restarts(5) .backoff(Duration::from_millis(200), 2.0) .storm_pause(Duration::from_secs(15)) // crash-loop guard (off by default) .run() .await?; println!( "stopped after {} restarts ({} storm pauses): {:?}", outcome.restarts, outcome.storm_pauses, outcome.stopped ); }
Exponential backoff with jitter by default; stop_when(…) ends supervision on
a condition; .with_runner(&group) keeps every incarnation inside one shared
kill-on-drop group. storm_pause arms the failure-storm guard: failures feed
a decaying score, and past the threshold the supervisor takes one collective
pause instead of hammering restarts — "fails rarely" and "crash-looping" stop
being the same case.
Fine print: Supervision, failure storms.
Retry a flaky command
#![allow(unused)] fn main() { use processkit::Error; use std::time::Duration; let fetched = Command::new("git") .args(["fetch", "--quiet"]) .timeout(Duration::from_secs(10)) .retry(3, Duration::from_millis(200), |e| { matches!(e, Error::Timeout { .. }) || e.diagnostic().is_some_and(|m| m.contains("Could not resolve host")) }) .run() .await?; }
The classifier sees the typed error and decides whether this failure is worth
another attempt; each attempt is a fresh process. retry replays a run to
success — for keeping a process alive, use a Supervisor (previous recipe).
Fine print: Timeouts, retries & cancellation → retry.
Cancel runs on shutdown
#![allow(unused)] fn main() { use processkit::CancellationToken; let token = CancellationToken::new(); let job = tokio::spawn({ let token = token.child_token(); async move { Command::new("long-job").cancel_on(token).run().await } }); // On Ctrl-C / shutdown signal / sibling failure: token.cancel(); // kills the tree; the run resolves to Error::Cancelled let outcome = job.await; // Err(Error::Cancelled { .. }) inside }
Cancellation is always an error (the run was abandoned, there is no result),
beats a simultaneous timeout, and is terminal for retry and Supervisor
alike.
For a typed wrapper whose commands never cross your code, set the token once on the client — every command it builds carries it:
#![allow(unused)] fn main() { use processkit::{CancellationToken, CliClient}; let token = CancellationToken::new(); let gh = CliClient::new("gh").default_cancel_on(token.child_token()); // token.cancel() → every in-flight command of THIS client dies. }
Fine print: Timeouts, retries & cancellation → cancellation, client-level default.
Measure what a run cost
#![allow(unused)] fn main() { use std::time::Duration; // One run, summarized (requires the opt-in `stats` feature): let profile = Command::new("crunch").start().await?.profile(Duration::from_millis(100)).await?; println!("exit={:?} took={:?} peak_rss={:?} avg_cpu={:?}", profile.exit_code, profile.duration, profile.peak_memory_bytes, profile.avg_cpu()); }
For a live series over a whole group, group.sample_stats(every) yields a
Stream of snapshots. CPU/memory need a real container (Windows Job / Linux
cgroup); elsewhere you still get process counts.
Fine print: Process groups → stats · Streaming → profiling.
Contain a process you didn't spawn
#![allow(unused)] fn main() { use processkit::ProcessGroup; let child = tokio::process::Command::new("legacy-launcher").spawn()?; let group = ProcessGroup::new()?; // `adopt` is part of `process-control` (default-on) group.adopt(&child)?; // from now on the group's teardown covers it }
Adoption is best-effort by mechanism — on Windows/cgroup the whole running tree joins; on the POSIX process-group backends an exec'd child is contained individually (its future forks too, where it could be re-grouped). The guide spells out exactly what each mechanism can promise.
Fine print: Process groups → adopt · Platform support.
Test code that runs processes — without processes
#![allow(unused)] fn main() { use processkit::testing::{Reply, ScriptedRunner}; // Your code takes any `R: ProcessRunner`; in tests, hand it a script. // Rules match on a prefix of the *program name followed by its arguments* // (the first element is the program): let runner = ScriptedRunner::new() .on(["git", "rev-parse"], Reply::ok("abc123\n")) .on(["git", "push"], Reply::fail(128, "remote: permission denied")) .fallback(Reply::ok("")); // my_deploy(&runner).await? — no subprocess, fully deterministic. }
RecordingRunner wraps any runner and captures every Invocation for
assertions; MockRunner (feature mock) gives mockall expectations; and
the record feature's RecordReplayRunner records real runs into a JSON
cassette once and replays them hermetically in CI.
Fine print: Testing your code.
Test streaming code — without processes
#![allow(unused)] fn main() { use processkit::{Command, Outcome, Finished}; use processkit::testing::{Reply, ScriptedRunner}; use std::time::Duration; let runner = ScriptedRunner::new() .on(["gh", "run", "watch"], Reply::lines(["queued", "in_progress", "completed"]) .with_line_delay(Duration::from_millis(50))); // paced delivery let mut run = runner.start(&Command::new("gh").args(["run", "watch", "123"])).await?; run.wait_for_line(|l| l.contains("completed"), Duration::from_secs(5)).await?; let Finished { outcome, .. } = run.finish().await?; assert_eq!(outcome, Outcome::Exited(0)); }
A scripted start() feeds the canned lines through the same pump
machinery a real child uses, so stdout_lines, the readiness probes, and
finish behave identically — and with_line_delay is deterministic
under #[tokio::test(start_paused = true)]. Canned output also replays
through on_stdout_line/on_stderr_line handlers on the bulk verbs, so
progress-reporting paths test hermetically too.
Fine print: Testing → scripted streaming.
Wrap a CLI tool behind a typed API
#![allow(unused)] fn main() { use processkit::{cli_client, ProcessRunner, Result}; cli_client!(pub struct Git => "git"); impl<R: ProcessRunner> Git<R> { pub async fn current_branch(&self) -> Result<String> { // A verb takes the args directly; pass a built `command(..)` only // when you need to customize it (per-call timeout, stdin, …). self.core.run(["branch", "--show-current"]).await } pub async fn is_clean(&self) -> Result<bool> { self.core.probe(["diff", "--quiet"]).await } } }
The generated struct carries a runner and per-client defaults
(default_timeout, default_env); your methods are just argument lists and
parsers — and because the runner is injectable, the whole wrapper is testable
with the previous recipe's ScriptedRunner.
Fine print: Testing your code → CliClient.