
ProcessKit is an async child-process management library for Rust (tokio). It solves the orphan-process problem at the kernel level and packages a full set of tools around it: streaming I/O, shell-free pipelines, supervision, and hermetic testing seams.
Running external programs is part of everyday software: compiling code, querying version control, invoking CLI tools, managing background servers. Every major runtime makes it easy to start a child process. What they don't make easy is cleaning up after one.
The orphan problem
When a build tool spawns compiler workers, when an integration test starts a local database, when a wrapper script calls the real binary — those grandchildren exist outside your program's awareness. If your code panics, times out, or drops a future mid-flight, the direct child may receive a signal. But everything deeper in the tree keeps running as orphans: ports stay bound, temp files stay open, CPU keeps spinning. The next test run tries to bind the same port and fails with "address already in use."
This is not an edge case. It is the default behavior of every process-spawning
API that works at the level of a single pid — including std::process and
tokio::process.
Whole-tree containment
ProcessKit solves this at the kernel level. Every process you start lives inside an operating-system containment object: a Job Object on Windows, a cgroup v2 on Linux (with a POSIX process-group fallback on hosts without cgroup delegation), or a POSIX process group on macOS and BSDs.
When the Rust value that owns the container is dropped (by normal flow, by an
error propagating through ?, or by a panic), the kernel kills every member of
the tree, grandchildren included. This is not a best-effort signal sent to a list
of pids. With a Job Object or cgroup v2 it is one kernel operation on the
container: a child cannot escape by forking quickly, and a signal cannot be
missed because a descendant already changed its session. The POSIX process-group
fallback is weaker on that last point, because a descendant that moves itself
into a new process group or session leaves the group.
For that reason the library reports the mechanism it actually got: mechanism()
returns JobObject, CgroupV2, or ProcessGroup, so you can verify the guarantee
rather than assume it.
Getting started
```toml
[dependencies]
processkit = "1"
```
A tokio runtime is required. Requires Rust 1.88 or later (MSRV). The crate is stable at 1.0; breaking changes land only in a new major version.
The simplest case — run a command, get its trimmed stdout, fail on error:
```rust
use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let branch = Command::new("git")
        .args(["branch", "--show-current"])
        .run()
        .await?;
    println!("on branch: {branch}");
    Ok(())
}
```
When you need more than "success or error" — the exit code, both streams,
whether the run timed out — output_string() returns the full result without
raising on a non-zero exit:
```rust
use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let result = Command::new("cargo").arg("test").output_string().await?;
    if result.timed_out() {
        eprintln!("tests hung; partial output:\n{}", result.stdout());
    } else if !result.is_success() {
        // `code()` is an Option, so print it with `{:?}` instead of unwrapping.
        eprintln!("exit {:?}: {}", result.code(), result.stderr());
    }
    Ok(())
}
```
The key design choice: a non-zero exit is captured data until you explicitly ask for success. Timeouts are captured in the result. Only cancellation is always an error — because an abandoned run has no result worth inspecting.
Feature flags
Each flag is additive. The kill-on-drop guarantee is unconditional in every configuration.
| Feature | Default | Adds |
|---|---|---|
| process-control | ✅ | Signals, suspend/resume, members(), adopt() |
| stats | — | Resource measurement: CPU time, peak memory, sample_stats, profile |
| limits | — | Whole-tree resource caps (implies stats) |
| record | — | Record/replay cassettes (adds serde) |
| mock | — | mockall-generated MockRunner (semver-exempt surface) |
| tracing | — | Lifecycle events via the tracing crate (never logs argv/env values) |
How it compares
| | whole-tree kill-on-drop | async | limits / stats | streaming · pipelines · supervision |
|---|---|---|---|---|
| std::process | — | — | — | — |
| tokio::process | — | ✓ | — | — |
| command-group | ✓ | ✓ | — | — |
| async-process | — | ✓ (smol) | — | — |
| duct | — | — | — | pipelines only |
| processkit | ✓ | ✓ (tokio) | ✓ | ✓ |
The whole-tree kill-on-drop column is the differentiator: descendants are contained and reaped as a unit, not just the direct child.
Consuming verbs
Every run begins with the same Command builder; the verb you end with
determines what you receive:
| What you want | Verb | What you get |
|---|---|---|
| stdout, success required | run() | trimmed String; non-zero / timeout / kill → typed error |
| full outcome, exit as data | output_string() | ProcessResult — code, stdout, stderr, timed_out |
| just the exit code | exit_code() | i32; a timed-out run errors instead of returning -1 |
| a yes/no answer | probe() | bool — 0 → true, 1 → false, anything else errors |
| a typed value from stdout | parse(\|s\| …) | T, success required |
| typed value, non-zero ok | try_parse(\|s\| …) | Option<T> — None on non-zero |
| first matching output line | first_line(\|l\| …) | Option<String> |
| a live handle for streaming | start() | RunningProcess |
The same vocabulary is available on every layer: ProcessRunner,
ProcessGroup, CliClient.
Streaming and interactive I/O
For commands that produce large or incremental output, start() returns a live
handle you drive yourself. Stream stdout line by line as the child produces it,
with no buffering and no waiting for exit:
```rust
use processkit::{Command, StreamExt, Finished, Outcome};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let mut run = Command::new("cargo")
        .args(["build", "--release"])
        .start()
        .await?;

    let mut lines = run.stdout_lines()?;
    while let Some(line) = lines.next().await {
        println!("{line}");
    }

    // Stderr was drained in the background the whole time.
    let Finished { outcome, stderr, .. } = run.finish().await?;
    if outcome != Outcome::Exited(0) {
        eprintln!("build failed:\n{stderr}");
    }
    Ok(())
}
```
For conversational tools — send a request, read the response, repeat —
keep_stdin_open() gives you an async writer you can interleave with reads.
The library handles the background stderr drain so the child can never block
on a full pipe while you're busy with stdout.
Readiness probes solve "start a server, then use it" without guessing at an arbitrary sleep:
```rust
use processkit::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let mut run = Command::new("my-server").start().await?;

    // Wait for the startup banner on stdout:
    run.wait_for_line(|l| l.contains("listening on"), Duration::from_secs(10))
        .await?;

    // Or wait for a TCP port to accept connections:
    run.wait_for_port("127.0.0.1:8080".parse().unwrap(), Duration::from_secs(10))
        .await?;
    Ok(())
}
```
A probe that cannot pass — the child exited, or the deadline elapsed — fails
with Error::NotReady and does not kill the child. You decide what to do next.
Shell-free pipelines
a | b | c without a shell string. Stages are connected in-process through a
relay, so there are no quoting rules, no word-splitting, and no injection
surface. All stages share one kill-on-drop group.
```rust
use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let authors = (Command::new("git").args(["log", "--format=%an"])
        | Command::new("sort")
        | Command::new("uniq").arg("-c"))
    .run()
    .await?;
    println!("{authors}");
    Ok(())
}
```
The outcome follows pipefail semantics: stdout is always the last stage's
output, but the exit code, stderr, and program name are attributed to the
first stage that failed. A stage that legitimately stops reading early — the
classic producer | head -n1 shape — can be marked .unchecked_in_pipe() so
its broken-pipe exit is not counted as a failure.
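The attribution rule above can be sketched in plain Rust. This is an illustration only, not ProcessKit's implementation: the `Stage` type and `first_failure` function are hypothetical names, and the sketch assumes each finished stage carries an optional exit code plus an "unchecked" flag.

```rust
/// Hypothetical record of one finished pipeline stage (illustration only).
#[derive(Debug, Clone, PartialEq)]
struct Stage {
    program: &'static str,
    /// `None` models a stage that ended without an exit code (e.g. a signal).
    exit_code: Option<i32>,
    /// Models the idea behind `.unchecked_in_pipe()`: failures are ignored.
    unchecked: bool,
}

/// Pipefail-style attribution: report the first checked stage that did not
/// exit with code 0. `None` means the whole chain counts as successful.
fn first_failure(stages: &[Stage]) -> Option<&Stage> {
    stages
        .iter()
        .find(|s| !s.unchecked && s.exit_code != Some(0))
}
```

With stages `git` (exit 128), `sort` (exit 0), and `uniq` (exit 1), the sketch attributes the failure to `git`, the earliest failing stage, even though a later stage also failed.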
Timeouts, retries, and cancellation
Command::timeout(d) kills the whole process tree at the deadline. For the
one-shot capture verbs the expiry is part of the result; for the
success-checking verbs it becomes a typed Error::Timeout that carries the
partial output captured before the kill — useful for diagnosing what a hung
tool's last words were.
Command::retry(attempts, backoff, classifier) replays the run on transient
failure. The classifier sees the typed error — you can match on the exit code,
an Error::Timeout, or the captured stderr. A cancelled run is never retried:
the token stays cancelled.
CancellationToken (re-exported from tokio-util) is the coordinated
shutdown primitive. Wire the same parent token into many jobs via child tokens;
cancelling the parent kills every process tree and every consuming path reports
Error::Cancelled.
```rust
use processkit::{CancellationToken, Command};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let shutdown = CancellationToken::new();

    let job = tokio::spawn({
        let token = shutdown.child_token();
        async move {
            Command::new("long-job")
                .timeout(Duration::from_secs(30))
                .cancel_on(token)
                .run()
                .await
        }
    });

    // Signal from anywhere (Ctrl-C, sibling failure, UI button):
    shutdown.cancel();
    Ok(())
}
```
Keeping a service alive
Where retry answers "replay this one operation until it succeeds," a
Supervisor answers "keep this running." It restarts the command on exit per
policy, with bounded restarts, exponential backoff, and jitter by default, so a
restarted fleet doesn't pile back in lockstep:
```rust
use processkit::{Command, RestartPolicy, Supervisor};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let outcome = Supervisor::new(Command::new("my-server").args(["--port", "8080"]))
        .restart(RestartPolicy::OnCrash)
        .max_restarts(10)
        .backoff(Duration::from_millis(200), 2.0)
        .storm_pause(Duration::from_secs(15)) // crash-loop guard
        .run()
        .await?;
    println!("ended after {} restarts: {:?}", outcome.restarts, outcome.stopped);
    Ok(())
}
```
The optional storm guard distinguishes "fails occasionally" from "crash-looping": it maintains a half-life score that grows with each failure and decays between them. When the score exceeds a threshold, the supervisor takes one collective pause rather than hammering the restart timer at backoff speed.
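To make the half-life idea concrete, here is a minimal sketch of a decaying failure score. It is not the library's implementation: the `StormScore` type, the "add 1.0 per failure" rule, and the threshold are assumptions chosen for illustration.

```rust
use std::time::Duration;

/// Hypothetical failure-storm detector: each failure adds 1.0 to a score
/// that halves every `half_life`. Names and constants are illustrative.
struct StormScore {
    score: f64,
    half_life: Duration,
    threshold: f64,
}

impl StormScore {
    fn new(half_life: Duration, threshold: f64) -> Self {
        Self { score: 0.0, half_life, threshold }
    }

    /// Record a failure that happened `since_last` after the previous one.
    /// Returns `true` when the decayed score exceeds the threshold.
    fn record_failure(&mut self, since_last: Duration) -> bool {
        let half_lives = since_last.as_secs_f64() / self.half_life.as_secs_f64();
        // Decay the old score first, then count the new failure.
        self.score = self.score * 0.5_f64.powf(half_lives) + 1.0;
        self.score > self.threshold
    }
}
```

Under these assumptions, failures spaced one half-life apart settle toward a score of 2.0 and never trip a threshold of 2.5, while three back-to-back failures do. That is the "fails occasionally" versus "crash-looping" distinction.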
Resource limits
With the limits feature, a ProcessGroup can cap the whole tree's memory,
process count, and CPU at creation time — enforced by the same kernel object that
provides kill-on-drop:
```rust
use processkit::{Command, ProcessGroup, ProcessGroupOptions};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::with_options(
        ProcessGroupOptions::default()
            .memory_max(512 * 1024 * 1024) // 512 MiB across the whole tree
            .max_processes(64)
            .cpu_quota(0.5), // half of one core
    )?;
    let _job = group.start(&Command::new("untrusted-tool")).await?;
    Ok(())
}
```
A limit that cannot be enforced — no cgroup delegation, no Job Object — is a hard error at group creation time, not a silently unapplied cap. An unapplied cap is no protection.
Testing code that shells out
Subprocess behavior is notoriously difficult to test. ProcessKit exposes a
single trait — ProcessRunner — that decouples "what to run" from "how to
run it." Production code takes a runner generically; tests inject a double.
```rust
use processkit::{Command, ProcessRunner, ProcessRunnerExt, Result};

async fn current_branch(runner: &impl ProcessRunner) -> Result<String> {
    runner.run(&Command::new("git").args(["branch", "--show-current"])).await
}
```
The ScriptedRunner returns canned replies for matched commands. The
RecordingRunner captures every invocation for assertion. With the record
feature, RecordReplayRunner records real runs to a JSON cassette and replays
them in CI — fast, hermetic, byte-stable, no subprocess. The seam covers
streaming too: a scripted start() feeds canned lines through the same pump
machinery the real child uses, so stdout_lines, wait_for_line, and finish
all behave identically in tests.
The cli_client! macro generates typed wrappers around external tools (git,
gh, kubectl, …) that are injectable for free:
```rust
use processkit::{cli_client, ProcessRunner, Result};
use std::path::Path;

cli_client!(pub struct Git => "git");

impl<R: ProcessRunner> Git<R> {
    pub async fn head(&self, dir: &Path) -> Result<String> {
        self.core.run(self.core.command_in(dir, ["rev-parse", "HEAD"])).await
    }
}

// In production: Git::new().head(Path::new(".")).await
// In tests:      Git::with_runner(ScriptedRunner::new().on([…], Reply::ok("abc\n")))
```
Guides
The Cookbook maps "I want to do X" directly to a working snippet — the fastest way in. The individual guides go deeper on each topic:
| Guide | Covers |
|---|---|
| Cookbook | Task-to-snippet recipes for every capability |
| Running commands | The full Command builder, every verb, error semantics |
| Process groups | Containment, teardown, signals, suspend/resume, limits, stats |
| Streaming & interactive I/O | Line streaming, interactive stdin, readiness probes, profiling |
| Pipelines | Shell-free chains, pipefail attribution, chain timeouts |
| Timeouts, retries & cancellation | Deadlines, retry classifiers, CancellationToken |
| Supervision | Restart policies, backoff & jitter, storm guard, outcomes |
| Testing your code | ProcessRunner seam, scripted/recording/cassette doubles, CliClient |
| Platform support | Mechanisms, capability matrices, platform caveats |
| Upgrading | Per-version migration notes |
API reference: docs.rs/processkit.
What's next
ProcessKit is a Rust library today, published as processkit on crates.io. The plan is to bring the same approach — kernel-backed whole-tree containment, honest error semantics, and testable seams — to other ecosystems: a Go package, an F# library, a Kotlin library, and a Python wrapper. Each implementation will follow the same philosophy and be documented here as it ships.
A note on development. This project was built with significant assistance from AI tools throughout the design and implementation process. That said, every line of code was read, understood, and deliberately chosen — this is not generated output dropped into a repository unchecked. The author takes full responsibility for correctness, API design, and the published result.
Cookbook
Task-oriented recipes: find the thing you're trying to do, copy the snippet,
follow the link when you need the fine print. Every snippet assumes a tokio
runtime and use processkit::Command; unless shown otherwise.
- Run a command and get its output
- Inspect a failure instead of erroring
- Ask a yes/no question
- Accept non-zero exit codes as success
- Bound a run with a timeout
- Let a tool clean up on timeout
- Show a useful error message
- Feed the child's stdin
- Stream output as it arrives
- Talk to an interactive child
- Pipe commands without a shell
- Start a server and wait until it's ready
- Tear down several children as a unit
- React to whichever child exits first
- Sandbox an untrusted tool
- Keep a crash-prone service running
- Retry a flaky command
- Cancel runs on shutdown
- Measure what a run cost
- Contain a process you didn't spawn
- Test code that runs processes — without processes
- Test streaming code — without processes
- Wrap a CLI tool behind a typed API
Run a command and get its output
```rust
let head = Command::new("git").args(["rev-parse", "HEAD"]).run().await?;
```
run() requires a zero exit and returns stdout with trailing whitespace
trimmed; a non-zero exit, spawn failure, or timeout is a typed Error. For a
one-liner without the builder: processkit::run("git", ["rev-parse", "HEAD"]).
Fine print: Running commands → consuming verbs.
Inspect a failure instead of erroring
```rust
let result = Command::new("git").args(["merge", "topic"]).output_string().await?;
if !result.is_success() {
    eprintln!("merge exited {:?}: {}", result.code(), result.stderr());
}
```
output_string() (and output_bytes() for raw bytes) treats the exit code as
data — Err means the run couldn't happen at all. Call
result.ensure_success()? later to convert a stored failure into the same
typed error run() would have produced.
Fine print: Running commands → results and errors.
Ask a yes/no question
```rust
let dirty = !Command::new("git").args(["diff", "--quiet"]).probe().await?;
```
probe() maps exit 0 → true, exit 1 → false, and anything else to an
error — the git diff --quiet / grep -q convention without manual code
matching.
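The convention is small enough to spell out as a plain function. This is a sketch of the mapping described above, not ProcessKit's code; the function name and the `String` error type are hypothetical stand-ins.

```rust
/// Hypothetical mapping of an exit code to a yes/no answer, following the
/// convention described above (0 = yes, 1 = no, anything else = error).
fn probe_from_exit_code(code: Option<i32>) -> Result<bool, String> {
    match code {
        Some(0) => Ok(true),
        Some(1) => Ok(false),
        Some(other) => Err(format!("unexpected exit code {other}")),
        // No exit code at all, e.g. the process was killed by a signal.
        None => Err("process ended without an exit code".to_string()),
    }
}
```

Keeping "no" (exit 1) separate from "the tool broke" (any other code) is what lets a caller negate the answer safely, as the `dirty` example does.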
Accept non-zero exit codes as success
```rust
// `grep` exits 1 when it finds no match, which is not a failure for this call.
let found = Command::new("grep")
    .args(["needle", "haystack.txt"])
    .ok_codes([0, 1])
    .output_string()
    .await?;
let matched = found.code() == Some(0); // 0 = matched, 1 = no match (both "success")
```
ok_codes widens what the checking verbs (run/run_unit) and
is_success/ensure_success treat as success — for tools whose non-zero exit is a
normal result (grep 1 = no match, diff 1 = differs, rsync's code families). It
does not change exit_code (always the raw code) or probe (always the 0/1
convention). An empty set is ignored, so the default stays exit 0.
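The success rule, including the empty-set fallback, can be modelled in a few lines. This is an illustrative sketch under the semantics stated above; `is_success` here is a free function invented for the example, not the library's method.

```rust
/// Hypothetical success check with a configurable set of accepted codes.
/// An empty set falls back to the default of "exit 0 only".
fn is_success(code: Option<i32>, ok_codes: &[i32]) -> bool {
    let accepted: &[i32] = if ok_codes.is_empty() { &[0] } else { ok_codes };
    match code {
        Some(c) => accepted.contains(&c),
        // No exit code (e.g. killed by a signal) is never treated as success.
        None => false,
    }
}
```

For example, `is_success(Some(1), &[0, 1])` is true for a `grep`-style tool, while `is_success(Some(1), &[])` is false because the empty set leaves the default in place.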
Bound a run with a timeout
```rust
use std::time::Duration;

let result = Command::new("slow-tool")
    .timeout(Duration::from_secs(30))
    .output_string()
    .await?;
if result.timed_out() {
    eprintln!("gave up after 30s; partial output: {}", result.stdout());
}
```
At the deadline the whole tree is killed. On the capture verbs the timeout is
captured (timed_out(), partial output kept); on the success-checking verbs
(run, exit_code) it surfaces as Error::Timeout.
Let a tool clean up on timeout
```rust
use std::time::Duration;

let result = Command::new("dev-server")
    .timeout(Duration::from_secs(30))
    .timeout_grace(Duration::from_secs(5)) // SIGTERM, wait up to 5s, then SIGKILL
    .output_string()
    .await?;
```
timeout_grace turns the hard deadline kill into a graceful one: SIGTERM (or the
signal from timeout_signal, with the process-control feature), up to the grace
window to exit, then SIGKILL. A signal-handling child exits early; timed_out()
stays true. Windows has no signal tier — the deadline kills atomically.
Fine print: Timeouts → graceful timeout.
Show a useful error message
```rust
if let Err(e) = Command::new("git").args(["merge", "topic"]).run().await {
    eprintln!("merge failed: {}", e.diagnostic().unwrap_or("(no output)"));
}
```
Error::diagnostic() picks the most explanatory captured text — stderr,
falling back to stdout (git writes CONFLICT … there) — so callers don't
re-implement the same heuristic.
Feed the child's stdin
```rust
use processkit::Stdin;

// A string you already have:
let sorted = Command::new("sort")
    .stdin(Stdin::from_string("banana\napple\n"))
    .run()
    .await?;

// …or any async source: a reader (file, socket) or a stream of lines.
let from_file = Stdin::from_reader(tokio::fs::File::open("input.txt").await?);
let from_chan = Stdin::from_lines(tokio_stream::iter(vec!["one".to_owned()]));
```
One-shot sources (from_reader/from_lines) feed a single run; re-running the
same Command afterwards fails loud (an Error::Io at launch, D10) instead
of silently seeing empty stdin. For a conversation, see the next recipe but one.
Fine print: Running commands → standard input.
Stream output as it arrives
```rust
use processkit::{StreamExt, Finished}; // StreamExt re-exported; provides `.next()`

let mut run = Command::new("cargo").args(["build", "--verbose"]).start().await?;
let mut lines = run.stdout_lines()?;
while let Some(line) = lines.next().await {
    println!("build: {line}");
}
let Finished { outcome, stderr, .. } = run.finish().await?; // outcome + buffered stderr
```
No waiting for exit, no full-output buffering; stderr is drained in the
background so the child can't block. A timeout on the command bounds the
stream itself. Prefer a callback? .on_stdout_line(|l| …) runs once per line
while any capture verb drives the run.
Fine print: Streaming & interactive I/O.
Talk to an interactive child
```rust
use processkit::StreamExt;

let mut run = Command::new("bc").keep_stdin_open().start().await?;
let mut stdin = run.take_stdin().expect("stdin was kept open");

stdin.write_line("2 + 2").await?;
stdin.finish().await?; // EOF: bc exits

let mut answers = run.stdout_lines()?;
while let Some(answer) = answers.next().await {
    println!("{answer}");
}
```
keep_stdin_open() hands you an async writer instead of closing stdin at
spawn; interleave writes with reads for request/response tools. Its writer
methods return std::io::Result (idiomatic for a writer) — convert with
.map_err(processkit::Error::Io)? in a processkit::Result function, or use
Box<dyn std::error::Error>.
Fine print: Streaming & interactive I/O → interactive stdin.
Pipe commands without a shell
```rust
let authors = Command::new("git").args(["log", "--format=%an"])
    .pipe(Command::new("sort"))
    .pipe(Command::new("uniq").arg("-c"))
    .output_string()
    .await?;
```
Native pipes — no shell string, no quoting, no injection surface. The outcome
is pipefail: stdout comes from the last stage, the reported failure from
the first stage that didn't exit cleanly. All stages share one kill-on-drop
group. The | operator is equivalent sugar:
(a | b | c).output_string().
For a consumer that legitimately stops reading early — the | head -1 shape,
where the producer's broken-pipe death (its next write fails once the downstream
closes, or SIGPIPE where the OS delivers it) is expected — mark the producer
unchecked_in_pipe() so that death doesn't fail the chain:
```rust
let first = (Command::new("seq").args(["1", "1000000"]).unchecked_in_pipe()
    | Command::new("head").args(["-n", "1"]))
.run()
.await?;
```
Fine print: Pipelines → unchecked stages.
Start a server and wait until it's ready
```rust
use std::time::Duration;

let mut server = Command::new("my-server").args(["--port", "8080"]).start().await?;

// Pick the probe that matches how the server announces readiness:
server.wait_for_line(|l| l.contains("listening"), Duration::from_secs(10)).await?;
// server.wait_for_port("127.0.0.1:8080".parse().unwrap(), Duration::from_secs(10)).await?;
// server.wait_for(|| async { http_health().await }, Duration::from_secs(10)).await?;

// …use the server; dropping `server` kills its whole tree.
```
A probe that can't succeed fails fast with Error::NotReady and never kills
the child — you decide what happens next. No more sleep(2) and hoping.
Fine print: Streaming & interactive I/O → readiness probes.
Tear down several children as a unit
```rust
use processkit::ProcessGroup;

let group = ProcessGroup::new()?;
let _db = group.start(&Command::new("dev-db")).await?;
let _api = group.start(&Command::new("dev-api")).await?;

// Either: graceful (SIGTERM, bounded wait, optional SIGKILL escalation)…
group.shutdown().await?;
// …or just drop(group): hard kill-on-drop of everything, grandchildren included.
```
The group is the unit of fate: a panic or early return anywhere reaps every
member. Configure the grace window via ProcessGroupOptions.
Fine print: Process groups.
React to whichever child exits first
```rust
use processkit::{ProcessGroup, wait_any};

let group = ProcessGroup::new()?;
let mut a = group.start(&Command::new("worker-a")).await?;
let mut b = group.start(&Command::new("worker-b")).await?;

let (idx, outcome) = wait_any(&mut [&mut a, &mut b]).await?;
println!("worker #{idx} exited first with {outcome:?}");
// `a` and `b` are only borrowed, so the loser is still usable here.
```
Fine print: Streaming & interactive I/O → racing children.
Sandbox an untrusted tool
```rust
use processkit::{ProcessGroup, ProcessGroupOptions};

// Cap the whole tree (requires the `limits` feature; Windows Job / Linux cgroup):
let group = ProcessGroup::with_options(
    ProcessGroupOptions::default()
        .memory_max(512 * 1024 * 1024)
        .max_processes(64)
        .cpu_quota(0.5),
)?;

let result = group
    .start(
        &Command::new("untrusted-tool")
            .inherit_env(["PATH"]) // allow-list: everything else is cleared
            .timeout(std::time::Duration::from_secs(60)),
    )
    .await?
    .output_string()
    .await?;
```
Unenforceable limits are a hard Error::ResourceLimit, never a silently
unbounded group. On Unix, add .uid(…)/.gid(…) to drop privileges (note the
cgroup-mechanism caveat in the guide).
Fine print: Process groups → resource limits · Running commands → privileges.
Keep a crash-prone service running
```rust
use processkit::{RestartPolicy, Supervisor};
use std::time::Duration;

let outcome = Supervisor::new(Command::new("my-service"))
    .restart(RestartPolicy::OnCrash)
    .max_restarts(5)
    .backoff(Duration::from_millis(200), 2.0)
    .storm_pause(Duration::from_secs(15)) // crash-loop guard (off by default)
    .run()
    .await?;
println!(
    "stopped after {} restarts ({} storm pauses): {:?}",
    outcome.restarts, outcome.storm_pauses, outcome.stopped
);
```
Exponential backoff with jitter by default; stop_when(…) ends supervision on
a condition; .with_runner(&group) keeps every incarnation inside one shared
kill-on-drop group. storm_pause arms the failure-storm guard: failures feed
a decaying score, and past the threshold the supervisor takes one collective
pause instead of hammering restarts — "fails rarely" and "crash-looping" stop
being the same case.
Fine print: Supervision, failure storms.
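For intuition about what `.backoff(Duration::from_millis(200), 2.0)` implies, the delay schedule can be sketched as a pure function. This is an assumption-laden illustration, not the library's algorithm: the cap (`max`), the jitter spread, and both function names are hypothetical.

```rust
use std::time::Duration;

/// Hypothetical exponential backoff: `initial * factor^restart`, capped at
/// `max`. `restart` counts from 0 for the first restart.
fn backoff_delay(initial: Duration, factor: f64, restart: u32, max: Duration) -> Duration {
    let exponent = restart.min(i32::MAX as u32) as i32;
    let scaled = initial.as_secs_f64() * factor.powi(exponent);
    // `min` also tames an overflow to infinity for very large exponents.
    Duration::from_secs_f64(scaled.min(max.as_secs_f64()))
}

/// Apply jitter as a fraction in `[-spread, +spread]`. The caller supplies
/// `unit` in `[0.0, 1.0]` (for example from a random number generator),
/// which keeps this sketch deterministic and dependency-free.
fn with_jitter(delay: Duration, spread: f64, unit: f64) -> Duration {
    let spread = spread.clamp(0.0, 1.0);
    let offset = (unit.clamp(0.0, 1.0) * 2.0 - 1.0) * spread;
    Duration::from_secs_f64(delay.as_secs_f64() * (1.0 + offset))
}
```

With an initial delay of 200 ms and a factor of 2.0, the first three restarts wait 200 ms, 400 ms, and 800 ms before jitter. Jitter spreads those values so that many supervisors restarting at once do not retry in the same instant.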
Retry a flaky command
```rust
use processkit::Error;
use std::time::Duration;

let fetched = Command::new("git")
    .args(["fetch", "--quiet"])
    .timeout(Duration::from_secs(10))
    .retry(3, Duration::from_millis(200), |e| {
        matches!(e, Error::Timeout { .. })
            || e.diagnostic().is_some_and(|m| m.contains("Could not resolve host"))
    })
    .run()
    .await?;
```
The classifier sees the typed error and decides whether this failure is worth
another attempt; each attempt is a fresh process. retry replays a run to
success — for keeping a process alive, use a Supervisor (previous recipe).
Fine print: Timeouts, retries & cancellation → retry.
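The shape of a classifier-driven retry is easy to show without any processes. The following is a generic, synchronous sketch and not ProcessKit's implementation: `retry_with` is a hypothetical helper, it does not sleep between attempts, and it assumes `attempts` means the total number of tries.

```rust
/// Hypothetical synchronous retry loop: run `op` up to `attempts` times
/// (at least once) and retry only while `is_transient` accepts the error.
/// A real loop would wait for a backoff delay between attempts.
fn retry_with<T, E>(
    attempts: u32,
    mut op: impl FnMut(u32) -> Result<T, E>,
    is_transient: impl Fn(&E) -> bool,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts, or the classifier says "not worth retrying".
            Err(e) if attempt >= attempts || !is_transient(&e) => return Err(e),
            Err(_) => attempt += 1,
        }
    }
}
```

The classifier is what keeps a permanent failure (say, a permission error) from burning through every attempt: only errors it accepts lead to another try.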
Cancel runs on shutdown
```rust
use processkit::CancellationToken;

let token = CancellationToken::new();

let job = tokio::spawn({
    let token = token.child_token();
    async move { Command::new("long-job").cancel_on(token).run().await }
});

// On Ctrl-C / shutdown signal / sibling failure:
token.cancel(); // kills the tree; the run resolves to Error::Cancelled
let outcome = job.await; // Err(Error::Cancelled { .. }) inside
```
Cancellation is always an error (the run was abandoned, there is no result),
beats a simultaneous timeout, and is terminal for retry and Supervisor
alike.
For a typed wrapper whose commands never cross your code, set the token once on the client — every command it builds carries it:
```rust
use processkit::{CancellationToken, CliClient};

let token = CancellationToken::new();
let gh = CliClient::new("gh").default_cancel_on(token.child_token());
// token.cancel() → every in-flight command of THIS client dies.
```
Fine print: Timeouts, retries & cancellation → cancellation, client-level default.
Measure what a run cost
```rust
use std::time::Duration;

// One run, summarized (requires the opt-in `stats` feature):
let profile = Command::new("crunch")
    .start()
    .await?
    .profile(Duration::from_millis(100))
    .await?;
println!(
    "exit={:?} took={:?} peak_rss={:?} avg_cpu={:?}",
    profile.exit_code,
    profile.duration,
    profile.peak_memory_bytes,
    profile.avg_cpu()
);
```
For a live series over a whole group, group.sample_stats(every) yields a
Stream of snapshots. CPU/memory need a real container (Windows Job / Linux
cgroup); elsewhere you still get process counts.
Fine print: Process groups → stats · Streaming → profiling.
Contain a process you didn't spawn
```rust
use processkit::ProcessGroup;

let child = tokio::process::Command::new("legacy-launcher").spawn()?;
let group = ProcessGroup::new()?;
// `adopt` is part of `process-control` (default-on)
group.adopt(&child)?; // from now on the group's teardown covers it
```
Adoption is best-effort by mechanism — on Windows/cgroup the whole running tree joins; on the POSIX process-group backends an exec'd child is contained individually (its future forks too, where it could be re-grouped). The guide spells out exactly what each mechanism can promise.
Fine print: Process groups → adopt · Platform support.
Test code that runs processes — without processes
```rust
use processkit::testing::{Reply, ScriptedRunner};

// Your code takes any `R: ProcessRunner`; in tests, hand it a script.
// Rules match on a prefix of the *program name followed by its arguments*
// (the first element is the program):
let runner = ScriptedRunner::new()
    .on(["git", "rev-parse"], Reply::ok("abc123\n"))
    .on(["git", "push"], Reply::fail(128, "remote: permission denied"))
    .fallback(Reply::ok(""));

// my_deploy(&runner).await? runs with no subprocess, fully deterministic.
```
RecordingRunner wraps any runner and captures every Invocation for
assertions; MockRunner (feature mock) gives mockall expectations; and
the record feature's RecordReplayRunner records real runs into a JSON
cassette once and replays them hermetically in CI.
Fine print: Testing your code.
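The prefix rule mentioned in the snippet's comment ("program name followed by its arguments") can be illustrated with a small predicate. This is only a model of that rule as described here; `rule_matches` is a hypothetical name, and how the real `ScriptedRunner` orders or prioritizes rules is not something this sketch claims to reproduce.

```rust
use std::iter::once;

/// Hypothetical prefix rule: `rule` matches when it is a prefix of the
/// program name followed by its arguments.
fn rule_matches(rule: &[&str], program: &str, args: &[&str]) -> bool {
    // The invocation as one sequence: program first, then each argument.
    let mut invocation = once(program).chain(args.iter().copied());
    rule.iter().all(|expected| invocation.next() == Some(*expected))
}
```

Under this model the rule `["git", "rev-parse"]` matches `git rev-parse HEAD`, but a rule longer than the invocation does not match.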
Test streaming code — without processes
```rust
use processkit::{Command, Outcome, Finished};
use processkit::testing::{Reply, ScriptedRunner};
use std::time::Duration;

let runner = ScriptedRunner::new()
    .on(
        ["gh", "run", "watch"],
        Reply::lines(["queued", "in_progress", "completed"])
            .with_line_delay(Duration::from_millis(50)), // paced delivery
    );

let mut run = runner.start(&Command::new("gh").args(["run", "watch", "123"])).await?;
run.wait_for_line(|l| l.contains("completed"), Duration::from_secs(5)).await?;
let Finished { outcome, .. } = run.finish().await?;
assert_eq!(outcome, Outcome::Exited(0));
```
A scripted start() feeds the canned lines through the same pump
machinery a real child uses, so stdout_lines, the readiness probes, and
finish behave identically — and with_line_delay is deterministic
under #[tokio::test(start_paused = true)]. Canned output also replays
through on_stdout_line/on_stderr_line handlers on the bulk verbs, so
progress-reporting paths test hermetically too.
Fine print: Testing → scripted streaming.
Wrap a CLI tool behind a typed API
```rust
use processkit::{cli_client, ProcessRunner, Result};

cli_client!(pub struct Git => "git");

impl<R: ProcessRunner> Git<R> {
    pub async fn current_branch(&self) -> Result<String> {
        // A verb takes the args directly; pass a built `command(..)` only
        // when you need to customize it (per-call timeout, stdin, …).
        self.core.run(["branch", "--show-current"]).await
    }

    pub async fn is_clean(&self) -> Result<bool> {
        self.core.probe(["diff", "--quiet"]).await
    }
}
```
The generated struct carries a runner and per-client defaults
(default_timeout, default_env); your methods are just argument lists and
parsers — and because the runner is injectable, the whole wrapper is testable
with the previous recipe's ScriptedRunner.
Fine print: Testing your code → CliClient.
Running commands
Command is the entry point of the runner layer: a builder describing what
to run and how, plus a family of consuming verbs that decide what you get
back. Every one-shot verb spawns the child into a fresh, private kill-on-drop
process group, so an early return, panic, or dropped
future can never leak a process tree.
- Program, arguments, working directory
- Environment
- Standard input
- Output handling
- Timeouts and retries
- Privileges and spawn flags
- Consuming verbs
- Results and errors
Program, arguments, working directory
```rust
use processkit::Command;

let out = Command::new("git")
    .arg("log")                       // one at a time…
    .args(["--oneline", "-n", "10"])  // …or in bulk
    .current_dir("/path/to/repo")     // run there
    .run()
    .await?;
```
Arguments are passed as an array — there is no shell between you and the
child, so there is no quoting, no word-splitting, and no injection surface.
(When you actually want a | b | c, use a pipeline, which
connects the stages in-process instead of invoking a shell.)
The program name reaches the OS verbatim — two deliberate non-goals
(conveniences some libraries layer on, e.g. duct): a bare name is resolved
on PATH by the OS, never rewritten to ./name; and current_dir does not
re-anchor a relative program path against the new directory — whether
Command::new("./tool").current_dir(dir) resolves tool relative to dir
is the platform's behavior (Unix: yes; Windows: the parent's directory may
win). Pass absolute program paths when combining the two.
For quick one-liners the free functions skip the builder:
    let version = processkit::run("cargo", ["--version"]).await?;           // trimmed stdout, success required
    let result = processkit::output_string("git", ["status", "-s"]).await?; // full ProcessResult
Environment
Four builders compose, applied in a fixed order at spawn:
    use processkit::Command;

    Command::new("worker")
        .env("RUST_LOG", "debug")   // set one variable
        .env_remove("GIT_DIR")      // unset one inherited variable
        .run().await?;

    // Allow-list mode: clear everything, copy only the named parent variables.
    Command::new("sandboxed-tool")
        .inherit_env(["PATH", "HOME", "LANG"])
        .env("MODE", "ci")          // explicit env/env_remove still apply on top
        .run().await?;

    // Scorched earth: the child starts with an empty environment.
    Command::new("hermetic-tool").env_clear().run().await?;
inherit_env is the sandboxing middle ground: it implies env_clear, then
copies the listed variables from the parent at each spawn (so a retry sees
fresh values), and repeated calls accumulate names. A name the parent doesn't
have is skipped, not set to empty.
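The allow-list rule can be modelled with plain maps. This is a minimal sketch of the semantics described above, not the crate's implementation: `build_child_env` is a hypothetical helper, and the environments are ordinary `BTreeMap`s rather than the real process environment.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of allow-list mode: start empty, copy only the named
/// parent variables, then apply explicit overrides on top.
fn build_child_env(
    parent: &BTreeMap<String, String>,
    allow: &[&str],
    overrides: &[(&str, &str)],
) -> BTreeMap<String, String> {
    let mut child = BTreeMap::new();
    for &name in allow {
        // A name the parent doesn't have is skipped, not set to empty.
        if let Some(value) = parent.get(name) {
            child.insert(name.to_string(), value.clone());
        }
    }
    for &(key, value) in overrides {
        child.insert(key.to_string(), value.to_string());
    }
    child
}

fn main() {
    let mut parent = BTreeMap::new();
    parent.insert("PATH".to_string(), "/usr/bin".to_string());
    parent.insert("SECRET".to_string(), "hunter2".to_string());

    let child = build_child_env(&parent, &["PATH", "HOME"], &[("MODE", "ci")]);
    println!("{child:?}");
}
```

Here `SECRET` never reaches the child because it is not on the list, and `HOME` is absent because the modelled parent does not define it.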
Standard input
By default stdin is closed at spawn — the child reads EOF immediately and
can never hang waiting for input. Everything else is opt-in via
stdin(Stdin::…):
| Source | Reusable on re-run? | Use for |
|---|---|---|
| Stdin::empty() | n/a | The default, explicit |
| Stdin::from_string("…") | ✅ | Text payloads |
| Stdin::from_bytes(vec![…]) | ✅ | Binary payloads |
| Stdin::from_iter_lines(["a", "b"]) | ✅ | Anything iterable; each item is written \n-terminated |
| Stdin::from_file(path) | ✅ (re-opened per run) | Large inputs streamed from disk |
| Stdin::from_reader(reader) | ❌ one-shot | Any AsyncRead (a socket, a decompressor, …) |
| Stdin::from_lines(stream) | ❌ one-shot | Any Stream<Item = String> (a channel, a tail, …) |
    use processkit::{Command, Stdin};

    let sorted = Command::new("sort")
        .stdin(Stdin::from_iter_lines(["banana", "apple", "cherry"]))
        .run()
        .await?;
    assert_eq!(sorted, "apple\nbanana\ncherry");
The payload is written on a background task (so a large input can't deadlock
against the child's output) and the pipe is dropped afterwards to signal EOF.
The two one-shot sources are consumed by their first run: a retried or
cloned command reusing them fails loud the second time — re-running a
consumed from_reader/from_lines source is an Error::Io (InvalidInput)
at launch, not a silent empty stdin. Prefer the reusable sources when
a command may run more than once.
For conversational, request/response stdin — write a line, read the answer,
repeat — use keep_stdin_open() and the streaming API instead: see
Streaming & interactive I/O.
Output handling
Encodings
Output is decoded line by line, UTF-8 by default (invalid bytes become
U+FFFD, never an error). Legacy-encoding tools can override per stream:
    use processkit::Command;

    let out = Command::new("legacy-tool")
        .encoding(encoding_rs::SHIFT_JIS)                 // both streams…
        // .stdout_encoding(…) / .stderr_encoding(…)      // …or each its own
        .output_string()
        .await?;
(processkit::Encoding re-exports encoding_rs::Encoding, so any of its
encodings works — the single-byte and ASCII-compatible multibyte ones
(WINDOWS_1252, GBK, SHIFT_JIS, …) and the non-ASCII-compatible ones
(UTF_16LE/UTF_16BE): output is fed through one persistent decoder and split
on decoded newlines, so a 0x0A byte inside a UTF-16 code unit is not mistaken
for a line break. A leading byte-order mark of the chosen encoding is stripped
once at the stream start.)
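Why decoding must come before line splitting is easiest to see with a concrete byte sequence. The sketch below uses only the standard library and decodes a whole buffer at once, which is a simplification of the streaming decoder described above; the helper names are illustrative, not crate API. U+010A encodes in UTF-16LE as the bytes `0x0A 0x01`, so a byte-level splitter sees a newline that is not there.

```rust
/// Encode text as UTF-16LE bytes.
fn utf16le_bytes(text: &str) -> Vec<u8> {
    text.encode_utf16().flat_map(|unit| unit.to_le_bytes()).collect()
}

/// Wrong for UTF-16: counts the pieces produced by splitting on raw 0x0A bytes.
fn naive_byte_split(bytes: &[u8]) -> usize {
    bytes.split(|&b| b == 0x0A).count()
}

/// Decode the code units first, then split on decoded newlines.
fn decode_then_split(bytes: &[u8]) -> Vec<String> {
    let units: Vec<u16> = bytes
        .chunks_exact(2)
        .map(|pair| u16::from_le_bytes([pair[0], pair[1]]))
        .collect();
    String::from_utf16_lossy(&units)
        .lines()
        .map(str::to_owned)
        .collect()
}

fn main() {
    // Two logical lines; the first starts with U+010A, whose low byte is 0x0A.
    let bytes = utf16le_bytes("\u{010A}a\nb");
    println!("byte-level pieces: {}", naive_byte_split(&bytes));
    println!("decoded lines: {:?}", decode_then_split(&bytes));
}
```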
Buffer policies — bounding memory on chatty children
Captured lines are held in memory; a multi-gigabyte log would normally grow
the buffer to match. output_buffer bounds retention (the pipe is always
fully drained, so the child never blocks):
    use processkit::{Command, OutputBufferPolicy, OverflowMode};

    let tail = Command::new("verbose-build")
        .output_buffer(OutputBufferPolicy::bounded(1_000))   // keep the newest 1000 lines
        .output_string()
        .await?;

    // …or keep the head instead of the tail:
    let head_policy = OutputBufferPolicy::bounded(1_000).with_overflow(OverflowMode::DropNewest);
DropOldest (the default) keeps a rolling tail; DropNewest freezes the
head. bounded(0) retains nothing — useful when a line handler (below) is the
real consumer. Under a line cap, dropped or not, every line still feeds
the handlers and the line counters.
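The two overflow modes can be pictured as a bounded queue with different eviction rules. The following is a small standalone model under that assumption; `LineBuffer` and `Overflow` are hypothetical names for illustration and say nothing about how the crate stores lines internally.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy)]
enum Overflow {
    DropOldest, // rolling tail
    DropNewest, // frozen head
}

struct LineBuffer {
    cap: usize,
    mode: Overflow,
    lines: VecDeque<String>,
    total_lines: usize, // counts every line, retained or not
}

impl LineBuffer {
    fn new(cap: usize, mode: Overflow) -> Self {
        Self { cap, mode, lines: VecDeque::new(), total_lines: 0 }
    }

    fn push(&mut self, line: &str) {
        self.total_lines += 1;
        if self.lines.len() < self.cap {
            self.lines.push_back(line.to_owned());
            return;
        }
        match self.mode {
            // Evict the oldest line to make room (a cap of 0 retains nothing).
            Overflow::DropOldest if self.cap > 0 => {
                self.lines.pop_front();
                self.lines.push_back(line.to_owned());
            }
            // DropNewest, or cap == 0: the incoming line is discarded.
            _ => {}
        }
    }

    fn retained(&self) -> Vec<&str> {
        self.lines.iter().map(String::as_str).collect()
    }

    fn truncated(&self) -> bool {
        self.total_lines > self.lines.len()
    }
}

/// Push the lines "1" through "5" into a fresh buffer.
fn fill(cap: usize, mode: Overflow) -> LineBuffer {
    let mut buf = LineBuffer::new(cap, mode);
    for n in 1..=5 {
        buf.push(&n.to_string());
    }
    buf
}

fn main() {
    let tail = fill(3, Overflow::DropOldest);
    let head = fill(3, Overflow::DropNewest);
    println!("tail: {:?}, head: {:?}", tail.retained(), head.retained());
    println!("truncated: {}", tail.truncated());
}
```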
The line cap alone does not bound memory — one enormous newline-free "line"
(base64 -w0) is held whole. Add with_max_bytes to cap the retained bytes
too (either ceiling, or both); the byte cap also bounds the pump's in-flight
assembly buffer, so a never-terminated flood can't exhaust memory. One
consequence: a line whose own length exceeds the byte cap can't be assembled, so
it is dropped whole — counted, but not delivered to a per-line handler or
stdout_tee (don't set a byte cap if a tee must see arbitrarily long lines):
    use processkit::{Command, OutputBufferPolicy};

    let policy = OutputBufferPolicy::unbounded().with_max_bytes(8 << 20);        // 8 MiB ring
    let strict = OutputBufferPolicy::fail_loud(10_000).with_max_bytes(8 << 20);  // error on either
fail_loud makes the ceiling error instead of dropping: the run fails with
Error::OutputTooLarge once the cumulative output (lines or bytes) crosses the
cap — even when a streaming consumer is draining lines as they arrive. It bounds
memory, not wall-time, so pair it with timeout against a flooding child.
Even under a drop policy (DropOldest/DropNewest), the checking verbs that
hand back stdout as if complete — run, parse, try_parse — refuse
silently-truncated output: if the policy dropped lines they fail with
Error::OutputTooLarge rather than feed a parser a truncated tail. The lenient
capture verbs (output_string / output_bytes) are unaffected — they return
the partial result with truncated() set for you to inspect.
Line handlers — tee output as it arrives
on_stdout_line / on_stderr_line run a callback on each decoded line in
addition to capture or streaming — logging, progress bars, metrics:
    use processkit::Command;

    let result = Command::new("cargo")
        .args(["build", "--release"])
        .on_stderr_line(|line| eprintln!("[build] {line}"))
        .output_string()
        .await?;
The handler runs on the read pump — keep it cheap. The contract is forgiving and precisely specified:
- A panicking handler does not poison the run. The panic is caught, the handler is disabled for the rest of the run (surfaced as a tracing warn when that feature is on), and pumping continues; the final result still carries every line. You can safely re-export this callback seam to your own users without auditing their closures.
- Ordering: invocations are FIFO within a stream; there is no ordering between stdout and stderr handlers (two independent pumps). On the consuming verbs, all handler calls happen-before the awaited future resolves, so you can finalize a progress bar the moment the call returns. (One documented exception: a leaked pipe held open past the child's death is cut off after a bounded teardown grace.)
- Handlers are hermetically testable: ScriptedRunner replays canned output through them. See Testing → scripting replies.
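The "caught and disabled" contract can be illustrated with `std::panic::catch_unwind`. This is a synchronous sketch of the idea only; `pump_lines` is a hypothetical stand-in for the read pump, and whether the crate uses `catch_unwind` internally is an assumption.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Feed each line to `handler`, capturing every line regardless.
/// Returns the captured lines and whether the handler is still enabled.
fn pump_lines(lines: &[&str], mut handler: impl FnMut(&str)) -> (Vec<String>, bool) {
    let mut captured = Vec::new();
    let mut enabled = true;
    for &line in lines {
        if enabled {
            // A panic in the handler is contained here and disables it.
            let outcome = catch_unwind(AssertUnwindSafe(|| handler(line)));
            if outcome.is_err() {
                enabled = false;
            }
        }
        // Capture is independent of the handler's fate.
        captured.push(line.to_string());
    }
    (captured, enabled)
}

fn main() {
    let mut seen = Vec::new();
    let (captured, enabled) = pump_lines(&["a", "b", "c"], |line| {
        if line == "b" {
            panic!("handler bug");
        }
        seen.push(line.to_string());
    });
    println!("captured={captured:?} handler_enabled={enabled} seen={seen:?}");
}
```

The default panic hook still prints the panic message to stderr; only the unwinding is contained.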
For a ready-made tee to an async sink — a file, socket, or any
[tokio::io::AsyncWrite] — reach for stdout_tee / stderr_tee instead of
hand-writing a handler. Each decoded line is written to the sink (plus a \n)
as it is produced, awaited on the pump so a slow sink applies backpressure
(the pump slows, the pipe fills, the child blocks) rather than blocking the
runtime; a write error disables the tee with a tracing warn instead of being
swallowed. It runs independently of on_stdout_line — set both and both
fire per line.
Timeouts and retries
    use processkit::{Command, Error};
    use std::time::Duration;

    let out = Command::new("flaky-network-tool")
        .timeout(Duration::from_secs(30))              // kill the tree at the deadline
        .retry(3, Duration::from_millis(200), |e| {    // up to 3 attempts total
            matches!(e, Error::Timeout { .. })         // …but only retry timeouts
        })
        .run()
        .await?;
- timeout kills the whole process tree at the deadline. On the capturing verbs the expiry is captured (ProcessResult::timed_out); on the success-checking verbs it raises Error::Timeout. The full decision table lives in Timeouts, retries & cancellation.
- retry applies to the success-checking verbs only (run, exit_code, probe, and ProcessRunnerExt::checked); the classifier sees the typed error and decides. The non-erroring output_string path never retries.
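The retry contract (a total attempt budget plus a classifier over the typed error) can be sketched as a plain loop. This is a blocking, std-only model: `RunError` and this free `retry` function are hypothetical stand-ins, not the crate's types, and the real builder is async.

```rust
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum RunError {
    Timeout,
    Exit(i32),
}

/// Run `attempt_fn` up to `max_attempts` times in total, sleeping `delay`
/// between attempts, and retrying only errors the classifier accepts.
fn retry<T>(
    max_attempts: u32,
    delay: Duration,
    should_retry: impl Fn(&RunError) -> bool,
    mut attempt_fn: impl FnMut(u32) -> Result<T, RunError>,
) -> Result<T, RunError> {
    let mut attempt = 1;
    loop {
        match attempt_fn(attempt) {
            Ok(value) => return Ok(value),
            Err(err) if attempt < max_attempts && should_retry(&err) => {
                thread::sleep(delay);
                attempt += 1;
            }
            // Budget exhausted, or the classifier declined: surface the error.
            Err(err) => return Err(err),
        }
    }
}

fn main() {
    // Times out twice, then succeeds on the third attempt.
    let result = retry(
        3,
        Duration::from_millis(1),
        |e| matches!(e, RunError::Timeout),
        |n| if n < 3 { Err(RunError::Timeout) } else { Ok("done") },
    );
    println!("{result:?}");
}
```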
Privileges and spawn flags
Spawn-time controls for sandboxing and service launch:
    use processkit::Command;

    // Unix: drop privileges (uid + gid + supplementary groups) and detach.
    Command::new("worker")
        .gid(1000)       // applied before uid (a gid change needs privilege)
        .groups([1000])  // replace the inherited (often root's) supplementary groups
        .uid(1000)       // dropped last
        .setsid()        // new session: survives the controlling terminal
        .run().await?;

    // Windows: no console window flashing up from a GUI app.
    Command::new("helper").create_no_window().run().await?;

    // Hardening: take the direct child down even if THIS process is SIGKILLed
    // (Drop never runs). Windows has this for free; Linux arms PDEATHSIG.
    Command::new("worker").kill_on_parent_death().start().await?;
uid / gid / groups / setsid are POSIX-only — on Windows the run
fails with Error::Unsupported rather than silently skipping a privilege drop.
A correct drop sets all three of uid/gid/groups: dropping the uid alone
leaves the child holding the parent's (often root's) supplementary groups.
create_no_window is a harmless no-op outside Windows.
kill_on_parent_death is best-effort by design: guaranteed on Windows
(regardless of the knob), direct-child-only on Linux, unavailable on
macOS/BSD — the graceful-exit guarantee via Drop holds everywhere either
way. Containment is preserved in every combination; the platform fine print
(the Linux cgroup × uid interaction, setsid × process-group coordination,
the pdeathsig thread caveat) is collected in
Platform support.
Interactive auth / TTY. processkit wires pipes, not a pseudo-terminal,
so a tool that demands a tty — an ssh/sudo password prompt, some
credential helpers — won't get one (PTY support is not implemented; the
trade-off is recorded in decisions/permissions-privileges-pty-network.md). Drive
such tools non-interactively instead: key-based auth, ssh -o BatchMode=yes, GIT_SSH_COMMAND / GIT_TERMINAL_PROMPT=0, or feed a known
answer over interactive stdin. Conversational
tools that read stdin without needing a tty already work today via
keep_stdin_open + stdout_lines.
Consuming verbs
| Verb | Returns | Non-zero exit | Timeout | Use when |
|---|---|---|---|---|
| output_string() | ProcessResult<String> | captured | captured (timed_out) | You want to inspect the outcome yourself |
| output_bytes() | ProcessResult<Vec<u8>> | captured | captured | Binary stdout (images, archives, …) |
| run() | trimmed stdout String | Error::Exit | Error::Timeout | "Give me the answer or fail" |
| exit_code() | i32 | the code, Ok | Error::Timeout | The code is the answer |
| probe() | bool | 0→true, 1→false, else Error::Exit | Error::Timeout | Predicate commands: git diff --quiet, grep -q |
| first_line(pred) | Option<String> | n/a (stream-based) | Error::Timeout | Grab one matching line, kill the rest |
| start() | live RunningProcess | n/a | bounds the stream | Streaming, interactive I/O, probes |
    use processkit::Command;

    // probe(): the exit code as a boolean.
    let clean = Command::new("git").args(["diff", "--quiet"]).probe().await?;

    // first_line(): stop as soon as the interesting line appears.
    let first_match = Command::new("git")
        .args(["log", "--oneline"])
        .first_line(|l| l.contains("fix:"))
        .await?;
first_line returns Ok(None) when stdout closes without a match, and kills
the (private-group) child once it has its answer — you never wait out a long
log for one line. If the command's cancel_on
token has fired, it returns Error::Cancelled instead of Ok(None), so a
readiness probe with a shutdown token can't misread cancellation as "the line
never appeared".
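The exit-code mapping in the probe() row of the table is small enough to write out. A minimal sketch, assuming a hypothetical `ExitError` in place of the crate's Error::Exit:

```rust
#[derive(Debug, PartialEq)]
struct ExitError {
    code: i32,
}

/// Predicate-command convention: 0 means "yes", 1 means "no",
/// anything else is a real failure.
fn probe_from_exit_code(code: i32) -> Result<bool, ExitError> {
    match code {
        0 => Ok(true),
        1 => Ok(false),
        other => Err(ExitError { code: other }),
    }
}

fn main() {
    // `git diff --quiet` exits 1 when there are changes, 0 when clean.
    println!("{:?}", probe_from_exit_code(0));
    println!("{:?}", probe_from_exit_code(1));
    // e.g. 128 is how git reports a fatal error such as "not a repository".
    println!("{:?}", probe_from_exit_code(128));
}
```

Keeping "no" (exit 1) distinct from "broken" (any other code) is what lets a caller use `?` on a predicate without mistaking a misconfigured tool for a negative answer.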
Results and errors
The capturing verbs hand back a ProcessResult:
    use processkit::Command;

    let result = Command::new("git").args(["merge", "feature"]).output_string().await?;

    result.code();        // Option<i32>: None = killed (timeout/signal), no code
    result.signal();      // Option<i32>: the signal number (Unix), else None
    result.is_success();  // code in ok_codes (default {0})
    result.timed_out();   // the run's own deadline expired
    result.outcome();     // the explicit three-way enum behind the accessors above
    result.stdout();      // &str (or &[u8] from output_bytes)
    result.stderr();      // &str
    result.combined();    // stdout + stderr concatenated
    result.diagnostic();  // stderr if non-empty, else stdout: the human-facing line
                          // (git/jj put "CONFLICT …" on stdout!)

    // Opt into erroring whenever you're ready:
    let ok = result.ensure_success()?;  // Exit / Timeout / Signalled (signal-kill) as typed errors
When the three-way distinction matters, match on Outcome instead of
mentally decoding the code()/timed_out() pair:
    use processkit::Outcome;

    match result.outcome() {
        Outcome::Exited(0) => println!("clean"),
        Outcome::Exited(code) => println!("failed with {code}"),
        Outcome::Signalled(signal) => println!("killed by signal {signal:?}"),
        Outcome::TimedOut => println!("hit its deadline"),
        _ => {} // non_exhaustive: future dispositions
    }
For a single query you usually don't need the match (and its
#[non_exhaustive] wildcard): Outcome carries the same code() /
signal() / timed_out() accessors as ProcessResult, so a bare Outcome
(from RunningProcess::wait or Finished::outcome) answers directly —
outcome.code(), outcome.signal(), outcome.timed_out(). There is no
Outcome::is_success (success is ok_codes-aware — use
ProcessResult::is_success).
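To make the accessor semantics concrete, here is a standalone stand-in enum. It mirrors the three dispositions named above, but it is not the crate's `Outcome`: the `Option<i32>` payload on `Signalled` and the free `is_success` function are assumptions made for illustration.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Outcome {
    Exited(i32),
    Signalled(Option<i32>), // assumed: the number on Unix, None elsewhere
    TimedOut,
}

impl Outcome {
    /// The exit code, if the process exited on its own.
    fn code(self) -> Option<i32> {
        match self {
            Outcome::Exited(code) => Some(code),
            _ => None,
        }
    }

    /// The signal number, if known.
    fn signal(self) -> Option<i32> {
        match self {
            Outcome::Signalled(signal) => signal,
            _ => None,
        }
    }

    fn timed_out(self) -> bool {
        matches!(self, Outcome::TimedOut)
    }
}

/// Success needs the accepted-code set, which is why it cannot live on a
/// bare outcome.
fn is_success(outcome: Outcome, ok_codes: &[i32]) -> bool {
    outcome.code().map_or(false, |code| ok_codes.contains(&code))
}

fn main() {
    let outcome = Outcome::Exited(1);
    println!("code={:?} timed_out={}", outcome.code(), outcome.timed_out());
    // grep-style tools: exit 1 means "no match", which a caller may accept.
    println!("ok with {{0,1}}: {}", is_success(outcome, &[0, 1]));
}
```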
The error enum is structured and #[non_exhaustive]:
| Variant | Meaning |
|---|---|
| Error::Spawn { program, source } | The program was located but the OS couldn't start it (permissions, a bad working directory, a Windows .cmd/.bat needing cmd.exe, …); is_not_found() is false |
| Error::NotFound { program, searched } | The program couldn't be located (the single "not found" representation; is_not_found() is true); searched is Some(dirs) for a bare-name PATH lookup, None otherwise |
| Error::Exit { program, code, stdout, stderr } | Non-zero exit, both streams attached in full (the Display message is bounded, but the fields carry the complete captured text for classification) |
| Error::Signalled { program, signal, stdout, stderr } | The process was killed by a signal (no exit code); signal carries the number on Unix, None elsewhere; the partial streams captured before the kill are attached (reach them via diagnostic()) |
| Error::OutputTooLarge { program, line_limit, byte_limit, total_lines, total_bytes } | A fail_loud buffer's line or byte ceiling was exceeded |
| Error::Timeout { program, timeout, stdout, stderr } | The run's own deadline killed it; whatever the run captured before the kill is attached, so a hung tool's last stderr line tails the Display and is reachable via diagnostic() |
| Error::NotReady { program, timeout } | A readiness probe gave up |
| Error::Parse { program, message } | A try_parse parser (on Command, ProcessRunnerExt, CliClient, or Pipeline) rejected the output (the Display/Debug of message is bounded to a 200-byte preview; the field carries the full text) |
| Error::Stdin { program, source } | Feeding the child's stdin failed for a non-broken-pipe reason on an otherwise-successful run (a louder failure such as exit/signal/timeout wins instead); a routine broken pipe never surfaces |
| Error::CassetteMiss { program } | (record feature) A cassette replay found no matching recording (stale/incomplete cassette); kept distinct from a missing program, so is_not_found() is false |
| Error::Unsupported { operation } | The platform can't do what was asked (and silently skipping would be wrong) |
| Error::Cancelled { program } | The run's token was cancelled |
| Error::ResourceLimit { message } | (limits feature) A requested cap couldn't be enforced |
| Error::Io(source) | A low-level IO error from the crate's own machinery (driving a child, group control, cassette files); never an arbitrary foreign io::Error (no blanket From) |
Error::diagnostic() returns the most useful human-facing line out of a
failure that captured output — Exit, Timeout, and Signalled (the
partial streams of a hung-then-killed or crashed tool). Each of those variants'
one-line Display also appends a bounded excerpt of that diagnostic (the last
non-empty line, capped at 200 bytes), so a bare eprintln!("{e}") reads
`git` exited with code 2: fatal: boom — actionable in a log line without
dumping multi-KiB streams into it.
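The selection and bounding rules described here can be sketched in a few lines. This is an illustrative model rather than the crate's code: `diagnostic` and `excerpt` are hypothetical free functions, and treating whitespace-only stderr as empty is an assumption.

```rust
/// Pick the stream a human should read: stderr if it has content, else stdout.
fn diagnostic<'a>(stdout: &'a str, stderr: &'a str) -> &'a str {
    if stderr.trim().is_empty() { stdout } else { stderr }
}

/// The last non-empty line, capped at `max_bytes` without splitting a
/// UTF-8 character.
fn excerpt(text: &str, max_bytes: usize) -> &str {
    let line = text
        .lines()
        .rev()
        .find(|l| !l.trim().is_empty())
        .unwrap_or("")
        .trim();
    if line.len() <= max_bytes {
        return line;
    }
    let mut end = max_bytes;
    while !line.is_char_boundary(end) {
        end -= 1;
    }
    &line[..end]
}

fn main() {
    let stderr = "warning: something minor\nfatal: boom\n\n";
    let message = format!(
        "`git` exited with code {}: {}",
        2,
        excerpt(diagnostic("", stderr), 200)
    );
    println!("{message}");
}
```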
Next: Streaming & interactive I/O · Timeouts, retries & cancellation · Process groups
Process groups
A ProcessGroup ties the lifetime of a whole child-process tree to a Rust
value: every process spawned into the group — and everything those processes
spawn — is killed when the group is dropped. An exiting, panicking, or
?-returning owner never leaks subprocesses; the kernel object enforcing this
(Job Object / cgroup / POSIX process group) catches even grandchildren you
never knew about. (Killing grandchildren is the problem duct.py's gotchas
list files under "currently unsolved" for pipe-based designs — kernel
containment is the solution, and the reason this crate exists.)
- Creating a group
- Putting processes in
- Tearing down: drop, terminate, shutdown
- Signalling the whole tree
- Suspending and resuming
- Listing members
- Resource limits
- Stats and sampling
Creating a group
    use processkit::{ProcessGroup, ProcessGroupOptions};
    use std::time::Duration;

    // Defaults: 2s graceful-shutdown grace, escalate to SIGKILL.
    let group = ProcessGroup::new()?;

    // Tuned:
    let group = ProcessGroup::with_options(
        ProcessGroupOptions::default()
            .shutdown_timeout(Duration::from_secs(10))
            .escalate_to_kill(true),
    )?;

    // Which kernel mechanism is actually containing the tree?
    println!("{:?}", group.mechanism()); // JobObject | CgroupV2 | ProcessGroup
mechanism() reports what you actually got: CgroupV2 quietly falls back to
ProcessGroup on Linux hosts without cgroup delegation (see
Platform support).
You rarely create a group explicitly for one-shot runs: every
Command::run()-style call makes a private group automatically. Reach for an
explicit group when several children should share one fate, or when you need
the group verbs below.
Putting processes in
Three doors, in order of preference:
    use processkit::{Command, ProcessGroup};

    let group = ProcessGroup::new()?;

    // 1. start(): the full Command experience (capture, streaming, timeouts) in a
    //    SHARED group. The handle does not own the group: dropping the handle
    //    kills that child, dropping the group kills everyone.
    let server = group.start(&Command::new("dev-server")).await?;

    // 2. spawn(): the raw escape hatch for a tokio::process::Command you already
    //    have. You get the bare Child back; pipes and reaping are your problem.
    //    spawn() takes the command BY VALUE (reuse would stack pre-exec hooks).
    let raw = tokio::process::Command::new("background-helper");
    let child = group.spawn(raw)?;

    // 3. adopt(): contain a child that was spawned OUTSIDE the group.
    let external = tokio::process::Command::new("legacy-launcher").spawn()?;
    group.adopt(&external)?;
adopt moves only the named process: descendants it already has keep their
old containment (future forks are captured — on Windows/cgroup). A few sharp
edges worth knowing:
- A child that already exited but has not been reaped (no wait() yet; a zombie whose pid/handle is still valid) is a successful no-op: there is nothing left to contain, so adopt returns Ok on the containment backends.
- A child that already exited and was reaped (wait()ed) has no pid left, so adopt returns an error rather than silently tracking nothing.
- On the POSIX process-group mechanism, a child that has already exec'd can't be re-grouped (POSIX forbids it), so it is tracked individually: the child itself is signalled/killed with the group, but its future forks are not. The caller keeps the Child handle and is responsible for reaping.
Tearing down: drop, terminate, shutdown
| Verb | What happens | When |
|---|---|---|
| drop(group) | Immediate hard kill of the whole tree (kill-on-close) | The safety net, always on |
| group.terminate_all() | The same hard kill, group stays usable (cgroup-kill / Job Object / process-group backends). On a pre-5.14 Linux kernel lacking cgroup.kill, the per-pid SIGKILL fallback returns Err if the tree doesn't drain (a fork bomb still out-spawning, or D-state zombies) | Explicit teardown mid-flight; idempotent |
| group.shutdown().await | Unix: SIGTERM → wait shutdown_timeout → SIGKILL survivors (if escalate_to_kill); Windows: atomic job kill. Consumes the group | Graceful service stop |
    use processkit::{Command, ProcessGroup, ProcessGroupOptions};
    use std::time::Duration;

    let group = ProcessGroup::with_options(
        ProcessGroupOptions::default()
            .shutdown_timeout(Duration::from_secs(5))
            .escalate_to_kill(true),
    )?;
    let _service = group.start(&Command::new("my-service")).await?;

    // SIGTERM, give it 5s to flush and exit, SIGKILL stragglers:
    group.shutdown().await?;
A child that handles SIGTERM ends the grace early — shutdown returns
as soon as the tree is empty, not after the full timeout. One subtlety: the
liveness probe sees an exited-but-unreaped child (a zombie) as alive on the
process-group backends, so keep wait()ing your handles concurrently if you
want the early return. Drop can't await, which is why the graceful tier
lives in this async method — dropping without calling it performs only the
hard kill.
Signalling the whole tree
signal / suspend / resume / members / adopt (this section and the two below) require the default-on process-control feature. The teardown verbs above are core and always present.
    use processkit::{Command, ProcessGroup, Signal};

    let group = ProcessGroup::new()?;
    let _server = group.start(&Command::new("my-server")).await?;

    group.signal(Signal::Hup)?;        // "reload your configuration"
    group.signal(Signal::Usr1)?;       // whatever the tool defines
    group.signal(Signal::Other(34))?;  // raw signal number escape hatch
| Platform | Deliverable signals |
|---|---|
| Linux (cgroup or pgroup), macOS/BSD | Any — Term, Kill, Int, Hup, Quit, Usr1, Usr2, Other(n) |
| Windows | Kill only (maps to the Job Object terminate); anything else → Error::Unsupported |
Signal::Kill always takes the same atomic whole-tree kill path as
terminate_all (cgroup.kill / killpg / job terminate), so it cannot miss
a process forked mid-broadcast. Other signals are a per-member broadcast —
best-effort against a tree that is forking at that exact moment. An empty
group accepts any deliverable signal trivially. On the cgroup mechanism a
real per-member delivery failure (e.g. EPERM from a member that changed uid, or
a seccomp/container restriction) is surfaced as an Err rather than swallowed —
an ESRCH race (the member already exited) is still success; the pgroup
(macOS/BSD, Linux-without-cgroup) backend remains purely best-effort.
Suspending and resuming
Freeze a tree (to snapshot it, to starve a runaway while you investigate, to pause background work), then thaw it:
    use processkit::{Command, ProcessGroup};

    let group = ProcessGroup::new()?;
    let _cruncher = group.start(&Command::new("cpu-hog")).await?;

    group.suspend()?;   // the whole tree stops consuming CPU
    // … inspect, snapshot, wait for the user …
    group.resume()?;
Per-platform machinery — and its visible differences:
| Platform | Mechanism | Notes |
|---|---|---|
| Linux cgroup | one cgroup.freeze write | Atomic over the subtree; freeze is group state |
| Linux pgroup, macOS/BSD | SIGSTOP / SIGCONT broadcast | Idempotent (level-triggered) |
| Windows | per-thread SuspendThread walk | Counted: N suspends need N resumes; best-effort against mid-walk thread churn |
Two caveats that bite in practice:
- Spawning into a suspended group diverges. Under the cgroup mechanism a child spawned or adopted while the group is frozen starts frozen, and start() may not return until resume (the forked child joins the cgroup before exec, so it can freeze before completing the spawn handshake). Windows and the pgroup backends freeze only members present at the call. Rule of thumb: resume before starting new work.
- A suspended tree can still be hard-killed (drop / terminate_all / Signal::Kill all act on frozen processes), but a graceful shutdown starts with a SIGTERM the frozen tree can't act on, so it would wait out the whole grace. Resume first for a clean shutdown.
Listing members
    use processkit::{Command, ProcessGroup};

    let group = ProcessGroup::new()?;
    let _a = group.start(&Command::new("worker-a")).await?;
    let _b = group.start(&Command::new("worker-b")).await?;

    let pids: Vec<u32> = group.members()?;
    println!("live members: {pids:?}");
What "members" means depends on the mechanism: Windows and Linux-cgroup list the whole tree (every descendant pid); the POSIX process-group backends list the tracked group leaders (one pid per started/adopted child) — their descendants are contained but not enumerated. An exited child still counts until it is reaped. The snapshot is point-in-time: a tree that is forking races it.
To wait on members rather than list them, race the handles with
wait_any.
Resource limits
Requires the limits feature. Caps are a property of the group, set once
at creation and enforced by the same kernel object that contains the tree:
    use processkit::{Command, ProcessGroup, ProcessGroupOptions};

    let group = ProcessGroup::with_options(
        ProcessGroupOptions::default()
            .memory_max(512 * 1024 * 1024)   // bytes, whole tree
            .max_processes(64)               // fork-bomb ceiling
            .cpu_quota(0.5),                 // half of one core
    )?;
    let _sandboxed = group.start(&Command::new("untrusted-tool")).await?;
| Capability | Windows Job Object | Linux cgroup v2 | pgroup / macOS / BSD |
|---|---|---|---|
| Memory cap | ✅ whole-tree | ✅ whole-tree (memory.max) | ❌ |
| Process-count cap | ✅ | ✅ (pids.max) | ❌ |
| CPU quota | 🟡 approximate (rate vs. total CPU) | ✅ (cpu.max) | ❌ |
cpu_quota is a fraction of a single core (2.0 = two cores). Limits
need a real container; when a requested cap can't be enforced — no Job
Object/cgroup, or a Linux cgroup whose controllers can't be enabled —
with_options returns Error::ResourceLimit instead of handing back a
silently-unbounded group. On Linux this needs the process to run at the
real cgroup-v2 root: the crate enables the controllers in this process's own
cgroup, which cgroup v2's "no internal processes" rule allows only for the real
hierarchy root — not a cgroup-namespace root (so an ordinary container fails
too), not under systemd — and the crate doesn't migrate your process. See the
limits prerequisites in
Platform support. The uid()-drop
interaction lives under its Caveats.
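On the cgroup v2 side, a fractional-core quota corresponds to a `cpu.max` line of the form "quota period", both in microseconds. The sketch below shows that arithmetic; `cpu_max_line` is a hypothetical helper, and the 100 ms period used in the example is the kernel's default rather than a value confirmed for this crate.

```rust
/// Convert a quota expressed in cores (0.5 = half a core, 2.0 = two cores)
/// into the "quota period" line that cgroup v2's cpu.max file expects.
fn cpu_max_line(cores: f64, period_us: u64) -> String {
    let quota_us = (cores * period_us as f64).round() as u64;
    format!("{quota_us} {period_us}")
}

fn main() {
    // Assumed period: 100_000 µs, the cgroup v2 default.
    println!("{}", cpu_max_line(0.5, 100_000));
    println!("{}", cpu_max_line(2.0, 100_000));
}
```

A quota larger than the period is how cgroup v2 expresses more than one core's worth of CPU time per period.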
Stats and sampling
Requires the opt-in stats feature (features = ["stats"], or limits).
    use processkit::{Command, ProcessGroup, StreamExt};
    use std::time::Duration;

    let group = ProcessGroup::new()?;
    let _worker = group.start(&Command::new("worker")).await?;

    // Point-in-time:
    let snap = group.stats()?;
    println!(
        "procs={} cpu={:?} peak_rss={:?}",
        snap.active_process_count, snap.total_cpu_time, snap.peak_memory_bytes,
    );

    // …or a series: first sample immediate, then every 250ms; missed ticks are
    // skipped; the stream ends when the group can no longer report.
    let mut samples = group.sample_stats(Duration::from_millis(250));
    while let Some(s) = samples.next().await {
        println!("rss now: {:?}", s.peak_memory_bytes);
    }
CPU time and peak memory are available where the kernel accounts for the
whole tree (Windows, Linux cgroup); the process-group backends report the
member count only — the Option fields stay None. The sampler borrows
the group, so it can neither outlive it nor keep it (and the kill-on-drop
guarantee) alive. For a single run's end-to-end summary, see
profile.
Next: Streaming & interactive I/O · Platform support · Supervision
Streaming & interactive I/O
The one-shot verbs in Running commands buffer the whole output.
For long-running or conversational children, Command::start() returns a live
RunningProcess you drive yourself: stream stdout as it arrives, write stdin
incrementally, probe for readiness, race several children, or profile a run.
- Lifecycle
- Streaming stdout
- Interactive stdin
- Readiness probes
- Racing children with wait_any
- Per-run telemetry
Lifecycle
    use processkit::Command;

    let mut run = Command::new("dev-server").start().await?;

    run.pid();      // Option<u32>: None once the child is reaped
    run.elapsed();  // time since spawn

    // Consume the handle exactly one way:
    //   output_string() / output_bytes()  → capture everything (same as the one-shot verbs)
    //   wait()                            → just the Outcome; output is discarded
    //   finish()                          → after streaming stdout (below)
    //   profile(every)                    → capture + resource samples (stats feature)
    let outcome = run.wait().await?;   // Outcome: Exited(code) / Signalled(sig) / TimedOut
start() puts the child in a private group the handle owns: dropping the
RunningProcess kills the whole tree, exactly like dropping a one-shot run's
future. The shared-group variant — group.start(&cmd) — gives the same handle
but the group controls the tree's fate (see
Process groups).
There is also an explicit run.start_kill() for "stop it now, I'll wait()
for the code myself".
Streaming stdout
stdout_lines() yields decoded lines as the child produces them — no waiting
for exit, no full-output buffering. StreamExt (re-exported from
tokio-stream) provides .next():
    use processkit::{Command, Outcome, StreamExt, Finished};

    #[tokio::main]
    async fn main() -> processkit::Result<()> {
        let mut run = Command::new("cargo")
            .args(["build", "--release"])
            .start()
            .await?;

        let mut lines = run.stdout_lines()?;
        while let Some(line) = lines.next().await {
            println!("build: {line}");
        }

        // The stream ended (stdout closed). Collect the outcome and stderr;
        // stderr was drained in the background the whole time, so a noisy child
        // could never block on a full pipe.
        let Finished { outcome, stderr, .. } = run.finish().await?;
        if outcome != Outcome::Exited(0) {
            eprintln!("build failed ({outcome:?}):\n{stderr}");
        }
        Ok(())
    }
Things to know:
- Call stdout_lines() once. It is fallible: a second stdout_lines/output_events call (stdout is consumed once), or a non-piped stdout (StdioMode::Inherit/Null), returns Err rather than a silently-empty stream.
- The command's timeout bounds the stream on an own-group handle: at the deadline the tree is killed, the pipes close, and the stream ends, so a streamed run can't hang past its deadline. A cancel_on token ends it the same way; the following finish then reports Error::Cancelled. Details in Timeouts & cancellation.
- Line counters tick live: run.stdout_line_count() / stderr_line_count() are cheap progress gauges even while you stream.
- The buffer policy and line handlers apply to streamed runs too: a handler sees each line on the pump, in addition to your loop.
- The whole streaming surface is hermetically testable: a ScriptedRunner's start() returns a handle whose canned lines flow through the same pump machinery, so stdout_lines, the readiness probes, and finish behave identically with no subprocess. See Testing → scripted streaming.
Interactive stdin
Conversational tools — write a request, read the response, repeat. Keep stdin
open with keep_stdin_open(), take the writer with take_stdin():
    use processkit::{Command, Outcome, StreamExt, Finished};

    // `ProcessStdin`'s writer methods return `std::io::Result`; `Box<dyn Error>`
    // mixes them with the crate's `Result` (or `.map_err(processkit::Error::Io)?`).
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // `bc` evaluates each stdin line and prints the result.
        let mut run = Command::new("bc").keep_stdin_open().start().await?;
        let mut stdin = run.take_stdin().expect("stdin was kept open");
        let mut answers = run.stdout_lines()?;

        stdin.write_line("2 + 2").await?; // writes "2 + 2\n", flushed
        println!("= {}", answers.next().await.unwrap());

        stdin.write_line("6 * 7").await?;
        println!("= {}", answers.next().await.unwrap());

        stdin.finish().await?; // send EOF; bc exits
        let Finished { outcome, .. } = run.finish().await?;
        assert_eq!(outcome, Outcome::Exited(0));
        Ok(())
    }
ProcessStdin offers write(&[u8]), write_line(&str) (newline + flush),
flush(), and finish() (EOF). Dropping the writer — or the whole
RunningProcess — closes stdin too; finish() just makes the EOF explicit
and awaitable.
Avoid the full-duplex deadlock. A child's stdout pipe has a finite OS
buffer; once it fills, the child blocks writing stdout until something reads
it. If you push a large interactive stdin while nothing drains the child's
stdout, the child stops reading stdin (blocked on stdout), your write parks
waiting for stdin buffer space, and neither side progresses. The bc example
above is safe because it interleaves one write with one read; when you both feed
a sizable stdin and the child produces output, drain stdout_lines from one
task while writing stdin from another. (The non-interactive Stdin::from_*
sources are safe — the crate writes them on a background task that runs
concurrently with the output pumps.)
For one-directional streamed input (a channel, a file tail) you don't need
interactivity — give the command Stdin::from_lines(stream) /
Stdin::from_reader(reader) and let the background writer feed it; see the
stdin source table.
Readiness probes
"Start a server, then use it" needs ready, not merely started. Three probes replace the arbitrary sleep, each bounded by its own deadline:
    use processkit::Command;
    use std::time::Duration;

    let mut run = Command::new("my-server").start().await?;

    // 1. A line on stdout (returns the matching line):
    let banner = run
        .wait_for_line(|l| l.contains("listening on"), Duration::from_secs(10))
        .await?;

    // 2. A TCP port accepting connections:
    run.wait_for_port("127.0.0.1:8080".parse().unwrap(), Duration::from_secs(10))
        .await?;

    // 3. Any async predicate (an HTTP /health endpoint, a file appearing, …):
    run.wait_for(|| async { health_check().await }, Duration::from_secs(10))
        .await?;

    // ready: use the server…
Probe semantics, deliberately uniform:
- A probe that can't pass within its deadline fails with Error::NotReady, which is distinct from Error::Timeout (the run's own deadline).
- A probe also fails fast once readiness can no longer happen: the child exits, or (for wait_for_line) its stdout closes. There is no waiting out a 30s deadline on a dead server.
- A failed probe never kills the child. You decide: retry, log and continue, or tear down.
- wait_for_line consumes stdout up to (and including) the match; continue with finish or further streaming. wait_for_port/wait_for don't touch the pipes at all.
Racing children with wait_any
The free function wait_any races several running processes and reports
whichever exits first — the natural primitive for "restart whatever died" or
"first answer wins":
    use processkit::{Command, ProcessGroup, wait_any};

    let group = ProcessGroup::new()?;
    let mut a = group.start(&Command::new("replica-a")).await?;
    let mut b = group.start(&Command::new("replica-b")).await?;

    let (index, outcome) = wait_any(&mut [&mut a, &mut b]).await?;
    println!("contender #{index} exited first with {outcome:?}");

    // Only borrows: the loser is still usable.
    let survivor = if index == 0 { &mut b } else { &mut a };
wait_any takes &mut borrows, applies no timeout of its own (wrap it in
tokio::time::timeout to bound the race), and does no output pumping — drain
chatty children first or give them bounded
buffer policies.
Per-run telemetry
With the opt-in stats feature, a running child reports its own
resource usage, and profile() turns a whole run into a summary:
    use processkit::Command;
    use std::time::Duration;

    let run = Command::new("crunch").start().await?;
    run.cpu_time();          // Option<Duration>: user+kernel so far
    run.peak_memory_bytes(); // Option<u64>

    // …or capture + sample on an interval until exit:
    let profile = Command::new("crunch")
        .start().await?
        .profile(Duration::from_millis(100))
        .await?;
    println!(
        "exit={:?} wall={:?} cpu={:?} peak_rss={:?} avg_cpu={:?} ({} samples)",
        profile.exit_code,
        profile.duration,
        profile.cpu_time,
        profile.peak_memory_bytes,
        profile.avg_cpu(), // cpu / wall, e.g. Some(1.7) ≈ 1.7 cores busy
        profile.samples,
    );
These read the child process itself (not a whole tree — that's
ProcessGroup::stats), and
availability follows the platform: full CPU/memory on Windows and Linux,
None where the kernel doesn't account per-process cheaply — see
Platform support.
Next: Pipelines · Timeouts, retries & cancellation · Supervision
Pipelines
a | b | c without a shell. Each stage's stdout feeds the next stage's
stdin through an in-process relay (a tokio::io::copy task per boundary) — there
is no shell string anywhere, so no quoting rules, no word splitting, no injection
surface. All stages spawn into one shared kill-on-drop
process group, so the chain lives and dies as a unit. (The
relay is an implementation detail, not a kernel splice: a producer whose consumer
exits early stops on a broken pipe when the relay's next write fails, rather
than instantly via SIGPIPE.)
- Building and running
- Semantics: pipefail and the ends
- Unchecked stages
- Timeouts
- Re-running a pipeline
Building and running
Command::pipe(next) starts a Pipeline; chain more stages with
Pipeline::pipe; drive it with output_string() or run():
    use processkit::Command;

    #[tokio::main]
    async fn main() -> processkit::Result<()> {
        // git log --format=%an | sort | uniq -c
        let authors = Command::new("git").args(["log", "--format=%an"])
            .pipe(Command::new("sort"))
            .pipe(Command::new("uniq").arg("-c"))
            .run() // require every stage to succeed
            .await?;
        println!("{authors}");
        Ok(())
    }
The verbs mirror Command's, each operating on the pipefail outcome:
| Verb | Returns | A failing stage is… |
|---|---|---|
| output_string() | ProcessResult<String> | …reported in the result (code/stderr/program of the first unclean stage) |
| output_bytes() | ProcessResult<Vec<u8>> | …same, with the last stage's stdout captured raw (binary pipes) |
| run() | trimmed final stdout | …raised as that stage's Error::Exit; fails loud on a truncated capture |
| checked() | full ProcessResult<String> | …raised as Error::Exit (untrimmed stdout) |
| run_unit() | () | …raised as Error::Exit (output discarded) |
| exit_code() | i32 | …its attributed code (no code → Error::Timeout/Signalled) |
| probe() | bool | 0 → true, 1 → false, else Err |
| parse(\|s\| …) / try_parse(\|s\| …) | T | …raised as Error::Exit; fails loud on a truncated capture |
Err from output_string itself means a stage couldn't be started or
driven at all (spawn failure, broken plumbing) — never a mere non-zero exit.
The streaming first_line probe is deliberately not a pipeline verb: a chain
consumes its last stage in full to fold the pipefail outcome. To capture the
first matching line of a finished chain, add a | head -n1 (Unix) / grep -m1
/ findstr stage and capture. This does not cover a streaming readiness
probe over a chain that must keep running (e.g. wait for a banner line, then
leave the chain alive) — | head would tear it down; use a single Command
with first_line for that.
The | operator is sugar for the same thing — a | b | c ≡
a.pipe(b).pipe(c). Parenthesize the chain before a terminal verb, since
method calls bind tighter than |:
    use processkit::Command;

    let authors = (Command::new("git").args(["log", "--format=%an"])
        | Command::new("sort")
        | Command::new("uniq").arg("-c"))
        .run()
        .await?;
Semantics: pipefail and the ends
The outcome is pipefail, like set -o pipefail in a shell:
- stdout is always the last stage's output: that's what the chain produced.
- code, stderr, and the reported program come from the first stage that didn't exit cleanly (non-zero, signal-killed, or timed out), or from the last stage when every stage succeeded.
    use processkit::Command;

    let result = Command::new("cat").arg("data.txt")
        .pipe(Command::new("grep").arg("ERROR")) // suppose grep exits 2 (bad pattern)
        .pipe(Command::new("wc").arg("-l"))
        .output_string()
        .await?;

    // Diagnostics point at grep (the first unclean stage) while stdout is
    // whatever wc managed to print:
    assert_eq!(result.code(), Some(2));
    println!("blamed: {}", result.ensure_success().unwrap_err()); // names `grep`
The ends of the chain behave like a single Command:
- The first stage's configured stdin source is honored: feed the whole pipeline from a string, file, or stream.
- Inner stages read from the pipe, full stop: any stdin source or keep_stdin_open configured on them is overridden.
- Inner stages' stderr is captured per-stage for pipefail diagnostics; only the last stage's stdout reaches you.
    use processkit::{Command, Stdin};

    let unique_count = Command::new("sort")
        .stdin(Stdin::from_iter_lines(["b", "a", "b", "c"]))
        .pipe(Command::new("uniq"))
        .pipe(Command::new("wc").arg("-l"))
        .run()
        .await?;
    assert_eq!(unique_count.trim(), "3");
Unchecked stages
Strict pipefail has one classic false positive: a consumer that legitimately
stops reading early. In producer | head -1 the consumer exits 0 after one
line and closes the pipe; the producer then stops on a broken pipe — its
next write fails once the relay's downstream is gone (a broken-pipe write error,
or SIGPIPE where the OS delivers it) — a perfectly normal death that strict
pipefail would blame the chain for. Mark that stage unchecked_in_pipe():
    use processkit::Command;

    // seq 1 1000000 | head -1: the producer's broken-pipe death is expected.
    let first = (Command::new("seq").args(["1", "1000000"]).unchecked_in_pipe()
        | Command::new("head").args(["-n", "1"]))
        .run()
        .await?;
    assert_eq!(first.trim(), "1");
The rules:
- An unchecked stage's unclean exit (a non-zero code, a broken-pipe write failure or SIGPIPE where the OS delivers it from a consumer that closed early, or its own per-stage timeout kill) is skipped when the chain decides what to report.
- A checked failure always trumps an unchecked one, regardless of position: unchecked never shields another stage's real failure.
- A chain whose only failures are unchecked reports success (the last stage's stdout, code 0).
- unchecked forgives exit status only, never a whole-chain Pipeline::timeout, and it has no effect on a Command run outside a pipeline (a single run's status is already plain data in its ProcessResult).
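The pipefail and unchecked rules above can be modelled in a few lines of plain Rust. This is only a toy sketch of the folding rule, not processkit's implementation: the types StageExit and Stage and the function blamed_stage are hypothetical names invented for the illustration, and "clean" is assumed to mean exit code 0 (the sketch ignores any accepted-codes configuration).

```rust
/// How one pipeline stage ended (a toy stand-in, not a processkit type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StageExit {
    Code(i32),
    Signalled,
    TimedOut,
}

/// One finished stage: its program name, how it ended, and whether it was
/// marked unchecked.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Stage {
    pub program: &'static str,
    pub exit: StageExit,
    pub unchecked: bool,
}

impl Stage {
    /// Assumption for this sketch: only exit code 0 counts as clean.
    fn is_clean(&self) -> bool {
        self.exit == StageExit::Code(0)
    }
}

/// Returns the stage the chain blames, or `None` when the chain reports
/// success. The blamed stage is the first unclean stage that is not
/// unchecked, so an unchecked failure never shields a later checked one.
pub fn blamed_stage(stages: &[Stage]) -> Option<&Stage> {
    stages.iter().find(|s| !s.is_clean() && !s.unchecked)
}
```

In this model, cat | grep (exit 2) | wc blames grep, while an unchecked seq killed by a broken pipe in front of a clean head yields None, which corresponds to the chain reporting success.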
Timeouts
Two scopes, deliberately distinct:
    use processkit::Command;
    use std::time::Duration;

    let out = Command::new("producer")
        .timeout(Duration::from_secs(10)) // per-STAGE: kills just `producer`
        .pipe(Command::new("consumer"))
        .timeout(Duration::from_secs(30)) // whole-CHAIN: Pipeline::timeout
        .output_string()
        .await?;
- Pipeline::timeout bounds the whole chain: at the deadline the shared group is torn down and the result reports timed_out (no partial stdout, unlike a single command's captured timeout).
- A per-stage Command::timeout kills just that stage. Every stage is evaluated by the same pipefail rule: a stage that hit its own deadline, inner or last, surfaces on run() as that stage's Error::Timeout, reporting that stage's own deadline (not the chain's, and never 0ns).
Cancellation has two forms. Pipeline::cancel_on(token) is the chain-level
control: the token is applied to every stage, so firing it tears the whole chain
down and the run resolves to Error::Cancelled. (A cancel_on token on an
individual stage Command also cancels that stage and errors the pipeline, but
the pipeline-level builder is the clearer authority.) See
Timeouts & cancellation.
Re-running a pipeline
A Pipeline is Clone and re-runnable — stages are re-cloned per run. The
one caveat is inherited from Command: a one-shot stdin source on the
first stage (Stdin::from_reader / from_lines) is consumed by the first run;
re-running then fails loud (an Error::Io at launch) rather than
silently feeding empty stdin. Use the reusable sources
(from_string / from_bytes / from_iter_lines / from_file) when a chain
runs more than once.
Next: Timeouts, retries & cancellation · Running commands · Process groups
Timeouts, retries & cancellation
Three ways a run ends early, with three different philosophies:
- a timeout is data: the deadline was part of the run's contract, so its expiry is captured in the result (and only the success-checking verbs turn it into an error);
- a retry is a policy: the success-checking verbs replay the run while your classifier says the failure is transient;
- a cancellation is an abandonment: the caller changed its mind, so every path reports an error; there is no result worth inspecting.
Timeouts
Command::timeout(d) kills the whole process tree at the deadline — not
just the direct child, so a wrapper script's grandchildren die too.
    use processkit::Command;
    use std::time::Duration;

    // Captured: inspect the flag yourself.
    let result = Command::new("slow-tool")
        .timeout(Duration::from_secs(5))
        .output_string()
        .await?;
    if result.timed_out() {
        println!("partial output before the kill: {}", result.stdout());
    }

    // Raised: the checking verbs convert the flag into a typed error.
    let err = Command::new("slow-tool")
        .timeout(Duration::from_secs(5))
        .run()
        .await
        .unwrap_err();
    assert!(matches!(err, processkit::Error::Timeout { .. }));
Where each verb lands:
| Verb | Deadline expiry becomes |
|---|---|
| output_string() / output_bytes() | Ok result with timed_out() == true, code() == None, partial output kept |
| run() / exit_code() / probe() / checked() | Error::Timeout { program, timeout, stdout, stderr }; the partial output captured before the kill is attached (err.diagnostic() surfaces a hung tool's last words) |
| first_line(pred) | Error::Timeout (the line never arrived in time) |
| start() + streaming | the stream ends at the deadline (tree killed, pipes closed); finish then reports the kill (outcome == Outcome::TimedOut) |
| ensure_success() on a captured result | Error::Timeout, checked before the exit code |
| Pipeline | chain deadline → timed_out result; per-stage deadlines fold into pipefail |
Two distinct deadline families to keep apart:
- Command::timeout: the run's own contract, covered in this section.
- The readiness probes' within parameter: gives Error::NotReady and never kills the child.
Graceful timeout
By default the deadline hard-kills at once. Add timeout_grace(d) to give the
tree a chance to clean up: at the deadline it is sent SIGTERM (or the signal chosen
with timeout_signal, which needs the process-control feature), allowed up to the
grace window to exit, then SIGKILLed — the same SIGTERM → wait → SIGKILL tier as
ProcessGroup::shutdown. A signal-handling child that exits ends
the grace early.
    use processkit::Command;
    use std::time::Duration;

    let result = Command::new("slow-tool")
        .timeout(Duration::from_secs(30))
        .timeout_grace(Duration::from_secs(5)) // SIGTERM, wait up to 5s, then SIGKILL
        .output_string()
        .await?;
timed_out() is true regardless of whether the child exited on the signal or was
SIGKILLed after the grace — the deadline is what fired. Windows has no signal
tier: timeout_grace is accepted but the deadline kills the job atomically.
The explicit RunningProcess::shutdown(grace) verb (stop a started handle on demand)
composes with a Command::timeout: its own SIGTERM → grace → SIGKILL is the
single teardown (it does not also fire the run's timeout teardown), and if the
deadline has already elapsed when you call shutdown, the outcome is reported as
Outcome::TimedOut — the grace you pass governs the teardown timing.
Retries
retry(max_attempts, backoff, classifier) replays a failed run — up to
max_attempts total attempts, sleeping backoff between tries, retrying
only while the classifier accepts the error:
    use processkit::{Command, Error};
    use std::time::Duration;

    let out = Command::new("curl")
        .args(["-fsS", "https://example.com/api"])
        .timeout(Duration::from_secs(10))
        .retry(3, Duration::from_millis(250), |e| {
            // transient: network timeouts and curl's "couldn't connect" (7)
            matches!(e, Error::Timeout { .. })
                || matches!(e, Error::Exit { code: 7, .. })
        })
        .run()
        .await?;
Ground rules:
- Retries apply to the success-checking paths only (run, exit_code, probe, ProcessRunnerExt::checked, and everything built on them, e.g. CliClient). The non-erroring output_string capture never retries: it didn't fail.
- The classifier sees the typed error: match on variants, codes, even the captured stderr.
- Each attempt re-runs the same Command: a one-shot stdin source (see the stdin source table) is consumed by attempt #1, so attempt #2 fails loud with an Error::Io (InvalidInput) at launch rather than silently feeding empty stdin. Use reusable sources for retried commands.
- A Cancelled error is never retried, classifier or not: the token stays cancelled forever, so another attempt could only fail the same way.
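The attempt-counting rule is easy to get off by one, so here is a minimal synchronous sketch of the loop shape described above, using only the standard library. It is an illustration under stated assumptions, not the crate's code: retry_sync and its parameters are hypothetical names, the operation is a plain closure instead of a Command, and the sketch always runs the operation at least once even if max_attempts is 0.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Runs `op` until it succeeds, the classifier rejects the error, or
/// `max_attempts` total attempts have been made. `op` receives the
/// 1-based attempt number.
pub fn retry_sync<T, E>(
    max_attempts: u32,
    backoff: Duration,
    is_transient: impl Fn(&E) -> bool,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) => {
                // Give up when the budget is spent or the failure is not
                // one the classifier accepts as transient.
                if attempt >= max_attempts || !is_transient(&err) {
                    return Err(err);
                }
            }
        }
        // Fixed sleep between tries, then count the next attempt.
        sleep(backoff);
        attempt += 1;
    }
}
```

With max_attempts = 3 the operation runs at most three times in total, and a failure the classifier rejects ends the loop after a single attempt.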
For "keep it alive" (restart a service whenever it exits) rather than
"replay this one operation", use a Supervisor — same
backoff shape, different loop condition.
Cancellation
Hand any command a CancellationToken (re-exported at the crate root);
cancelling the token kills the run's tree and makes every consuming path
report Error::Cancelled:
    use processkit::{CancellationToken, Command};

    #[tokio::main]
    async fn main() -> processkit::Result<()> {
        let shutdown = CancellationToken::new();

        // Wire the same parent token into many jobs via child tokens:
        let job = tokio::spawn({
            let token = shutdown.child_token();
            async move { Command::new("long-export").cancel_on(token).run().await }
        });

        // Ctrl-C handler, sibling failure, UI button, …
        shutdown.cancel();

        assert!(matches!(
            job.await.unwrap(),
            Err(processkit::Error::Cancelled { .. })
        ));
        Ok(())
    }
The contract, path by path:
| Situation | Behavior |
|---|---|
| Cancel during run / output_string / output_bytes / wait / profile / exit_code / probe | tree killed, Error::Cancelled { program } |
| Cancel during streaming (stdout_lines) | the stream ends; the following finish reports Error::Cancelled |
| Token already cancelled before the run | short-circuits before spawning: no process is ever created |
| Cancel on a shared-ProcessGroup handle | kills the child itself, leaves the group's siblings alone (same scope as a timeout) |
| A Pipeline stage's token cancels | that stage dies; the cancellation errors the whole pipeline and the private group reaps the other stages |
| Under retry | terminal: never retried, whatever the classifier says |
| Under a Supervisor | terminal: supervision returns Err(Cancelled) instead of restarting into a still-cancelled token |
| wait_any mid-run | the raw primitive doesn't synthesize the error; the race just resolves (a pre-cancelled token still hits the pre-spawn short-circuit) |
| first_line mid-run | surfaces Error::Cancelled once the token fires: a cancelled stream that closes without a match is reported as cancellation, not Ok(None) |
Client-level default
A typed wrapper built on CliClient usually constructs
and consumes its Commands internally — there is no place to chain a
per-call cancel_on. Set the token once on the client; every command it
builds carries it:
    use processkit::{CancellationToken, CliClient};

    let token = CancellationToken::new();
    let gh = CliClient::new("gh").default_cancel_on(token.child_token());
    // ... controller cancels `token` → every in-flight command of THIS client
    // dies (whole tree), surfacing Error::Cancelled to the awaiting call.
Clients are cheap — scope cancellation by building one client per
cancellable scope with its own (child) token, instead of threading tokens
through call signatures. cli_client!-generated wrappers re-emit the builder,
so Git::new().default_cancel_on(t) works for downstream crates too.
Precedence: a per-command cancel_on chained on a built command
replaces the client default (explicit beats default, like a per-command
timeout after default_timeout). To honor both sources, wire it
explicitly — CancellationToken has no built-in merge: derive a child of the
default (let c = default.child_token()), hand the command
cancel_on(c.clone()), and have the second source call c.cancel(). Or
simpler: build a dedicated client per scope.
Precedence and interactions
Timeout vs. cancellation. A timeout is captured; a cancellation is always an error. When both land on the same run, cancellation wins — you asked the run to stop mattering, so no result is synthesized:
    use processkit::{CancellationToken, Command};
    use std::time::Duration;

    let token = CancellationToken::new();
    token.cancel();

    let err = Command::new("tool")
        .timeout(Duration::from_millis(1)) // would have been a Timeout…
        .cancel_on(token)                  // …but cancellation takes priority
        .run()
        .await
        .unwrap_err();
    assert!(matches!(err, processkit::Error::Cancelled { .. }));
Which knob for which job:
| You want | Reach for |
|---|---|
| "This run may not take longer than X" | Command::timeout |
| "This operation is flaky, try a few times" | Command::retry |
| "Stop everything when the app shuts down" | cancel_on + one shared token |
| "Keep this service alive across crashes" | Supervisor |
| "Tell me when it's ready, don't kill it" | readiness probes |
Next: Supervision · Streaming & interactive I/O · Running commands
Supervision
Where retry answers "run this once,
replaying on failure", a Supervisor answers the different question "keep
this alive": restart a child per policy whenever it exits, with bounded
restarts, exponential backoff, and jitter — a minimal runit/systemd-style
keeper, platform-agnostic because it sits entirely on the
ProcessRunner seam.
- The shape
- Policies: what counts as a crash
- Backoff and jitter
- Failure storms
- Stopping
- Outcomes
- Supervising inside a shared group
- Errors and cancellation
The shape
    use processkit::{Command, RestartPolicy, Supervisor};
    use std::time::Duration;

    #[tokio::main]
    async fn main() -> processkit::Result<()> {
        let outcome = Supervisor::new(Command::new("my-server").args(["--port", "8080"]))
            .restart(RestartPolicy::OnCrash)          // default
            .max_restarts(5)                          // default: unlimited
            .backoff(Duration::from_millis(200), 2.0) // default: 200ms × 2.0
            .max_backoff(Duration::from_secs(30))     // default: 30s cap
            .jitter(true)                             // default: on
            .stop_when(|res| res.code() == Some(0))   // optional exit condition
            .run()
            .await?;

        println!(
            "ended after {} restarts, reason: {:?}, last exit: {:?}",
            outcome.restarts,
            outcome.stopped,
            outcome.final_result.code(),
        );
        Ok(())
    }
Each incarnation is one full captured run of the command (so the command's
own timeout, stdin, env, … all apply per run — with the usual
one-shot-stdin caveat for the second run
onward).
Policies: what counts as a crash
A crash is any run that is not a success (ProcessResult::is_success,
which honors the command's ok_codes): an exit code outside the accepted set
(default {0}), a timeout, a signal-kill, or a spawn failure. A command with
ok_codes([0, 2]) that exits 2 is a success, so OnCrash treats it as clean,
not a crash.
| RestartPolicy | Restarts after… |
|---|---|
| OnCrash (default) | crashes only; a clean exit ends supervision (PolicySatisfied) |
| Always | every completed run, clean or not; pair it with stop_when/max_restarts or it loops forever |
| Never | nothing: one run, reported as-is |
Backoff and jitter
The n-th restart (0-based) sleeps
delay(n) = min(base × factor^n, max_backoff) × jitter
with jitter drawn uniformly from [0.5, 1.5) per restart. Jitter is on by
default so a fleet of supervised workers restarted by the same incident
doesn't stampede back in lockstep; jitter(false) gives deterministic delays
(useful in tests with a paused tokio clock). A non-finite or < 1.0 factor is
treated as 1.0 — constant delay, never a shrinking one.
base=200ms, factor=2.0, cap=30s:
restart #0 → ~200ms #1 → ~400ms #2 → ~800ms … #7 → ~25.6s #8+ → 30s (cap)
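The un-jittered part of the formula can be checked with a short standard-library sketch. It is not the crate's code: backoff_delay is a hypothetical helper name, jitter is left out so the numbers are deterministic, and the computation runs in f64 seconds as a simplifying assumption.

```rust
use std::time::Duration;

/// delay(n) = min(base × factor^n, max_backoff), without the jitter term.
/// `n` is the 0-based restart index.
pub fn backoff_delay(base: Duration, factor: f64, max_backoff: Duration, n: u32) -> Duration {
    // A zero base stays zero no matter how large factor^n gets.
    if base.is_zero() {
        return Duration::ZERO;
    }
    // A non-finite or < 1.0 factor is treated as 1.0 (constant delay).
    let factor = if factor.is_finite() && factor >= 1.0 { factor } else { 1.0 };
    // Clamp the exponent so the cast to i32 cannot wrap.
    let exponent = n.min(i32::MAX as u32) as i32;
    // If factor^n overflows to infinity, min() still lands on the cap.
    let uncapped = base.as_secs_f64() * factor.powi(exponent);
    Duration::from_secs_f64(uncapped.min(max_backoff.as_secs_f64()))
}
```

With base = 200ms, factor = 2.0 and a 30s cap this reproduces the table above: restart #7 gives 25.6s and restart #8 is held at the cap. The real delay would additionally be multiplied by the per-restart jitter factor.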
Failure storms
Backoff spaces individual restarts; max_restarts is a lifetime cap.
Neither distinguishes a service that fails once a day from one that is
suddenly crash-looping. The opt-in storm guard does (a design borrowed
from Go's suture supervisor — the
idea, not the code):
    use processkit::{Command, Supervisor};
    use std::time::Duration;

    let outcome = Supervisor::new(Command::new("worker"))
        .storm_pause(Duration::from_secs(15))   // master switch, off by default
        .failure_decay(Duration::from_secs(30)) // score half-life (default 30s)
        .failure_threshold(5.0)                 // trip point (default 5.0)
        .run()
        .await?;
    println!("storm pauses taken: {}", outcome.storm_pauses);
Each failed run adds 1 to a score that halves every failure_decay:
score = score × 0.5^(Δt / failure_decay) + 1
- Fails rarely: the score decays toward 0 between failures, so each new failure lands it back near 1 and it never reaches the threshold. The guard stays out of the way.
- Failure storm: failures arrive faster than the half-life drains them, the score climbs past failure_threshold, and the supervisor takes one collective pause of storm_pause (jittered into [0.5, 1.5) like the backoff), resets the score, and resumes.
Only failures feed the score — crashes and spawn errors — not clean exits
restarted under RestartPolicy::Always. The pause stacks with (runs before)
the per-restart backoff, and the max_restarts budget is checked first, so a
storm pause never extends an exhausted budget. Pauses taken are reported in
SupervisionOutcome::storm_pauses.
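To get a feel for how quickly the score trips, the update rule can be simulated with the standard library alone. This is a sketch under assumptions, not the supervisor's code: record_failure and failures_until_trip are hypothetical names, the half-life must be non-zero, and the trip test is taken to be score >= threshold (the text above only says the score "climbs past" it).

```rust
use std::time::Duration;

/// One score update: decay the old score over `since_last`, then add 1
/// for the new failure. Assumes `half_life` is non-zero.
pub fn record_failure(score: f64, since_last: Duration, half_life: Duration) -> f64 {
    let decay = 0.5_f64.powf(since_last.as_secs_f64() / half_life.as_secs_f64());
    score * decay + 1.0
}

/// Counts how many evenly spaced failures it takes for the score to reach
/// `threshold`, giving up after `max_failures`.
pub fn failures_until_trip(
    spacing: Duration,
    half_life: Duration,
    threshold: f64,
    max_failures: u32,
) -> Option<u32> {
    let mut score = 0.0;
    for n in 1..=max_failures {
        score = record_failure(score, spacing, half_life);
        if score >= threshold {
            return Some(n);
        }
    }
    None
}
```

With the default 30s half-life and threshold 5.0, failures one second apart trip the guard on the sixth failure in this model, while failures a day apart never do because the score has decayed to essentially zero before each new one.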
Stopping
Three gates, checked in this order after every completed run:
- stop_when(predicate): sees the run's ProcessResult; returning true ends supervision regardless of policy (→ StopReason::Predicate). "Exit 0 is done, anything else is a crash" is the classic: stop_when(|res| res.code() == Some(0)) under RestartPolicy::Always.
- The policy: OnCrash stops on a clean exit (→ PolicySatisfied).
- max_restarts(n): at most n restarts = n + 1 total runs; an exhausted budget reports the last result (→ RestartsExhausted). max_restarts(0) means exactly one run.
Outcomes
run() resolves to a SupervisionOutcome:
    use processkit::{Command, Supervisor};

    let outcome = Supervisor::new(Command::new("job")).run().await?;
    outcome.final_result; // ProcessResult<String> of the LAST run
    outcome.restarts;     // how many restarts happened (not counting run #1)
    outcome.stopped;      // StopReason::{Predicate, PolicySatisfied, RestartsExhausted}
    outcome.storm_pauses; // failure-storm pauses taken (0 unless storm_pause is set)
Note that run() returning Ok does not mean the child succeeded; it means
supervision concluded. Inspect final_result (or call ensure_success() on it) for
the child's own verdict.
Supervising inside a shared group
The supervisor runs through any ProcessRunner. The headline production
variant injects a ProcessGroup so every incarnation —
and everything it spawns — lives in one kill-on-drop container:
    use processkit::{Command, ProcessGroup, RestartPolicy, Supervisor};

    let group = ProcessGroup::new()?;
    let outcome = Supervisor::new(Command::new("worker"))
        .with_runner(&group) // &group is itself a ProcessRunner
        .restart(RestartPolicy::OnCrash)
        .max_restarts(10)
        .run()
        .await?;
    // The group outlives supervision: drop it (or shutdown) to reap any strays.
Mind one interaction: don't supervise into a group you've suspended — under the cgroup mechanism the restarted child would start frozen (and the spawn itself can block). Resume first.
The same injection point makes supervision logic hermetically testable — script a sequence of fake results and assert the restart/stop behavior with no real process; see Testing your code.
Errors and cancellation
A run that produces no result at all (spawn/IO failure) can't be judged by
stop_when; the policy treats it as a crash and restarts (with backoff)
unless the policy is Never or the budget is exhausted — then the error
itself surfaces as run()'s Err.
A cancelled incarnation is
terminal: run() returns
Err(Error::Cancelled) immediately. The token never un-cancels, so a restart
could only produce another instantly-cancelled run — the supervisor refuses
the futile loop.
Next: Testing your code · Timeouts, retries & cancellation · Process groups
Testing your code
Code that shells out is miserable to test — unless the subprocess is behind a
seam. In processkit that seam is one small trait. Only output_string is required;
output_bytes (raw-byte stdout) and start (a live handle for streaming/probes)
are defaulted, so a minimal double implements just output_string:
    #[async_trait]
    pub trait ProcessRunner: Send + Sync {
        async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>>;
        // Defaulted (route through `start`); override for byte/streaming support:
        async fn output_bytes(&self, command: &Command) -> Result<ProcessResult<Vec<u8>>>;
        async fn start(&self, command: &Command) -> Result<RunningProcess>;
    }
Production code takes a runner (generically or as &dyn ProcessRunner); tests
hand it a double. Four doubles ship with the crate, plus a macro that makes
whole CLI wrappers testable for free.
- The ProcessRunner seam
- Scripting replies: ScriptedRunner
- Asserting invocations: RecordingRunner
- Expectation-style: MockRunner
- Record/replay cassettes: RecordReplayRunner
- Wrapping a CLI tool: CliClient
The ProcessRunner seam
JobRunner is the real implementation (each run in a fresh private group); a
ProcessGroup is also a runner (runs land in that
shared group); and impl ProcessRunner for &R means a borrowed runner
works wherever an owned one does — inject &group or &recording without
giving ownership away.
Every runner — real or double — gets the convenience helpers of
ProcessRunnerExt for free: run (trimmed stdout, success required),
run_unit, exit_code, probe (exit code as a boolean), checked
(success-checked full result), and parse/try_parse (feed stdout to a
closure; like first_line, generic over the closure so unavailable on a
&dyn ProcessRunner). Retry
policies work through the seam too, so
a double exercises your retry handling hermetically.
The seam covers streaming as well as bulk runs: ProcessRunner::start
returns a live RunningProcess, and a ScriptedRunner's start hands back a
scripted handle whose canned lines flow through the same pump machinery a real
child uses — stdout_lines, wait_for_line, and finish behave
identically, with no subprocess (see
Scripted streaming below). An output_string-only custom
runner keeps compiling: start is defaulted to Error::Unsupported.
    use processkit::{Command, ProcessRunner, ProcessRunnerExt, Result};

    // Production code: generic over the runner.
    async fn current_branch(runner: &impl ProcessRunner) -> Result<String> {
        runner
            .run(&Command::new("git").args(["branch", "--show-current"]))
            .await
    }
Scripting replies
ScriptedRunner returns canned Replys for matched commands — the
work-horse double:
use processkit::{Command, ProcessRunnerExt};
use processkit::testing::{Reply, ScriptedRunner};

#[tokio::test]
async fn detects_the_branch() {
    let runner = ScriptedRunner::new()
        // Match by program + argument PREFIX (element-wise; first element is
        // the program name, in registration order):
        .on(["git", "branch", "--show-current"], Reply::ok("main\n"))
        // …or by any predicate over the full Command:
        .when(
            |cmd| cmd.working_dir().is_some(),
            Reply::fail(128, "fatal: not a git repository"),
        )
        // …with an optional catch-all:
        .fallback(Reply::ok(""));

    assert_eq!(current_branch(&runner).await.unwrap(), "main");
}
The pieces:
- Reply::ok(stdout): exit 0.
- Reply::fail(code, stderr): non-zero with stderr.
- Reply::lines(["a", "b"]): exit 0 with the lines joined (and streamed one by one on a scripted start).
- Reply::timeout(): a timed-out run (the checking helpers raise Error::Timeout from it, carrying the command's own configured deadline). On a scripted start it resolves immediately as timed-out; to exercise a real deadline race, use Reply::pending() + a Command::timeout.
- .with_stdout(text): attach stdout to any of them (e.g. the CONFLICT … text git prints on a failing merge).
- .with_line_delay(d): pace a scripted stream's lines.
- Reply::pending(): parks the call until the command's cancellation token (per-command cancel_on or the client-level default_cancel_on) fires, then resolves with Error::Cancelled, so a test can prove an orchestration actually cancels a blocked call, not just that it formats a canned error. With no token it parks forever, like a hung child.
- Rules are tried in registration order; first match wins. Prefix matching is element-wise over the program name then the arguments (the first element is the program): on(["git", "foo"]) matches git foo bar but not git foobar (and not rm foo). Use on_sequence to serve an ordered sequence of replies (each once, then the last repeats) for a fail-then-succeed scenario.
- No match and no fallback is a loud error (Error::Spawn, not-found), so an unexpected invocation can't slip through a test silently.
- Bulk runs also replay the canned lines through the command's on_stdout_line/on_stderr_line handlers, so a wrapper's progress-reporting path is exercised without a subprocess.
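The matching rules in the list above can be modelled in a few lines of plain Rust. This is a sketch of the described semantics only, not processkit's implementation; `Rule`, `rule`, `matches_prefix`, and `first_match` are hypothetical names chosen for the example.

```rust
/// A rule: a program-plus-arguments prefix and the canned reply it serves.
pub struct Rule {
    pub prefix: Vec<String>,
    pub reply: String,
}

/// Convenience constructor for the examples (hypothetical helper).
pub fn rule(prefix: &[&str], reply: &str) -> Rule {
    Rule {
        prefix: prefix.iter().map(|s| s.to_string()).collect(),
        reply: reply.to_owned(),
    }
}

/// Element-wise prefix test: the first element is the program, the rest are
/// arguments. Whole elements are compared, never substrings.
pub fn matches_prefix(prefix: &[String], program: &str, args: &[&str]) -> bool {
    // Rebuild the full invocation as one sequence: program first, then args.
    let mut invocation = std::iter::once(program).chain(args.iter().copied());
    // Every prefix element must equal the invocation element at the same position.
    prefix
        .iter()
        .all(|want| invocation.next() == Some(want.as_str()))
}

/// Rules are tried in registration order; the first match wins.
pub fn first_match<'a>(rules: &'a [Rule], program: &str, args: &[&str]) -> Option<&'a str> {
    rules
        .iter()
        .find(|r| matches_prefix(&r.prefix, program, args))
        .map(|r| r.reply.as_str())
}
```

Because comparison is per element, a `["git", "foo"]` prefix accepts `git foo bar` and rejects `git foobar`. Registering a narrow rule before a broad one is what lets the narrow rule win.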
Scripted streaming
ScriptedRunner::start returns a live RunningProcess backed by the canned
reply instead of an OS child. The canned stdout/stderr feed the same pump
machinery a real child uses, so the whole streaming surface works
hermetically — stdout_lines yields the lines, wait_for_line probes them,
finish reports the canned outcome and stderr:
use processkit::{Command, Outcome, ProcessRunner, StreamExt, Finished};
use processkit::testing::{Reply, ScriptedRunner};
use std::time::Duration;

#[tokio::test]
async fn server_becomes_ready() {
    let runner = ScriptedRunner::new()
        .on(["server", "serve"], Reply::lines(["booting", "listening on 8080"]));

    let mut run = runner.start(&Command::new("server").arg("serve")).await.unwrap();
    run.wait_for_line(|l| l.contains("listening"), Duration::from_secs(5))
        .await
        .unwrap(); // satisfied by the canned banner, no subprocess

    let Finished { outcome, .. } = run.finish().await.unwrap();
    assert_eq!(outcome, Outcome::Exited(0));
}
Reply::lines([...]) scripts the stdout lines; .with_line_delay(d) paces
them (deterministic under #[tokio::test(start_paused = true)]), and the
scripted run "exits" after the last line. The honest boundaries: a scripted
handle has no OS identity (pid() is None, profile reports empty
samples), does not compose into a real Pipeline, and does not model
interactive stdin. Reply::pending() scripts a run that never exits on its
own — cancel or time it out through the command's own knobs. A command
timeout does bound a scripted stream (it ends at the deadline and reports
Outcome::TimedOut, like a real child), but a scripted handle has no signal
tier, so — like on Windows — it ignores timeout_grace and ends at once.
Asserting invocations
RecordingRunner wraps another runner and records every Invocation — what
was asked — so a test asserts inputs, not just outputs:
use processkit::{Command, ProcessRunnerExt};
use processkit::testing::{RecordingRunner, Reply, ScriptedRunner};

#[tokio::test]
async fn passes_the_right_flags() {
    let runner = RecordingRunner::new(
        ScriptedRunner::new().fallback(Reply::ok("done")),
    );

    runner
        .run(&Command::new("gh").args(["pr", "create", "--draft"]).current_dir("/repo"))
        .await
        .unwrap();

    let call = runner.only_call(); // panics unless exactly one call
    assert_eq!(call.args_str(), ["pr", "create", "--draft"]);
    assert!(call.has_flag("--draft"));
    assert_eq!(call.cwd.as_deref().map(|c| c.to_str().unwrap()), Some("/repo"));
    assert!(!call.has_stdin);
}
An Invocation captures the routing knobs — program, args, cwd,
envs (explicit overrides, None = removal), has_stdin — not the
I/O-shaping ones (timeout, encodings, buffer policy); assert those through a
when predicate over the Command itself. calls() returns the full list
when more than one run is expected.
Expectation-style: MockRunner
With the mock feature, mockall generates a MockRunner for
expectation-style tests (call counts, argument matchers, ordered
expectations) — the right tool when the interaction is the contract.
Note: MockRunner's expect_* surface is generated by mockall and is exempt from this crate's semver guarantees; it tracks the mockall dependency, not a frozen API. For a stable double, prefer ScriptedRunner (canned replies) or RecordingRunner (input assertions) above.
use processkit::testing::MockRunner;
let mut mock = MockRunner::new();
mock.expect_output_string()
.times(1)
.returning(|_cmd| /* build a Result<ProcessResult<String>> */ …);
MockRunner does not inherit the defaults. Unlike a hand-written runner (where output_bytes/start are defaulted), mockall::automock replaces every method with an expectation, so a verb that routes through start or output_bytes needs its own expect_start()/expect_output_bytes(), or the unset call panics ("no expectation"). ScriptedRunner provides the defaults and the streaming seam out of the box.
For most tests ScriptedRunner/RecordingRunner read better; reach for the
mock when you need mockall's matching machinery.
Record/replay cassettes
With the record feature, RecordReplayRunner closes the loop: record
real runs to a JSON cassette once, then replay them deterministically —
fast, hermetic, byte-stable, no subprocess in CI:
use processkit::{Command, JobRunner, ProcessRunnerExt};
use processkit::testing::RecordReplayRunner;

// Record once against the real tool (an opt-in `--record` test run, say):
let runner = RecordReplayRunner::record("fixtures/git.json", JobRunner::new());
let version = runner.run(&Command::new("git").arg("--version")).await?;
runner.save()?; // the error-surfacing flush (best-effort on drop too)

// Replay everywhere else:
let runner = RecordReplayRunner::replay("fixtures/git.json")?;
assert_eq!(runner.run(&Command::new("git").arg("--version")).await?, version);
Semantics worth knowing before you commit a cassette:
| Aspect | Behavior |
|---|---|
| Match key | program + args + cwd + a stdin source digest (hashed, never persisted: in-memory bytes hash their content, a from_file source hashes its path) — no stdin (absent or Stdin::empty()) keys distinctly; lossy UTF-8 on the text parts |
| Environment | values never reach the file — only sorted variable names, so env secrets can't leak through a committed fixture; env is not matched, so env differences can't cause spurious misses |
| Duplicates of one key | replay in capture order, then the last entry repeats — a recorded sequence (git rev-parse HEAD before/after a commit) replays faithfully, while retry/probe loops keep getting a stable final answer |
| Miss | strict Error::CassetteMiss (distinct from a missing program — is_not_found() is false) — replay never spawns a surprise subprocess; a stale cassette fails loudly |
| Timeouts | a recorded timed-out run replays as one, surfacing Error::Timeout with the replaying command's deadline |
| Format | pretty-printed JSON with a version field; unknown versions / corrupt files / an entry with a contradictory outcome / a file over 64 MiB are Error::Io(InvalidData), a missing file keeps NotFound |
| Err results | not recorded — only completed runs (non-zero exits and captured timeouts are results and are recorded) |
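The "Duplicates of one key" and "Miss" rows describe a small state machine, sketched below in plain Rust. It is a model of the documented behavior only, under simplifying assumptions: `Cassette`, `record`, and `replay` are hypothetical names, the match key is reduced to a single string, and only stdout is stored.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory cassette: recorded outputs grouped by match key.
#[derive(Default)]
pub struct Cassette {
    entries: HashMap<String, Vec<String>>,
    cursors: HashMap<String, usize>,
}

impl Cassette {
    /// Append one recorded result under `key`, preserving capture order.
    pub fn record(&mut self, key: &str, stdout: &str) {
        self.entries
            .entry(key.to_owned())
            .or_default()
            .push(stdout.to_owned());
    }

    /// Replay the next entry for `key`; once the last entry is reached, keep
    /// repeating it. An unknown key is a strict miss (`None`): the model never
    /// falls back to running anything.
    pub fn replay(&mut self, key: &str) -> Option<&str> {
        let recorded = self.entries.get(key)?;
        let cursor = self.cursors.entry(key.to_owned()).or_insert(0);
        let index = *cursor;
        // Advance only while another entry remains, so the last one repeats.
        if index + 1 < recorded.len() {
            *cursor += 1;
        }
        Some(recorded[index].as_str())
    }
}
```

With two entries recorded for the same key, the first two replays return them in order and every later replay returns the second, which is what keeps a retry or probe loop stable.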
Only env values are redacted. program, args, cwd, stdout, and
stderr are stored verbatim and can carry secrets (a --password=… flag, a
token echoed to output), so review a fixture before committing it. On Unix the
file is written 0600 and the write refuses to follow a symlink at the
cassette path (O_NOFOLLOW, so a planted link can't redirect the secret-bearing
write — it fails loud instead). On Windows the file inherits the containing
directory's ACL, so restrict that directory (or use a per-user temp dir, not a
world-writable shared one) for secret-bearing fixtures.
A neat trick: in tests, record against a ScriptedRunner instead of
JobRunner — the whole record→save→replay round trip is then itself
hermetic.
Wrapping a CLI tool
CliClient is the foundation for typed wrappers around external tools
(git, jj, gh, kubectl, …): it owns the program name, per-client
defaults, and the runner; your wrapper contributes only commands and parsers.
The cli_client! macro generates the boilerplate:
use processkit::{cli_client, Error, ProcessRunner, Result};
use std::path::Path;
use std::time::Duration;

cli_client!(
    /// A typed `git` client.
    pub struct Git => "git"
);

impl<R: ProcessRunner> Git<R> {
    /// HEAD's commit id.
    pub async fn head(&self, repo: &Path) -> Result<String> {
        self.core.run(self.core.command_in(repo, ["rev-parse", "HEAD"])).await
    }

    /// Is the work tree clean? (exit code IS the answer)
    pub async fn is_clean(&self, repo: &Path) -> Result<bool> {
        self.core.probe(self.core.command_in(repo, ["diff", "--quiet"])).await
    }

    /// Branch list, parsed. The parser is fallible and returns the crate's
    /// `Result`, typically an `Error::Parse` naming the program.
    pub async fn branches(&self, repo: &Path) -> Result<Vec<String>> {
        self.core
            .try_parse(
                self.core.command_in(repo, ["branch", "--format=%(refname:short)"]),
                |out| {
                    let list: Vec<String> = out.lines().map(str::to_owned).collect();
                    if list.is_empty() {
                        Err(Error::Parse {
                            program: "git".into(),
                            message: "no branches".into(),
                        })
                    } else {
                        Ok(list)
                    }
                },
            )
            .await
    }
}

// Production: the real runner, with per-client defaults.
let git = Git::new().default_timeout(Duration::from_secs(30));
let head = git.head(Path::new(".")).await?;
The generated type is Git<R: ProcessRunner = JobRunner> with Git::new(),
Git::with_runner(runner), default_timeout / default_env /
default_env_remove builders, and a public core: CliClient<R> whose helpers
speak the crate-wide verb vocabulary: run (trimmed stdout), output_string (full
result), run_unit (success only), exit_code, probe, plus parse
(infallible) and try_parse (fallible → Error::Parse).
And the payoff — the wrapper tests hermetically with any double:
#[tokio::test]
async fn head_is_trimmed() {
    let git = Git::with_runner(
        ScriptedRunner::new().on(["git", "rev-parse", "HEAD"], Reply::ok("abc123\n")),
    );
    assert_eq!(git.head(Path::new("/repo")).await.unwrap(), "abc123");
}
…or with a cassette recorded against the real tool once.
Platform support
processkit supports Unix and Windows only — it requires tokio::process
and OS job / process-group primitives that have no equivalent on bare targets
like wasm. Building for such a target fails at compile time (a compile_error!
guard, or earlier in tokio's own dependencies). Within the supported set, it
treats platform support as first-class: every capability is either fully
implemented, honestly partial (documented and typed), or refused with
Error::Unsupported — never silently skipped. This page collects all the
matrices and fine print in one place.
Containment mechanisms
ProcessGroup::mechanism() reports which one you actually got:
| Mechanism | Platform | How containment works |
|---|---|---|
| JobObject | Windows | A Job Object with kill-on-close; children are created suspended, assigned to the job, then resumed, so even a grandchild forked in the first instant is contained |
| CgroupV2 | Linux (with delegation) | A private cgroup; children join in pre_exec, before exec, so descendants can never escape; teardown is cgroup.kill |
| ProcessGroup | macOS, BSDs, Linux fallback | POSIX process groups (setpgid); teardown is killpg; tracked per started/adopted child |
On Linux the cgroup backend requires controller delegation, and resource
limits specifically need this process to run at the real cgroup-v2 root. The
crate creates the limit cgroup under this process's own cgroup and enables the
controllers in that cgroup's subtree_control, which cgroup v2's "no internal
processes" rule allows only for the real hierarchy root (the one exempt cgroup). A
cgroup namespace root does not qualify — it only virtualizes the view — so an
ordinary (private-cgroupns) container fails EBUSY just like a systemd
session/scope/service. The crate does not migrate your process into a sub-cgroup
to work around it, so in practice limits apply only at a minimal non-systemd init
sitting at the real root. Without a usable cgroup it quietly falls back to ProcessGroup —
unless you requested resource limits, which fail fast
instead (Error::ResourceLimit), because an unapplied cap is no protection.
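The fallback rule in the paragraph above reduces to a small decision table. The sketch below models it in plain Rust under a simplifying assumption: one boolean stands for "a cgroup that satisfies this request is usable". `Mechanism`, `SetupError`, and `choose_linux_mechanism` are hypothetical names for illustration, not the crate's types.

```rust
/// Containment mechanisms available on Linux in this model.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Mechanism {
    CgroupV2,
    ProcessGroup,
}

/// Hypothetical error for a request that cannot be honoured.
#[derive(Debug, PartialEq, Eq)]
pub enum SetupError {
    ResourceLimit,
}

/// Pick the mechanism: prefer the cgroup, fall back quietly to process
/// groups, but refuse outright when limits were requested and cannot apply.
pub fn choose_linux_mechanism(
    cgroup_usable: bool,
    limits_requested: bool,
) -> Result<Mechanism, SetupError> {
    match (cgroup_usable, limits_requested) {
        // A usable cgroup serves both containment and limits.
        (true, _) => Ok(Mechanism::CgroupV2),
        // No cgroup, no limits asked for: degrade to process groups.
        (false, false) => Ok(Mechanism::ProcessGroup),
        // No cgroup but limits asked for: an unapplied cap is no protection.
        (false, true) => Err(SetupError::ResourceLimit),
    }
}
```

The asymmetry is deliberate in this model: containment may degrade silently because the fallback still kills the tree, while a limit has no weaker equivalent to degrade to.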
Capability matrices
Teardown & containment
| Capability | Windows JobObject | Linux cgroup | Linux pgroup | macOS/BSD |
|---|---|---|---|---|
| Kill-on-drop, whole tree | ✅ | ✅ | ✅ groups-based | ✅ groups-based |
| Graceful shutdown (TERM → grace → KILL) | 🟡 atomic kill only | ✅ | ✅ | ✅ |
| adopt an external child | ✅ (future forks contained) | ✅ (future forks contained) | 🟡 exec'd child tracked individually | 🟡 same |
Signals & freezing
| Capability | Windows | Linux cgroup | Linux pgroup | macOS/BSD |
|---|---|---|---|---|
| Arbitrary signal (Hup, Usr1, Other(n), …) | ❌ Kill only | ✅ | ✅ | ✅ |
| suspend / resume | 🟡 per-thread counts | ✅ cgroup.freeze | ✅ SIGSTOP/CONT | ✅ SIGSTOP/CONT |
On the cgroup mechanism, a non-Kill signal (and the SIGSTOP/SIGCONT
fallback used for suspend/resume on pre-5.2 kernels without cgroup.freeze)
surfaces a real per-member delivery failure (e.g. EPERM) as an Err rather
than swallowing it — consistent with the "never silently skipped" philosophy; an
ESRCH race (the member already exited) is still success.
Inspection & accounting (stats feature)
| Capability | Windows | Linux cgroup | Linux pgroup | macOS/BSD |
|---|---|---|---|---|
| members() | ✅ whole tree | ✅ whole tree | 🟡 leaders only | 🟡 leaders only |
| Group CPU / peak memory | ✅ | ✅ | ❌ count only | ❌ count only |
| Per-run cpu_time / peak_memory_bytes / profile | ✅ | ✅ | ✅ (/proc) | ❌ None |
Resource limits (limits feature)
| Capability | Windows | Linux cgroup | Linux pgroup | macOS/BSD |
|---|---|---|---|---|
| memory_max (whole tree) | ✅ | ✅ | ❌ | ❌ |
| max_processes | ✅ | ✅ | ❌ | ❌ |
| cpu_quota | 🟡 approximate | ✅ | ❌ | ❌ |
Spawn-time controls
| Capability | Windows | Unix (all) |
|---|---|---|
| inherit_env allow-list | ✅ | ✅ |
| uid / gid drop | ❌ Unsupported | ✅ |
| setsid | ❌ Unsupported | ✅ |
| create_no_window | ✅ | no-op |
| kill_on_parent_death | ✅ always on (kernel) | Linux: direct child; macOS/BSD: no-op |
Everything not listed — capture, streaming, interactive stdin, encodings, buffer policies, timeouts, retry, pipelines, supervision, readiness probes, the test doubles, cassettes, cancellation — is platform-agnostic and behaves identically everywhere.
Caveats
The honest fine print, mostly consequences of OS semantics:
Windows: termination is an exit code, never Signalled (D18). Windows has
no signal abstraction, so a killed process reports
Outcome::Exited,
not Outcome::Signalled. TerminateProcess / TerminateJobObject(_, 1) is
Exited(1) — indistinguishable from a voluntary exit(1) — and Ctrl-C
surfaces as Exited(-1073741510) (STATUS_CONTROL_C_EXIT as a signed i32).
The crate reports the platform truth rather than fabricating a Signalled from
an NTSTATUS code (that mapping would be a lossy guess). When you need to know
the run was killed, use a ProcessGroup deadline or a cancellation token (which
surface as TimedOut / Error::Cancelled on every platform). Outcome::Signalled
is therefore Unix-only.
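The negative exit code quoted above is the same 32 bits as the NTSTATUS value, read as signed. A minimal sketch of that reinterpretation follows; the helper names `as_signed_exit_code` and `is_ctrl_c_exit` are hypothetical and not part of the crate.

```rust
/// NTSTATUS reported when a console process is ended by Ctrl-C.
pub const STATUS_CONTROL_C_EXIT: u32 = 0xC000_013A;

/// Reinterpret the 32-bit status as a signed exit code (same bits, no range check).
pub fn as_signed_exit_code(status: u32) -> i32 {
    status as i32
}

/// Hypothetical helper: does this signed exit code carry the Ctrl-C status?
pub fn is_ctrl_c_exit(code: i32) -> bool {
    code as u32 == STATUS_CONTROL_C_EXIT
}
```

A caller that wants to special-case Ctrl-C on Windows can compare against the constant this way, with the caveat from the text that a process may also return any such value voluntarily.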
Linux cgroup delegation. Creating the per-group cgroup needs write access
to the cgroup v2 hierarchy. Dev boxes typically lack it → the pgroup fallback.
CI inside containers usually has it. Check mechanism() when behavior must
not silently degrade.
uid()/gid() × the cgroup mechanism. The OS applies the uid drop
before pre_exec hooks, and the cgroup join runs in pre_exec — as the
already-dropped user, who can't write the root-owned cgroup.procs. The spawn
fails with a permission error (never an uncontained child). Privilege drop
composes cleanly with the process-group mechanism.
setsid() × process groups. A new session implies a new process group;
the crate coordinates the two (the containment tracking follows the new
session's group), so setsid keeps the kill-on-drop guarantee instead of
breaking out of it.
kill_on_parent_death() is thread-scoped on Linux. PR_SET_PDEATHSIG
fires when the spawning thread dies, not only the process. On a
multi-threaded tokio runtime a retired worker thread could kill the child
early; spawn from a current-thread runtime for the strongest guarantee. It
covers the direct child only — with the parent SIGKILLed, nothing tears
the cgroup/pgroup down, so grandchildren survive. The
parent-died-before-arming race is closed by re-checking getppid() in the
child against the spawner's pid captured before the fork — which stays
correct when the spawner itself is PID 1 (a container entrypoint).
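The parent-pid re-check can be reduced to a pure comparison, which makes the PID 1 case easy to see. The sketch below is an illustration only: both function names are hypothetical, the pids are passed in as plain numbers instead of being read with getppid(), and the "naive" variant is included purely as a contrast, not as something the crate does.

```rust
/// Decide, inside the freshly forked child, whether the spawner died before
/// the parent-death signal was armed. `spawner_pid` was captured before the
/// fork; `observed_ppid` is what the child sees as its parent now.
pub fn parent_died_before_arming(spawner_pid: u32, observed_ppid: u32) -> bool {
    // Any change of parent means the original spawner is gone.
    observed_ppid != spawner_pid
}

/// Contrast case: "reparented to pid 1 means the parent died". This gives a
/// false alarm whenever the spawner itself is pid 1.
pub fn naive_parent_died(observed_ppid: u32) -> bool {
    observed_ppid == 1
}
```

When the spawner is a container entrypoint running as pid 1, the comparison against the captured pid reports a live parent, while the naive check would wrongly report a dead one.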
Windows: the suspended-spawn handshake. Children are created
CREATE_SUSPENDED, assigned to the job, then resumed — closing the classic
race where a fast child forks before it's in the job. A consequence: a raw
ProcessGroup::spawn caller passing its own creation flags gets them OR'd
with CREATE_SUSPENDED (the Command-driven paths handle this for you, incl.
create_no_window).
Windows: nested suspends. SuspendThread keeps per-thread counts — two
suspend() calls need two resume()s. The POSIX backends are level-triggered
(idempotent). Suspension is also best-effort against a tree that is spawning
threads mid-walk.
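The difference between counted and level-triggered suspension is easiest to see side by side. The two types below are a toy model of the semantics described above, not the crate's backends; `CountedSuspend` and `LevelSuspend` are hypothetical names.

```rust
/// Counted suspension, modelled on per-thread suspend counts:
/// every suspend must be paired with a resume.
#[derive(Default)]
pub struct CountedSuspend {
    count: u32,
}

impl CountedSuspend {
    pub fn suspend(&mut self) {
        self.count += 1;
    }
    pub fn resume(&mut self) {
        // Resuming an already-running target is a no-op in this model.
        self.count = self.count.saturating_sub(1);
    }
    pub fn is_running(&self) -> bool {
        self.count == 0
    }
}

/// Level-triggered suspension: the target is frozen or it is not,
/// so repeated calls are idempotent.
#[derive(Default)]
pub struct LevelSuspend {
    frozen: bool,
}

impl LevelSuspend {
    pub fn suspend(&mut self) {
        self.frozen = true;
    }
    pub fn resume(&mut self) {
        self.frozen = false;
    }
    pub fn is_running(&self) -> bool {
        !self.frozen
    }
}
```

After two suspends and one resume, the counted model is still stopped and the level-triggered one is running, which is why portable code should pair every suspend with exactly one resume.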
Spawning into a suspended cgroup group. The freeze is group state: a
child spawned or adopted while suspended joins frozen — the forked child
joins the cgroup before exec, so it can freeze before completing the
spawn handshake and start() may never return until resume. Resume
before starting new work; details in
Process groups.
Frozen trees and graceful shutdown. Hard kills penetrate a frozen tree
(SIGKILL / cgroup.kill / job terminate), but a graceful shutdown leads
with a SIGTERM the frozen processes can't handle — it waits out the full
grace. Resume first.
pgroup backends: leaders, zombies, pid reuse. members() lists tracked
group leaders only; an exited-but-unreaped child (zombie) still probes as
alive (keep wait()ing handles if you need prompt liveness, e.g. for
shutdown's early return); and pid-based signalling is inherently
best-effort against pid reuse — the crate prunes dead entries on every probe
to keep the window minimal.
What's next
ProcessKit is a Rust library today, published as processkit on crates.io. The plan is to bring the same approach — kernel-backed whole-tree containment, honest error semantics, and testable seams — to other ecosystems: a Go package, an F# library, a Kotlin library, and a Python wrapper. Each implementation will follow the same philosophy and be documented here as it ships.