ProcessKit


ProcessKit is an async child-process management library for Rust (tokio). It solves the orphan-process problem at the kernel level and packages a full set of tools around it: streaming I/O, shell-free pipelines, supervision, and hermetic testing seams.

Running external programs is part of everyday software: compiling code, querying version control, invoking CLI tools, managing background servers. Every major runtime makes it easy to start a child process. What they don't make easy is cleaning up after one.

The orphan problem

When a build tool spawns compiler workers, when an integration test starts a local database, when a wrapper script calls the real binary — those grandchildren exist outside your program's awareness. If your code panics, times out, or drops a future mid-flight, the direct child may receive a signal. But everything deeper in the tree keeps running as orphans: ports stay bound, temp files stay open, CPU keeps spinning. The next test run tries to bind the same port and fails with "address already in use."

This is not an edge case. It is the default behavior of every process-spawning API that works at the level of a single pid — including std::process and tokio::process.

Whole-tree containment

ProcessKit solves this at the kernel level. Every process you start lives inside an operating-system containment object: a Job Object on Windows, a cgroup v2 on Linux (with a POSIX process-group fallback on hosts without cgroup delegation), or a POSIX process group on macOS and BSDs.

When the Rust value that owns the container is dropped (by normal flow, by an error propagating through ?, or by a panic), the kernel kills every member of the tree, grandchildren included. With a Job Object or a cgroup this is not a best-effort signal sent to a list of pids: it is one kernel operation on the container, so a child cannot escape by forking quickly, and a descendant that changed its session is still a member. The POSIX process-group mechanism is weaker: it signals the whole group at once, but a descendant that moves itself into a new session or process group leaves that group.

The library reports the mechanism it actually got — mechanism() returns JobObject, CgroupV2, or ProcessGroup — so you can verify the guarantee rather than assume it.

Getting started

[dependencies]
processkit = "1"

Requires a tokio runtime and Rust 1.88 or later (the MSRV). The crate is stable at 1.0; breaking changes land only in a new major version.

The simplest case — run a command, get its trimmed stdout, fail on error:

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let branch = Command::new("git")
        .args(["branch", "--show-current"])
        .run()
        .await?;
    println!("on branch: {branch}");
    Ok(())
}

When you need more than "success or error" — the exit code, both streams, whether the run timed out — output_string() returns the full result without raising on a non-zero exit:

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let result = Command::new("cargo").arg("test").output_string().await?;
    if result.timed_out() {
        eprintln!("tests hung; partial output:\n{}", result.stdout());
    } else if !result.is_success() {
        // code() is an Option: a child killed by a signal has no exit code.
        eprintln!("exit {:?}: {}", result.code(), result.stderr());
    }
    Ok(())
}

The key design choice: a non-zero exit is captured data until you explicitly ask for success. Timeouts are captured in the result. Only cancellation is always an error — because an abandoned run has no result worth inspecting.

Feature flags

Each flag is additive. The kill-on-drop guarantee is unconditional in every configuration.

| Feature | Default | Adds |
|---|---|---|
| process-control | ✓ | Signals, suspend/resume, members(), adopt() |
| stats | | Resource measurement: CPU time, peak memory, sample_stats, profile |
| limits | | Whole-tree resource caps (implies stats) |
| record | | Record/replay cassettes (adds serde) |
| mock | | mockall-generated MockRunner (semver-exempt surface) |
| tracing | | Lifecycle events via the tracing crate (never logs argv/env values) |

How it compares

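| | whole-tree kill-on-drop | async | limits / stats | streaming · pipelines · supervision |
|---|---|---|---|---|
| std::process | | | | |
| tokio::process | | ✓ | | |
| command-group | | | | |
| async-process | | ✓ (smol) | | |
| duct | | | | pipelines only |
| processkit | ✓ | ✓ (tokio) | ✓ | ✓ |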

The first column is the differentiator: descendants are contained and reaped as a unit, not just the direct child.

Consuming verbs

Every run begins with the same Command builder; the verb you end with determines what you receive:

| What you want | Verb | What you get |
|---|---|---|
| stdout, success required | run() | trimmed String; non-zero / timeout / kill → typed error |
| full outcome, exit as data | output_string() | ProcessResult: code, stdout, stderr, timed_out |
| just the exit code | exit_code() | i32; a timed-out run errors instead of returning -1 |
| a yes/no answer | probe() | bool: 0 → true, 1 → false, anything else errors |
| a typed value from stdout | parse(\|s\| …) | T, success required |
| typed value, non-zero ok | try_parse(\|s\| …) | Option<T>; None on non-zero |
| first matching output line | first_line(\|l\| …) | Option<String> |
| a live handle for streaming | start() | RunningProcess |

The same vocabulary is available on every layer: ProcessRunner, ProcessGroup, CliClient.
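To make the table's error semantics concrete, here is a small std-only model of how a finished run could map onto three of the verbs. The types `Outcome` and `VerbError` and the `*_model` functions are hypothetical illustrations, not processkit's API; the real verbs are async and carry richer error data.

```rust
/// Hypothetical, simplified outcome of a finished run (not processkit's type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Exited(i32),
    TimedOut,
}

/// Hypothetical error type for the model.
#[derive(Debug, PartialEq)]
enum VerbError {
    NonZero(i32),
    Timeout,
    UnexpectedCode(i32),
}

/// Like `run()`: success required, stdout returned with trailing whitespace trimmed.
fn run_model(outcome: Outcome, stdout: &str) -> Result<String, VerbError> {
    match outcome {
        Outcome::Exited(0) => Ok(stdout.trim_end().to_owned()),
        Outcome::Exited(code) => Err(VerbError::NonZero(code)),
        Outcome::TimedOut => Err(VerbError::Timeout),
    }
}

/// Like `exit_code()`: any exit code is data, but a timeout is an error (never -1).
fn exit_code_model(outcome: Outcome) -> Result<i32, VerbError> {
    match outcome {
        Outcome::Exited(code) => Ok(code),
        Outcome::TimedOut => Err(VerbError::Timeout),
    }
}

/// Like `probe()`: 0 -> true, 1 -> false, any other code is an error.
fn probe_model(outcome: Outcome) -> Result<bool, VerbError> {
    match exit_code_model(outcome)? {
        0 => Ok(true),
        1 => Ok(false),
        other => Err(VerbError::UnexpectedCode(other)),
    }
}

fn main() {
    println!("{:?}", run_model(Outcome::Exited(0), "main\n"));
    println!("{:?}", probe_model(Outcome::Exited(1)));
    println!("{:?}", exit_code_model(Outcome::TimedOut));
}
```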

Streaming and interactive I/O

For commands that produce large or incremental output, start() returns a live handle you drive yourself. Stream stdout line by line as the child produces it, with no buffering and no waiting for exit:

use processkit::{Command, StreamExt, Finished, Outcome};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let mut run = Command::new("cargo")
        .args(["build", "--release"])
        .start()
        .await?;

    let mut lines = run.stdout_lines()?;
    while let Some(line) = lines.next().await {
        println!("{line}");
    }
    // Stderr was drained in the background the whole time.
    let Finished { outcome, stderr, .. } = run.finish().await?;
    if outcome != Outcome::Exited(0) {
        eprintln!("build failed:\n{stderr}");
    }
    Ok(())
}

For conversational tools — send a request, read the response, repeat — keep_stdin_open() gives you an async writer you can interleave with reads. The library handles the background stderr drain so the child can never block on a full pipe while you're busy with stdout.

Readiness probes solve "start a server, then use it" without guessing at an arbitrary sleep:

use processkit::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let mut run = Command::new("my-server").start().await?;

    // Wait for the startup banner on stdout:
    run.wait_for_line(|l| l.contains("listening on"), Duration::from_secs(10))
        .await?;

    // And/or wait for a TCP port to accept connections:
    run.wait_for_port("127.0.0.1:8080".parse().unwrap(), Duration::from_secs(10))
        .await?;
    Ok(())
}

A probe that cannot pass — the child exited, or the deadline elapsed — fails with Error::NotReady and does not kill the child. You decide what to do next.
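The port probe is essentially a bounded connect-and-retry loop. The blocking, std-only sketch below shows that idea under stated assumptions: it is not processkit's implementation (which is async and also gives up early when the child exits, something not modeled here), and the 250 ms per-attempt cap and 20 ms pause are arbitrary choices for the sketch.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;
use std::time::{Duration, Instant};

/// Polls `addr` until a TCP connection succeeds or `deadline` elapses.
fn wait_for_port(addr: SocketAddr, deadline: Duration) -> io::Result<()> {
    let start = Instant::now();
    loop {
        let remaining = deadline.saturating_sub(start.elapsed());
        if remaining.is_zero() {
            return Err(io::Error::new(io::ErrorKind::TimedOut, "port not ready before deadline"));
        }
        // Never let a single connect attempt outlive the overall deadline.
        let per_attempt = remaining.min(Duration::from_millis(250));
        match TcpStream::connect_timeout(&addr, per_attempt) {
            Ok(_) => return Ok(()),
            // Refused or timed out: pause briefly, then try again.
            Err(_) => thread::sleep(remaining.min(Duration::from_millis(20))),
        }
    }
}

fn main() -> io::Result<()> {
    // Stand-in for a server: a listener on an OS-assigned loopback port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    wait_for_port(addr, Duration::from_secs(2))?;
    println!("{addr} is accepting connections");
    Ok(())
}
```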

Shell-free pipelines

Write a | b | c without a shell string. Stages are connected in-process through a relay, so there are no quoting rules, no word-splitting, and no injection surface. All stages share one kill-on-drop group.

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let authors = (Command::new("git").args(["log", "--format=%an"])
        | Command::new("sort")
        | Command::new("uniq").arg("-c"))
        .run()
        .await?;
    println!("{authors}");
    Ok(())
}

The outcome follows pipefail semantics: stdout is always the last stage's output, but the exit code, stderr, and program name are attributed to the first stage that failed. A stage that legitimately stops reading early — the classic producer | head -n1 shape — can be marked .unchecked_in_pipe() so its broken-pipe exit is not counted as a failure.
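The attribution rule can be stated as a small pure function. This is a simplified, hypothetical model: each stage is reduced to a `Stage` record with an integer exit status (a broken-pipe death is represented as a code), and the `unchecked` flag mirrors `.unchecked_in_pipe()`.

```rust
/// Hypothetical summary of one finished pipeline stage.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Stage<'a> {
    program: &'a str,
    /// Exit status; a broken-pipe death is represented as a code for simplicity.
    code: i32,
    /// Mirrors `.unchecked_in_pipe()`: a failure of this stage is not counted.
    unchecked: bool,
}

/// Pipefail attribution: the first checked stage that failed, or `None`
/// when the chain as a whole succeeded. Stdout would still come from the last stage.
fn failing_stage<'a>(stages: &[Stage<'a>]) -> Option<Stage<'a>> {
    stages.iter().copied().find(|s| !s.unchecked && s.code != 0)
}

fn main() {
    // `git log | sort | uniq -c` where `sort` fails: the error is attributed to `sort`.
    let chain = [
        Stage { program: "git", code: 0, unchecked: false },
        Stage { program: "sort", code: 2, unchecked: false },
        Stage { program: "uniq", code: 1, unchecked: false },
    ];
    if let Some(stage) = failing_stage(&chain) {
        println!("pipeline failed in `{}` with code {}", stage.program, stage.code);
    }
}
```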

Timeouts, retries, and cancellation

Command::timeout(d) kills the whole process tree at the deadline. For the one-shot capture verbs the expiry is part of the result; for the success-checking verbs it becomes a typed Error::Timeout that carries the partial output captured before the kill — useful for diagnosing what a hung tool's last words were.

Command::retry(attempts, backoff, classifier) replays the run on transient failure. The classifier sees the typed error — you can match on the exit code, an Error::Timeout, or the captured stderr. A cancelled run is never retried: the token stays cancelled.
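Conceptually, retry is a loop around a classifier. Here is a synchronous, std-only sketch under stated assumptions: `attempts` counts total tries, the delay between tries is a fixed `backoff` (processkit may grow or jitter it), `sleep` is injected so the logic can be tested without waiting, and `RunError` is a hypothetical stand-in for processkit's `Error`.

```rust
use std::time::Duration;

/// Hypothetical error type for the sketch.
#[derive(Debug, PartialEq)]
enum RunError {
    Timeout,
    Failed { stderr: String },
    Cancelled,
}

/// Runs `op` up to `attempts` times in total. A failure is retried only if it is
/// not a cancellation and `classify` accepts it.
fn retry<T>(
    attempts: u32,
    backoff: Duration,
    classify: impl Fn(&RunError) -> bool,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, RunError>,
) -> Result<T, RunError> {
    let mut tried = 0;
    loop {
        tried += 1;
        match op() {
            Ok(value) => return Ok(value),
            // Cancellation is terminal: never retried.
            Err(RunError::Cancelled) => return Err(RunError::Cancelled),
            // Transient failure with attempts left: wait, then loop again.
            Err(e) if tried < attempts && classify(&e) => sleep(backoff),
            // Permanent failure, or out of attempts.
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let mut calls = 0;
    let result = retry(
        3,
        Duration::from_millis(200),
        |e| matches!(e, RunError::Timeout),
        |d| std::thread::sleep(d),
        || {
            calls += 1;
            if calls < 3 { Err(RunError::Timeout) } else { Ok("fetched") }
        },
    );
    println!("{result:?} after {calls} calls");
}
```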

CancellationToken (re-exported from tokio-util) is the coordinated shutdown primitive. Wire the same parent token into many jobs via child tokens; cancelling the parent kills every process tree and every consuming path reports Error::Cancelled.
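The parent/child relationship is one-directional: cancelling a parent cancels every token derived from it, while cancelling a child leaves the parent and its siblings alone. The std-only sketch below models only that propagation rule with a hypothetical `Token` type; tokio-util's real `CancellationToken` also wakes waiting tasks, which this sketch does not attempt.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical token: cancelled if it, or any ancestor, has been cancelled.
#[derive(Clone)]
struct Token {
    flag: Arc<AtomicBool>,
    parent: Option<Arc<Token>>,
}

impl Token {
    fn new() -> Self {
        Token { flag: Arc::new(AtomicBool::new(false)), parent: None }
    }

    /// A child gets its own flag but keeps a handle on the parent's state.
    fn child_token(&self) -> Self {
        Token { flag: Arc::new(AtomicBool::new(false)), parent: Some(Arc::new(self.clone())) }
    }

    fn cancel(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }

    /// Walks up the ancestor chain; any cancelled ancestor cancels this token.
    fn is_cancelled(&self) -> bool {
        self.flag.load(Ordering::SeqCst)
            || self.parent.as_ref().is_some_and(|p| p.is_cancelled())
    }
}

fn main() {
    let shutdown = Token::new();
    let job = shutdown.child_token();
    shutdown.cancel();
    println!("job cancelled: {}", job.is_cancelled());
}
```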

use processkit::{CancellationToken, Command};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let shutdown = CancellationToken::new();

    let job = tokio::spawn({
        let token = shutdown.child_token();
        async move {
            Command::new("long-job")
                .timeout(Duration::from_secs(30))
                .cancel_on(token)
                .run()
                .await
        }
    });

    // Signal from anywhere: Ctrl-C, sibling failure, UI button.
    shutdown.cancel();

    // Resolves to Err(Error::Cancelled { .. }), wrapped in tokio's Result<_, JoinError>.
    let _outcome = job.await;
    Ok(())
}

Keeping a service alive

Where retry answers "replay this one operation until it succeeds," a Supervisor answers "keep this running." It restarts the command when it exits, according to the restart policy, with bounded restarts, exponential backoff, and jitter on by default so that a restarted fleet doesn't pile back in lockstep:

use processkit::{Command, RestartPolicy, Supervisor};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let outcome = Supervisor::new(Command::new("my-server").args(["--port", "8080"]))
        .restart(RestartPolicy::OnCrash)
        .max_restarts(10)
        .backoff(Duration::from_millis(200), 2.0)
        .storm_pause(Duration::from_secs(15)) // crash-loop guard
        .run()
        .await?;

    println!("ended after {} restarts: {:?}", outcome.restarts, outcome.stopped);
    Ok(())
}

The optional storm guard distinguishes "fails occasionally" from "crash-looping": it maintains a half-life score that grows with each failure and decays between them. When the score exceeds a threshold, the supervisor takes one collective pause rather than hammering the restart timer at backoff speed.
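The half-life score can be sketched in a few lines. The constants (one point per failure, a 10 s half-life, a threshold of 3) and the reset after a pause are illustrative assumptions rather than processkit's documented defaults, and `StormGuard` is a hypothetical name.

```rust
/// Hypothetical failure-storm guard: a score that gains 1.0 per failure and
/// halves every `half_life_secs` seconds between failures.
struct StormGuard {
    score: f64,
    half_life_secs: f64,
    threshold: f64,
    last_failure: Option<f64>,
}

impl StormGuard {
    fn new(half_life_secs: f64, threshold: f64) -> Self {
        StormGuard { score: 0.0, half_life_secs, threshold, last_failure: None }
    }

    /// Records a failure at `now_secs`; returns true when a collective pause is due.
    fn record_failure(&mut self, now_secs: f64) -> bool {
        if let Some(prev) = self.last_failure {
            let elapsed = (now_secs - prev).max(0.0);
            // Exponential decay: after one half-life the score is halved.
            self.score *= 0.5_f64.powf(elapsed / self.half_life_secs);
        }
        self.last_failure = Some(now_secs);
        self.score += 1.0;
        if self.score > self.threshold {
            self.score = 0.0; // assumption: the pause resets the score
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut guard = StormGuard::new(10.0, 3.0);
    // Crash-looping: one failure every second.
    for t in 0..4 {
        println!("t={t}s pause={}", guard.record_failure(t as f64));
    }
}
```

With failures one second apart, the score barely decays and crosses the threshold on the fourth failure; with failures minutes apart, it decays back to roughly 1.0 each time and never triggers a pause.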

Resource limits

With the limits feature, a ProcessGroup can cap the whole tree's memory, process count, and CPU at creation time — enforced by the same kernel object that provides kill-on-drop:

use processkit::{Command, ProcessGroup, ProcessGroupOptions};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::with_options(
        ProcessGroupOptions::default()
            .memory_max(512 * 1024 * 1024) // 512 MiB across the whole tree
            .max_processes(64)
            .cpu_quota(0.5), // half of one core
    )?;
    let _job = group.start(&Command::new("untrusted-tool")).await?;
    Ok(())
}

A limit that cannot be enforced — no cgroup delegation, no Job Object — is a hard error at group creation time, not a silently unapplied cap. An unapplied cap is no protection.
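On Linux, the three caps correspond naturally to cgroup v2 interface files: `memory.max` (bytes), `pids.max` (process count), and `cpu.max` (a `quota period` pair in microseconds, where 100000 is the kernel's default period). The sketch below shows how the example's values might be translated; the mapping and the helper name `cgroup_limit_files` are assumptions for illustration, not processkit's code, and a Windows Job Object expresses the same caps through different fields.

```rust
/// Hypothetical translation of whole-tree caps into cgroup v2 interface files.
fn cgroup_limit_files(memory_max: u64, max_processes: u32, cpu_quota: f64) -> Vec<(&'static str, String)> {
    const PERIOD_US: u64 = 100_000; // cgroup v2 default cpu.max period (100 ms)
    // A fraction of one core becomes a runtime budget per period.
    let quota_us = (cpu_quota * PERIOD_US as f64).round() as u64;
    vec![
        ("memory.max", memory_max.to_string()),
        ("pids.max", max_processes.to_string()),
        ("cpu.max", format!("{quota_us} {PERIOD_US}")),
    ]
}

fn main() {
    for (file, value) in cgroup_limit_files(512 * 1024 * 1024, 64, 0.5) {
        // Each pair would be written into the group's directory under /sys/fs/cgroup.
        println!("{file} <- {value}");
    }
}
```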

Testing code that shells out

Subprocess behavior is notoriously difficult to test. ProcessKit exposes a single trait — ProcessRunner — that decouples "what to run" from "how to run it." Production code takes a runner generically; tests inject a double.

use processkit::{Command, ProcessRunner, ProcessRunnerExt, Result};

async fn current_branch(runner: &impl ProcessRunner) -> Result<String> {
    runner.run(&Command::new("git").args(["branch", "--show-current"])).await
}

The ScriptedRunner returns canned replies for matched commands. The RecordingRunner captures every invocation for assertion. With the record feature, RecordReplayRunner records real runs to a JSON cassette and replays them in CI — fast, hermetic, byte-stable, no subprocess. The seam covers streaming too: a scripted start() feeds canned lines through the same pump machinery the real child uses, so stdout_lines, wait_for_line, and finish all behave identically in tests.
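The seam is ordinary dependency injection through a trait. A synchronous, std-only sketch of the same shape follows; `Runner`, `Scripted`, and `current_branch` here are hypothetical stand-ins (processkit's real trait is async and takes a `Command`), with rules matched on a prefix of program-plus-arguments, first rule wins.

```rust
/// Hypothetical synchronous runner seam: program + args in, stdout out.
trait Runner {
    fn run(&self, argv: &[&str]) -> Result<String, String>;
}

/// Test double: canned replies chosen by argv prefix (first matching rule wins).
struct Scripted {
    rules: Vec<(Vec<String>, Result<String, String>)>,
}

impl Scripted {
    fn new() -> Self {
        Scripted { rules: Vec::new() }
    }

    fn on(mut self, prefix: &[&str], reply: Result<&str, &str>) -> Self {
        let prefix = prefix.iter().map(|s| s.to_string()).collect();
        let reply = reply.map(str::to_owned).map_err(str::to_owned);
        self.rules.push((prefix, reply));
        self
    }
}

impl Runner for Scripted {
    fn run(&self, argv: &[&str]) -> Result<String, String> {
        self.rules
            .iter()
            .find(|(prefix, _)| {
                prefix.len() <= argv.len()
                    && prefix.iter().zip(argv.iter()).all(|(p, a)| p.as_str() == *a)
            })
            .map(|(_, reply)| reply.clone())
            .unwrap_or_else(|| Err(format!("no scripted reply for {argv:?}")))
    }
}

/// Production code depends only on the trait, never on a concrete runner.
fn current_branch(runner: &impl Runner) -> Result<String, String> {
    runner
        .run(&["git", "branch", "--show-current"])
        .map(|s| s.trim_end().to_owned())
}

fn main() {
    let runner = Scripted::new()
        .on(&["git", "branch"], Ok("main\n"))
        .on(&["git", "push"], Err("remote: permission denied"));
    println!("{:?}", current_branch(&runner));
}
```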

The cli_client! macro generates typed wrappers around external tools (git, gh, kubectl, …) that are injectable for free:

use processkit::{cli_client, ProcessRunner, Result};
use std::path::Path;

cli_client!(pub struct Git => "git");

impl<R: ProcessRunner> Git<R> {
    pub async fn head(&self, dir: &Path) -> Result<String> {
        self.core.run(self.core.command_in(dir, ["rev-parse", "HEAD"])).await
    }
}

// In production: Git::new().head(Path::new(".")).await
// In tests:      Git::with_runner(ScriptedRunner::new().on([…], Reply::ok("abc\n")))

Guides

The Cookbook maps "I want to do X" directly to a working snippet — the fastest way in. The individual guides go deeper on each topic:

| Guide | Covers |
|---|---|
| Cookbook | Task-to-snippet recipes for every capability |
| Running commands | The full Command builder, every verb, error semantics |
| Process groups | Containment, teardown, signals, suspend/resume, limits, stats |
| Streaming & interactive I/O | Line streaming, interactive stdin, readiness probes, profiling |
| Pipelines | Shell-free chains, pipefail attribution, chain timeouts |
| Timeouts, retries & cancellation | Deadlines, retry classifiers, CancellationToken |
| Supervision | Restart policies, backoff & jitter, storm guard, outcomes |
| Testing your code | ProcessRunner seam, scripted/recording/cassette doubles, CliClient |
| Platform support | Mechanisms, capability matrices, platform caveats |
| Upgrading | Per-version migration notes |

API reference: docs.rs/processkit.

What's next

ProcessKit is a Rust library today, published as processkit on crates.io. The plan is to bring the same approach — kernel-backed whole-tree containment, honest error semantics, and testable seams — to other ecosystems: a Go package, an F# library, a Kotlin library, and a Python wrapper. Each implementation will follow the same philosophy and be documented here as it ships.

A note on development. This project was built with significant assistance from AI tools throughout the design and implementation process. That said, every line of code was read, understood, and deliberately chosen — this is not generated output dropped into a repository unchecked. The author takes full responsibility for correctness, API design, and the published result.

Cookbook


Task-oriented recipes: find the thing you're trying to do, copy the snippet, follow the link when you need the fine print. Every snippet assumes a tokio runtime and use processkit::Command; unless shown otherwise.

Run a command and get its output

let head = Command::new("git").args(["rev-parse", "HEAD"]).run().await?;

run() requires a zero exit and returns stdout with trailing whitespace trimmed; a non-zero exit, spawn failure, or timeout is a typed Error. For a one-liner without the builder: processkit::run("git", ["rev-parse", "HEAD"]).await?.

Fine print: Running commands → consuming verbs.

Inspect a failure instead of erroring

let result = Command::new("git").args(["merge", "topic"]).output_string().await?;
if !result.is_success() {
    eprintln!("merge exited {:?}: {}", result.code(), result.stderr());
}

output_string() (and output_bytes() for raw bytes) treats the exit code as data — Err means the run couldn't happen at all. Call result.ensure_success()? later to convert a stored failure into the same typed error run() would have produced.

Fine print: Running commands → results and errors.
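The "exit code as data, error on request" split can be modeled as a plain struct with a deferred check. `CapturedRun`, `RunFailure`, and this `ensure_success` are hypothetical stand-ins; processkit's `ProcessResult::ensure_success` may differ in its return type and error payload.

```rust
/// Hypothetical captured outcome of one run.
#[derive(Debug)]
struct CapturedRun {
    code: Option<i32>, // None: no exit code (e.g. killed by a signal or at the deadline)
    stdout: String,
    stderr: String,
    timed_out: bool,
}

/// Hypothetical typed failure produced only when success is demanded.
#[derive(Debug, PartialEq)]
enum RunFailure {
    Timeout { partial_stdout: String },
    Failed { code: Option<i32>, stderr: String },
}

impl CapturedRun {
    fn is_success(&self) -> bool {
        !self.timed_out && self.code == Some(0)
    }

    /// Converts a stored failure into an error, later and only on request.
    fn ensure_success(&self) -> Result<(), RunFailure> {
        if self.timed_out {
            Err(RunFailure::Timeout { partial_stdout: self.stdout.clone() })
        } else if self.code == Some(0) {
            Ok(())
        } else {
            Err(RunFailure::Failed { code: self.code, stderr: self.stderr.clone() })
        }
    }
}

fn main() {
    let merge = CapturedRun {
        code: Some(1),
        stdout: "CONFLICT (content): Merge conflict in a.rs".into(),
        stderr: String::new(),
        timed_out: false,
    };
    // Inspect first; turn it into an error only if the caller decides to.
    println!("success? {}", merge.is_success());
    println!("{:?}", merge.ensure_success());
}
```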

Ask a yes/no question

let dirty = !Command::new("git").args(["diff", "--quiet"]).probe().await?;

probe() maps exit 0 → true, exit 1 → false, and anything else to an error — the git diff --quiet / grep -q convention without manual code matching.

Accept non-zero exit codes as success

// `grep` exits 1 when it finds no match — not a failure for this call.
let found = Command::new("grep")
    .args(["needle", "haystack.txt"])
    .ok_codes([0, 1])
    .output_string()
    .await?;
let matched = found.code() == Some(0); // 0 = matched, 1 = no match (both "success")

ok_codes widens what the checking verbs (run/run_unit) and is_success/ensure_success treat as success — for tools whose non-zero exit is a normal result (grep 1 = no match, diff 1 = differs, rsync's code families). It does not change exit_code (always the raw code) or probe (always the 0/1 convention). An empty set is ignored, so the default stays exit 0.
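The success check with an accepted-codes set reduces to a few lines. This hypothetical `is_success` assumes that a non-empty set replaces the default of exit 0 (which is why the recipe lists 0 explicitly); check the Running commands guide if you rely on 0 being accepted implicitly.

```rust
/// Hypothetical model of the success check with an `ok_codes` set.
/// `code` is `None` when the child had no exit code (e.g. killed by a signal).
fn is_success(code: Option<i32>, ok_codes: &[i32]) -> bool {
    match code {
        None => false,
        // An empty set is ignored: the default "exit 0 only" applies.
        Some(c) if ok_codes.is_empty() => c == 0,
        Some(c) => ok_codes.contains(&c),
    }
}

fn main() {
    // grep: 0 = matched, 1 = no match; both accepted with ok_codes [0, 1].
    for code in [Some(0), Some(1), Some(2), None] {
        println!("{code:?} -> {}", is_success(code, &[0, 1]));
    }
}
```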

Bound a run with a timeout

use std::time::Duration;

let result = Command::new("slow-tool")
    .timeout(Duration::from_secs(30))
    .output_string()
    .await?;
if result.timed_out() {
    eprintln!("gave up after 30s; partial output: {}", result.stdout());
}

At the deadline the whole tree is killed. On the capture verbs the timeout is captured (timed_out(), partial output kept); on the success-checking verbs (run, exit_code) it surfaces as Error::Timeout.

Let a tool clean up on timeout

use std::time::Duration;

let result = Command::new("dev-server")
    .timeout(Duration::from_secs(30))
    .timeout_grace(Duration::from_secs(5)) // SIGTERM, wait up to 5s, then SIGKILL
    .output_string()
    .await?;

timeout_grace turns the hard deadline kill into a graceful one: at the deadline SIGTERM is sent (or the signal set with timeout_signal, with the process-control feature), the tree gets up to the grace window to exit, and SIGKILL follows. A child that handles the signal can exit within the window; timed_out() is still true either way. Windows has no signal tier: the deadline kills atomically.

Fine print: Timeouts → graceful timeout.

Show a useful error message

if let Err(e) = Command::new("git").args(["merge", "topic"]).run().await {
    eprintln!("merge failed: {}", e.diagnostic().unwrap_or("(no output)"));
}

Error::diagnostic() picks the most explanatory captured text — stderr, falling back to stdout (git writes CONFLICT … there) — so callers don't re-implement the same heuristic.
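The heuristic amounts to "first non-blank stream, stderr preferred." A hypothetical std-only version follows; whether processkit trims the text it returns is an assumption of this sketch.

```rust
/// Hypothetical version of the heuristic: prefer stderr, fall back to stdout,
/// and ignore streams that are empty or whitespace-only.
fn diagnostic<'a>(stderr: &'a str, stdout: &'a str) -> Option<&'a str> {
    let stderr = stderr.trim();
    if !stderr.is_empty() {
        return Some(stderr);
    }
    let stdout = stdout.trim();
    if !stdout.is_empty() {
        Some(stdout)
    } else {
        None
    }
}

fn main() {
    // git merge reports conflicts on stdout, so the fallback matters.
    println!("{:?}", diagnostic("", "CONFLICT (content): Merge conflict in a.rs\n"));
}
```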

Feed the child's stdin

use processkit::Stdin;

// A string you already have:
let sorted = Command::new("sort")
    .stdin(Stdin::from_string("banana\napple\n"))
    .run()
    .await?;

// …or any async source: a reader (file, socket) or a stream of lines.
let from_file = Stdin::from_reader(tokio::fs::File::open("input.txt").await?);
let from_stream = Stdin::from_lines(tokio_stream::iter(vec!["one".to_owned()]));

One-shot sources (from_reader/from_lines) feed a single run; re-running the same Command afterwards fails loudly (an Error::Io at launch) instead of silently seeing empty stdin. For a conversation, see "Talk to an interactive child" below.

Fine print: Running commands → standard input.
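The fail-loud rule for one-shot sources is essentially an `Option` that is taken on first use. A std-only model with a hypothetical `StdinSource` type follows; the real sources stream asynchronously rather than holding bytes in memory.

```rust
use std::io;

/// Hypothetical model of stdin sources: reusable payloads are copied per run,
/// one-shot sources are moved out on first use.
enum StdinSource {
    Reusable(Vec<u8>),
    OneShot(Option<Vec<u8>>),
}

impl StdinSource {
    fn payload_for_run(&mut self) -> io::Result<Vec<u8>> {
        match self {
            StdinSource::Reusable(bytes) => Ok(bytes.clone()),
            // A consumed one-shot source fails loudly instead of feeding empty input.
            StdinSource::OneShot(slot) => slot.take().ok_or_else(|| {
                io::Error::new(io::ErrorKind::Other, "one-shot stdin source was already consumed")
            }),
        }
    }
}

fn main() {
    let mut source = StdinSource::OneShot(Some(b"one\n".to_vec()));
    println!("first run: {:?}", source.payload_for_run().map(|b| b.len()));
    println!("second run: {:?}", source.payload_for_run().map(|b| b.len()));
}
```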

Stream output as it arrives

use processkit::{StreamExt, Finished}; // StreamExt re-exported; provides `.next()`

let mut run = Command::new("cargo").args(["build", "--verbose"]).start().await?;
let mut lines = run.stdout_lines()?;
while let Some(line) = lines.next().await {
    println!("build: {line}");
}
let Finished { outcome, stderr, .. } = run.finish().await?; // outcome + buffered stderr

No waiting for exit, no full-output buffering; stderr is drained in the background so the child can't block. A timeout on the command bounds the stream itself. Prefer a callback? .on_stdout_line(|l| …) runs once per line while any capture verb drives the run.

Fine print: Streaming & interactive I/O.

Talk to an interactive child

use processkit::StreamExt;

let mut run = Command::new("bc").keep_stdin_open().start().await?;
let mut stdin = run.take_stdin().expect("stdin was kept open");
// The writer returns std::io::Result; map it into processkit's error type.
stdin.write_line("2 + 2").await.map_err(processkit::Error::Io)?;
stdin.finish().await.map_err(processkit::Error::Io)?; // EOF — bc exits

let mut answers = run.stdout_lines()?;
while let Some(answer) = answers.next().await {
    println!("{answer}");
}

keep_stdin_open() hands you an async writer instead of closing stdin at spawn; interleave writes with reads for request/response tools. Its writer methods return std::io::Result (idiomatic for a writer): convert with .map_err(processkit::Error::Io)? inside a function returning processkit::Result, or return Box<dyn std::error::Error> from the enclosing function.

Fine print: Streaming & interactive I/O → interactive stdin.

Pipe commands without a shell

let authors = Command::new("git").args(["log", "--format=%an"])
    .pipe(Command::new("sort"))
    .pipe(Command::new("uniq").arg("-c"))
    .output_string()
    .await?;

Native pipes — no shell string, no quoting, no injection surface. The outcome is pipefail: stdout comes from the last stage, the reported failure from the first stage that didn't exit cleanly. All stages share one kill-on-drop group. The | operator is equivalent sugar: (a | b | c).output_string().

For a consumer that legitimately stops reading early — the | head -1 shape, where the producer's broken-pipe death (its next write fails once the downstream closes, or SIGPIPE where the OS delivers it) is expected — mark the producer unchecked_in_pipe() so that death doesn't fail the chain:

let first = (Command::new("seq").args(["1", "1000000"]).unchecked_in_pipe()
    | Command::new("head").args(["-n", "1"]))
    .run()
    .await?;

Fine print: Pipelines → unchecked stages.

Start a server and wait until it's ready

use std::time::Duration;

let mut server = Command::new("my-server").args(["--port", "8080"]).start().await?;

// Pick the probe that matches how the server announces readiness:
server.wait_for_line(|l| l.contains("listening"), Duration::from_secs(10)).await?;
// server.wait_for_port("127.0.0.1:8080".parse().unwrap(), Duration::from_secs(10)).await?;
// server.wait_for(|| async { http_health().await }, Duration::from_secs(10)).await?;

// …use the server; dropping `server` kills its whole tree.

A probe that can't succeed fails fast with Error::NotReady and never kills the child — you decide what happens next. No more sleep(2) and hoping.

Fine print: Streaming & interactive I/O → readiness probes.
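The line probe boils down to "read until a line matches, or the stream ends." A blocking, std-only sketch with a hypothetical `wait_for_line` over any `BufRead` follows; it omits the deadline that the real probe takes, and end-of-stream stands in for "the child exited."

```rust
use std::io::{self, BufRead};

/// Reads lines until one satisfies `ready`. `Ok(None)` means the stream ended
/// first (the analogue of the child exiting before it became ready).
fn wait_for_line<R: BufRead>(reader: R, ready: impl Fn(&str) -> bool) -> io::Result<Option<String>> {
    for line in reader.lines() {
        let line = line?;
        if ready(&line) {
            return Ok(Some(line));
        }
    }
    Ok(None)
}

fn main() -> io::Result<()> {
    // Stand-in for a server's stdout.
    let banner = io::Cursor::new("starting\nlistening on 127.0.0.1:8080\n");
    println!("{:?}", wait_for_line(banner, |l| l.contains("listening"))?);
    Ok(())
}
```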

Tear down several children as a unit

use processkit::ProcessGroup;

let group = ProcessGroup::new()?;
let _db = group.start(&Command::new("dev-db")).await?;
let _api = group.start(&Command::new("dev-api")).await?;

// Either: graceful — SIGTERM, bounded wait, optional SIGKILL escalation…
group.shutdown().await?;
// …or just drop(group): hard kill-on-drop of everything, grandchildren included.

The group is the unit of fate: a panic or early return anywhere reaps every member. Configure the grace window via ProcessGroupOptions.

Fine print: Process groups.

React to whichever child exits first

use processkit::{ProcessGroup, wait_any};

let group = ProcessGroup::new()?;
let mut a = group.start(&Command::new("worker-a")).await?;
let mut b = group.start(&Command::new("worker-b")).await?;

let (idx, outcome) = wait_any(&mut [&mut a, &mut b]).await?;
println!("worker #{idx} exited first with {outcome:?}");
// `a` and `b` are only borrowed — the loser is still usable here.

Fine print: Streaming & interactive I/O → racing children.

Sandbox an untrusted tool

use processkit::{ProcessGroup, ProcessGroupOptions};

// Cap the whole tree (requires the `limits` feature; Windows Job / Linux cgroup):
let group = ProcessGroup::with_options(
    ProcessGroupOptions::default()
        .memory_max(512 * 1024 * 1024)
        .max_processes(64)
        .cpu_quota(0.5),
)?;

let result = group
    .start(
        &Command::new("untrusted-tool")
            .inherit_env(["PATH"]) // allow-list: everything else is cleared
            .timeout(std::time::Duration::from_secs(60)),
    )
    .await?
    .output_string()
    .await?;

Unenforceable limits are a hard Error::ResourceLimit, never a silently unbounded group. On Unix, add .uid(…)/.gid(…) to drop privileges (note the cgroup-mechanism caveat in the guide).

Fine print: Process groups → resource limits · Running commands → privileges.

Keep a crash-prone service running

use processkit::{RestartPolicy, Supervisor};
use std::time::Duration;

let outcome = Supervisor::new(Command::new("my-service"))
    .restart(RestartPolicy::OnCrash)
    .max_restarts(5)
    .backoff(Duration::from_millis(200), 2.0)
    .storm_pause(Duration::from_secs(15)) // crash-loop guard (off by default)
    .run()
    .await?;
println!(
    "stopped after {} restarts ({} storm pauses): {:?}",
    outcome.restarts, outcome.storm_pauses, outcome.stopped
);

Exponential backoff with jitter by default; stop_when(…) ends supervision on a condition; .with_runner(&group) keeps every incarnation inside one shared kill-on-drop group. storm_pause arms the failure-storm guard: failures feed a decaying score, and past the threshold the supervisor takes one collective pause instead of hammering restarts — "fails rarely" and "crash-looping" stop being the same case.

Fine print: Supervision, failure storms.
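A common reading of .backoff(base, factor) is "wait base × factor^n before restart n." The sketch below assumes that reading and adds "equal jitter" (keep half the delay, randomize the other half); processkit's exact jitter strategy is not stated here, so this is a swapped-in technique, and `unit` stands in for a random sample in [0, 1] because std has no random number generator. `restart_delay` is a hypothetical name.

```rust
use std::time::Duration;

/// Delay before restart number `n` (0-based): `base * factor^n`, with equal jitter:
/// half of the delay is kept, the other half is scaled by `unit` in [0, 1].
fn restart_delay(base: Duration, factor: f64, n: u32, unit: f64) -> Duration {
    let exp = base.as_secs_f64() * factor.powi(n as i32);
    let half = exp / 2.0;
    Duration::from_secs_f64(half + half * unit.clamp(0.0, 1.0))
}

fn main() {
    let base = Duration::from_millis(200);
    for n in 0..5 {
        // In practice `unit` comes from a random source; 0.5 is a fixed stand-in.
        println!("restart {n}: wait {:?}", restart_delay(base, 2.0, n, 0.5));
    }
}
```

Because each instance draws its own `unit`, two services that crash at the same moment end up waiting different amounts of time, which is what keeps a restarted fleet out of lockstep.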

Retry a flaky command

use processkit::Error;
use std::time::Duration;

let fetched = Command::new("git")
    .args(["fetch", "--quiet"])
    .timeout(Duration::from_secs(10))
    .retry(3, Duration::from_millis(200), |e| {
        matches!(e, Error::Timeout { .. })
            || e.diagnostic().is_some_and(|m| m.contains("Could not resolve host"))
    })
    .run()
    .await?;

The classifier sees the typed error and decides whether this failure is worth another attempt; each attempt is a fresh process. retry replays a run to success — for keeping a process alive, use a Supervisor (previous recipe).

Fine print: Timeouts, retries & cancellation → retry.

Cancel runs on shutdown

use processkit::CancellationToken;

let token = CancellationToken::new();

let job = tokio::spawn({
    let token = token.child_token();
    async move { Command::new("long-job").cancel_on(token).run().await }
});

// On Ctrl-C / shutdown signal / sibling failure:
token.cancel(); // kills the tree; the run resolves to Error::Cancelled
let outcome = job.await; // Err(Error::Cancelled { .. }) inside

Cancellation is always an error (the run was abandoned, there is no result), beats a simultaneous timeout, and is terminal for retry and Supervisor alike.

For a typed wrapper whose commands never cross your code, set the token once on the client — every command it builds carries it:

use processkit::{CancellationToken, CliClient};

let token = CancellationToken::new();
let gh = CliClient::new("gh").default_cancel_on(token.child_token());
// token.cancel() → every in-flight command of THIS client dies.

Fine print: Timeouts, retries & cancellation → cancellation, client-level default.

Measure what a run cost

use std::time::Duration;

// One run, summarized (requires the opt-in `stats` feature):
let profile = Command::new("crunch").start().await?.profile(Duration::from_millis(100)).await?;
println!("exit={:?} took={:?} peak_rss={:?} avg_cpu={:?}",
    profile.exit_code, profile.duration, profile.peak_memory_bytes, profile.avg_cpu());

For a live series over a whole group, group.sample_stats(every) yields a Stream of snapshots. CPU/memory need a real container (Windows Job / Linux cgroup); elsewhere you still get process counts.

Fine print: Process groups → stats · Streaming → profiling.
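Assuming that average CPU means total CPU time divided by wall-clock time (so 2.0 would be two busy cores on average; an interpretation, not confirmed by the text above), a summary can be derived from periodic samples as follows. `Sample` and `summarize` are hypothetical names; note that a sampled peak can miss a spike that falls between two samples.

```rust
use std::time::Duration;

/// Hypothetical periodic snapshot of a process tree.
#[derive(Debug, Clone, Copy)]
struct Sample {
    elapsed: Duration,   // wall-clock time since start
    cpu_total: Duration, // cumulative CPU time of the whole tree
    memory_bytes: u64,   // resident memory at this instant
}

/// Returns (peak memory, average CPU in cores) over the sampled run.
fn summarize(samples: &[Sample]) -> Option<(u64, f64)> {
    let last = samples.last()?;
    let peak = samples.iter().map(|s| s.memory_bytes).max()?;
    let wall = last.elapsed.as_secs_f64();
    if wall == 0.0 {
        return None;
    }
    Some((peak, last.cpu_total.as_secs_f64() / wall))
}

fn main() {
    let samples = [
        Sample { elapsed: Duration::from_millis(100), cpu_total: Duration::from_millis(50), memory_bytes: 40 << 20 },
        Sample { elapsed: Duration::from_millis(200), cpu_total: Duration::from_millis(150), memory_bytes: 96 << 20 },
    ];
    if let Some((peak, avg)) = summarize(&samples) {
        println!("peak={peak} bytes, avg_cpu={avg:.2} cores");
    }
}
```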

Contain a process you didn't spawn

use processkit::ProcessGroup;

let child = tokio::process::Command::new("legacy-launcher").spawn()?;

let group = ProcessGroup::new()?; // `adopt` is part of `process-control` (default-on)
group.adopt(&child)?;            // from now on the group's teardown covers it

Adoption is best-effort by mechanism — on Windows/cgroup the whole running tree joins; on the POSIX process-group backends an exec'd child is contained individually (its future forks too, where it could be re-grouped). The guide spells out exactly what each mechanism can promise.

Fine print: Process groups → adopt · Platform support.

Test code that runs processes — without processes

use processkit::testing::{Reply, ScriptedRunner};

// Your code takes any `R: ProcessRunner`; in tests, hand it a script.
// Rules match on a prefix of the *program name followed by its arguments*
// (the first element is the program):
let runner = ScriptedRunner::new()
    .on(["git", "rev-parse"], Reply::ok("abc123\n"))
    .on(["git", "push"], Reply::fail(128, "remote: permission denied"))
    .fallback(Reply::ok(""));

// my_deploy(&runner).await? — no subprocess, fully deterministic.

RecordingRunner wraps any runner and captures every Invocation for assertions; MockRunner (feature mock) gives mockall expectations; and the record feature's RecordReplayRunner records real runs into a JSON cassette once and replays them hermetically in CI.

Fine print: Testing your code.
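A recording double is a decorator: it forwards each call to an inner runner and logs the argv for later assertions. The std-only sketch below uses hypothetical names (`Runner`, `Noop`, `Recording`, `deploy`); processkit's RecordingRunner is async and captures richer `Invocation` values.

```rust
use std::cell::RefCell;

/// Hypothetical synchronous runner seam.
trait Runner {
    fn run(&self, argv: &[&str]) -> Result<String, String>;
}

/// A runner that always succeeds with empty output (stand-in for a real one).
struct Noop;

impl Runner for Noop {
    fn run(&self, _argv: &[&str]) -> Result<String, String> {
        Ok(String::new())
    }
}

/// Wraps any runner and records every invocation.
struct Recording<R> {
    inner: R,
    calls: RefCell<Vec<Vec<String>>>,
}

impl<R: Runner> Recording<R> {
    fn new(inner: R) -> Self {
        Recording { inner, calls: RefCell::new(Vec::new()) }
    }

    fn invocations(&self) -> Vec<Vec<String>> {
        self.calls.borrow().clone()
    }
}

impl<R: Runner> Runner for Recording<R> {
    fn run(&self, argv: &[&str]) -> Result<String, String> {
        // Log first, then delegate; the borrow ends before the inner call.
        self.calls.borrow_mut().push(argv.iter().map(|s| s.to_string()).collect());
        self.inner.run(argv)
    }
}

/// Hypothetical code under test.
fn deploy(runner: &impl Runner) -> Result<(), String> {
    runner.run(&["git", "push", "origin", "main"])?;
    runner.run(&["kubectl", "rollout", "restart", "deploy/api"])?;
    Ok(())
}

fn main() {
    let rec = Recording::new(Noop);
    deploy(&rec).unwrap();
    for call in rec.invocations() {
        println!("{}", call.join(" "));
    }
}
```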

Test streaming code — without processes

use processkit::{Command, Outcome, Finished};
use processkit::testing::{Reply, ScriptedRunner};
use std::time::Duration;

let runner = ScriptedRunner::new()
    .on(["gh", "run", "watch"], Reply::lines(["queued", "in_progress", "completed"])
        .with_line_delay(Duration::from_millis(50))); // paced delivery

let mut run = runner.start(&Command::new("gh").args(["run", "watch", "123"])).await?;
run.wait_for_line(|l| l.contains("completed"), Duration::from_secs(5)).await?;
let Finished { outcome, .. } = run.finish().await?;
assert_eq!(outcome, Outcome::Exited(0));

A scripted start() feeds the canned lines through the same pump machinery a real child uses, so stdout_lines, the readiness probes, and finish behave identically — and with_line_delay is deterministic under #[tokio::test(start_paused = true)]. Canned output also replays through on_stdout_line/on_stderr_line handlers on the bulk verbs, so progress-reporting paths test hermetically too.

Fine print: Testing → scripted streaming.

Wrap a CLI tool behind a typed API

use processkit::{cli_client, ProcessRunner, Result};

cli_client!(pub struct Git => "git");

impl<R: ProcessRunner> Git<R> {
    pub async fn current_branch(&self) -> Result<String> {
        // A verb takes the args directly; pass a built `command(..)` only
        // when you need to customize it (per-call timeout, stdin, …).
        self.core.run(["branch", "--show-current"]).await
    }
    pub async fn is_clean(&self) -> Result<bool> {
        self.core.probe(["diff", "--quiet"]).await
    }
}

The generated struct carries a runner and per-client defaults (default_timeout, default_env); your methods are just argument lists and parsers — and because the runner is injectable, the whole wrapper is testable with the previous recipe's ScriptedRunner.

Fine print: Testing your code → CliClient.

Running commands


Command is the entry point of the runner layer: a builder describing what to run and how, plus a family of consuming verbs that decide what you get back. Every one-shot verb spawns the child into a fresh, private kill-on-drop process group, so an early return, panic, or dropped future can never leak a process tree.

Program, arguments, working directory

use processkit::Command;

let out = Command::new("git")
    .arg("log")                          // one at a time…
    .args(["--oneline", "-n", "10"])     // …or in bulk
    .current_dir("/path/to/repo")        // run there
    .run()
    .await?;

Arguments are passed as an array — there is no shell between you and the child, so there is no quoting, no word-splitting, and no injection surface. (When you actually want a | b | c, use a pipeline, which connects the stages in-process instead of invoking a shell.)

The program name reaches the OS verbatim. Two conveniences that some libraries layer on (duct, for example) are deliberate non-goals. First, a bare name is resolved on PATH by the OS and never rewritten to ./name. Second, current_dir does not re-anchor a relative program path against the new directory: whether Command::new("./tool").current_dir(dir) resolves tool relative to dir is platform behavior (Unix: yes; Windows: the parent's directory may win). Pass an absolute program path when combining the two.

For quick one-liners the free functions skip the builder:

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let version = processkit::run("cargo", ["--version"]).await?;             // trimmed stdout, success required
    let result  = processkit::output_string("git", ["status", "-s"]).await?;  // full ProcessResult
    Ok(())
}

Environment

Four builders compose, applied in a fixed order at spawn:

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    Command::new("worker")
        .env("RUST_LOG", "debug")        // set one variable
        .env_remove("GIT_DIR")           // unset one inherited variable
        .run().await?;

    // Allow-list mode: clear everything, copy only the named parent variables.
    Command::new("sandboxed-tool")
        .inherit_env(["PATH", "HOME", "LANG"])
        .env("MODE", "ci")               // explicit env/env_remove still apply on top
        .run().await?;

    // Scorched earth: the child starts with an empty environment.
    Command::new("hermetic-tool").env_clear().run().await?;
    Ok(())
}

inherit_env is the sandboxing middle ground: it implies env_clear, then copies the listed variables from the parent at each spawn (so a retry sees fresh values), and repeated calls accumulate names. A name the parent doesn't have is skipped, not set to empty.
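To make the composition concrete, here is a minimal std-only sketch of allow-list mode. `build_child_env` is a hypothetical helper, not processkit's API. It takes a snapshot of the parent environment, as the crate does at each spawn, and it assumes env sets are applied before env_remove unsets; the text above only says the order is fixed.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of allow-list mode: start empty (the implied env_clear),
/// copy only the named parent variables that exist, then apply explicit sets
/// and unsets on top (set-before-unset order is an assumption).
fn build_child_env(
    parent: &BTreeMap<String, String>,
    inherit: &[&str],
    sets: &[(&str, &str)],
    removes: &[&str],
) -> BTreeMap<String, String> {
    let mut child = BTreeMap::new();
    for name in inherit {
        // A name the parent doesn't have is skipped, not set to "".
        if let Some(value) = parent.get(*name) {
            child.insert(name.to_string(), value.clone());
        }
    }
    for (key, value) in sets {
        child.insert(key.to_string(), value.to_string());
    }
    for key in removes {
        child.remove(*key);
    }
    child
}

fn main() {
    let parent: BTreeMap<String, String> = [("PATH", "/usr/bin"), ("HOME", "/home/me"), ("SECRET", "x")]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
    // LANG is requested but absent in the parent, so it is simply skipped.
    let child = build_child_env(&parent, &["PATH", "HOME", "LANG"], &[("MODE", "ci")], &[]);
    println!("{child:?}");
}
```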

Standard input

By default stdin is closed at spawn — the child reads EOF immediately and can never hang waiting for input. Everything else is opt-in via stdin(Stdin::…):

| Source | Reusable on re-run? | Use for |
|---|---|---|
| Stdin::empty() | ✅ | The default, explicit |
| Stdin::from_string("…") | ✅ | Text payloads |
| Stdin::from_bytes(vec![…]) | ✅ | Binary payloads |
| Stdin::from_iter_lines(["a", "b"]) | ✅ | Anything iterable; each item is written \n-terminated |
| Stdin::from_file(path) | ✅ (re-opened per run) | Large inputs streamed from disk |
| Stdin::from_reader(reader) | ❌ one-shot | Any AsyncRead: a socket, a decompressor, … |
| Stdin::from_lines(stream) | ❌ one-shot | Any Stream<Item = String>: a channel, a tail, … |
use processkit::{Command, Stdin};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let sorted = Command::new("sort")
        .stdin(Stdin::from_iter_lines(["banana", "apple", "cherry"]))
        .run()
        .await?;
    assert_eq!(sorted, "apple\nbanana\ncherry"); // run() trims the trailing newline
    Ok(())
}

The payload is written on a background task (so a large input can't deadlock against the child's output) and the pipe is dropped afterwards to signal EOF. The two one-shot sources are consumed by their first run: a retried or cloned command reusing them fails loud the second time — re-running a consumed from_reader/from_lines source is an Error::Io (InvalidInput) at launch, not a silent empty stdin. Prefer the reusable sources when a command may run more than once.
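The fail-loud rule for one-shot sources amounts to taking the payload out of an `Option` on first use. This is a std-only sketch with a hypothetical `OneShotSource` type; the error kind mirrors the documented Error::Io (InvalidInput), but the type and the message are illustrative only.

```rust
use std::io;

/// Hypothetical model of a one-shot stdin source (like from_reader/from_lines):
/// the payload can be taken once; a second launch gets an InvalidInput error
/// instead of silently feeding the child an empty stdin.
struct OneShotSource {
    payload: Option<Vec<u8>>,
}

impl OneShotSource {
    fn new(payload: Vec<u8>) -> Self {
        Self { payload: Some(payload) }
    }

    fn take_for_launch(&mut self) -> io::Result<Vec<u8>> {
        self.payload.take().ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "one-shot stdin source already consumed")
        })
    }
}

fn main() {
    let mut source = OneShotSource::new(b"hello\n".to_vec());
    let first = source.take_for_launch(); // first run: gets the bytes
    let second = source.take_for_launch(); // retry: fails loud
    println!("first ok: {}, second: {:?}", first.is_ok(), second.map_err(|e| e.kind()));
}
```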

For conversational, request/response stdin — write a line, read the answer, repeat — use keep_stdin_open() and the streaming API instead: see Streaming & interactive I/O.

Output handling

Encodings

Output is decoded line by line, UTF-8 by default (invalid bytes become U+FFFD, never an error). Legacy-encoding tools can override per stream:

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let out = Command::new("legacy-tool")
        .encoding(encoding_rs::SHIFT_JIS)             // both streams…
        // .stdout_encoding(…) / .stderr_encoding(…)  // …or each its own
        .output_string()
        .await?;
    println!("{}", out.stdout());
    Ok(())
}

processkit::Encoding re-exports encoding_rs::Encoding, so any of its encodings works: the single-byte and ASCII-compatible multibyte ones (WINDOWS_1252, GBK, SHIFT_JIS, …) as well as the non-ASCII-compatible UTF_16LE and UTF_16BE. Output is fed through one persistent decoder and split on decoded newlines, so a 0x0A byte inside a UTF-16 code unit is never mistaken for a line break. A leading byte-order mark of the chosen encoding is stripped once, at the start of the stream.
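Why decoding has to come before line splitting: in UTF-16LE the letter U+0A0A (GURMUKHI LETTER UU) is encoded as the bytes 0A 0A, so a raw byte scanner would see two line breaks where there are none. The sketch below uses std's UTF-16 decoder rather than encoding_rs, and `utf16le_lines` is a hypothetical helper that decodes a complete buffer at once instead of streaming.

```rust
/// Split UTF-16LE bytes into lines by decoding first and splitting on decoded
/// '\n', never on raw 0x0A bytes. One leading BOM (U+FEFF) is stripped.
fn utf16le_lines(bytes: &[u8]) -> Vec<String> {
    // Pair bytes into little-endian code units (a trailing odd byte is ignored here).
    let units = bytes
        .chunks_exact(2)
        .map(|pair| u16::from_le_bytes([pair[0], pair[1]]));
    // Unpaired surrogates become U+FFFD instead of an error.
    let text: String = char::decode_utf16(units)
        .map(|r| r.unwrap_or(char::REPLACEMENT_CHARACTER))
        .collect();
    let body = text.strip_prefix('\u{FEFF}').unwrap_or(text.as_str());
    body.lines().map(str::to_owned).collect()
}

fn main() {
    // BOM (FF FE), U+0A0A (0A 0A), '\n' (0A 00), 'b' (62 00).
    let bytes = [0xFF, 0xFE, 0x0A, 0x0A, 0x0A, 0x00, 0x62, 0x00];
    println!("{:?}", utf16le_lines(&bytes));
}
```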

Buffer policies — bounding memory on chatty children

Captured lines are held in memory; a multi-gigabyte log would normally grow the buffer to match. output_buffer bounds retention (the pipe is always fully drained, so the child never blocks):

use processkit::{Command, OutputBufferPolicy, OverflowMode};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let tail = Command::new("verbose-build")
        .output_buffer(OutputBufferPolicy::bounded(1_000)) // keep the newest 1000 lines
        .output_string()
        .await?;

    // …or keep the head instead of the tail:
    let head_policy = OutputBufferPolicy::bounded(1_000).with_overflow(OverflowMode::DropNewest);
    Ok(())
}

DropOldest (the default) keeps a rolling tail; DropNewest freezes the head. bounded(0) retains nothing — useful when a line handler (below) is the real consumer. Under a line cap, dropped or not, every line still feeds the handlers and the line counters.
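The two overflow modes differ only in which end of the buffer gives way. Here is a std-only model built on a VecDeque; `LineBuffer` and `Overflow` are hypothetical names, not the crate's OutputBufferPolicy.

```rust
use std::collections::VecDeque;

/// Which end to sacrifice once the line cap is reached.
#[derive(Clone, Copy)]
enum Overflow {
    DropOldest, // rolling tail
    DropNewest, // frozen head
}

/// A minimal line-capped capture buffer. Every line is counted, retained or
/// not, so truncation can be reported afterwards.
struct LineBuffer {
    cap: usize,
    mode: Overflow,
    lines: VecDeque<String>,
    seen: usize,
}

impl LineBuffer {
    fn new(cap: usize, mode: Overflow) -> Self {
        Self { cap, mode, lines: VecDeque::new(), seen: 0 }
    }

    fn push(&mut self, line: String) {
        self.seen += 1;
        if self.lines.len() < self.cap {
            self.lines.push_back(line);
        } else if matches!(self.mode, Overflow::DropOldest) && self.cap > 0 {
            self.lines.pop_front();
            self.lines.push_back(line);
        }
        // Otherwise (DropNewest, or cap 0) the new line is discarded.
    }

    fn truncated(&self) -> bool {
        self.seen > self.lines.len()
    }
}

fn main() {
    let mut tail = LineBuffer::new(2, Overflow::DropOldest);
    let mut head = LineBuffer::new(2, Overflow::DropNewest);
    for n in 1..=4 {
        tail.push(format!("line {n}"));
        head.push(format!("line {n}"));
    }
    println!("tail={:?} head={:?} truncated={}", tail.lines, head.lines, tail.truncated());
}
```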

The line cap alone does not bound memory — one enormous newline-free "line" (base64 -w0) is held whole. Add with_max_bytes to cap the retained bytes too (either ceiling, or both); the byte cap also bounds the pump's in-flight assembly buffer, so a never-terminated flood can't exhaust memory. One consequence: a line whose own length exceeds the byte cap can't be assembled, so it is dropped whole — counted, but not delivered to a per-line handler or stdout_tee (don't set a byte cap if a tee must see arbitrarily long lines):

#![allow(unused)]
fn main() {
use processkit::{Command, OutputBufferPolicy};
let policy = OutputBufferPolicy::unbounded().with_max_bytes(8 << 20); // 8 MiB ring
let strict = OutputBufferPolicy::fail_loud(10_000).with_max_bytes(8 << 20); // error on either
}

fail_loud makes the ceiling error instead of dropping: the run fails with Error::OutputTooLarge once the cumulative output (lines or bytes) crosses the cap — even when a streaming consumer is draining lines as they arrive. It bounds memory, not wall-time, so pair it with timeout against a flooding child.
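The byte cap's effect on a single overlong line can be modeled with a small assembler. `Assembler` is a hypothetical std-only sketch. It shows why a line longer than the cap is dropped whole (counted, never delivered) while the assembly buffer never grows past the cap. It does not model the retained-bytes ring or fail_loud.

```rust
/// Bytes accumulate until '\n'. A line that would exceed `max_bytes` is
/// discarded whole and counted; the buffer never holds more than the cap.
struct Assembler {
    max_bytes: usize,
    buf: Vec<u8>,
    oversized: bool, // currently skipping the rest of an overlong line
    delivered: Vec<String>,
    dropped: usize,
}

impl Assembler {
    fn new(max_bytes: usize) -> Self {
        Self { max_bytes, buf: Vec::new(), oversized: false, delivered: Vec::new(), dropped: 0 }
    }

    /// Chunks may split lines anywhere; state carries across calls.
    fn feed(&mut self, chunk: &[u8]) {
        for &b in chunk {
            if b == b'\n' {
                if self.oversized {
                    self.dropped += 1; // counted, not delivered to handlers or tees
                } else {
                    self.delivered.push(String::from_utf8_lossy(&self.buf).into_owned());
                }
                self.buf.clear();
                self.oversized = false;
            } else if self.oversized {
                // Keep draining the pipe; nothing is buffered.
            } else if self.buf.len() == self.max_bytes {
                self.buf.clear(); // free the partial line immediately
                self.oversized = true;
            } else {
                self.buf.push(b);
            }
        }
    }
}

fn main() {
    let mut asm = Assembler::new(4);
    asm.feed(b"ok\ntoo");
    asm.feed(b"long\nhi\n");
    println!("delivered={:?} dropped={}", asm.delivered, asm.dropped);
}
```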

Even under a drop policy (DropOldest/DropNewest), the checking verbs that hand back stdout as if it were complete (run, parse, try_parse) refuse silently truncated output: if the policy dropped lines, they fail with Error::OutputTooLarge rather than feed a parser a truncated tail. The lenient capture verbs (output_string / output_bytes) are unaffected; they return the partial result with truncated() set for you to inspect.

Line handlers — tee output as it arrives

on_stdout_line / on_stderr_line run a callback on each decoded line in addition to capture or streaming — logging, progress bars, metrics:

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let result = Command::new("cargo")
        .args(["build", "--release"])
        .on_stderr_line(|line| eprintln!("[build] {line}"))
        .output_string()
        .await?;
    println!("exit code: {:?}", result.code());
    Ok(())
}

The handler runs on the read pump — keep it cheap. The contract is forgiving and precisely specified:

  • A panicking handler does not poison the run. The panic is caught, the handler is disabled for the rest of the run (surfaced as a tracing warn when that feature is on), and pumping continues — the final result still carries every line. You can safely re-export this callback seam to your own users without auditing their closures.
  • Ordering: invocations are FIFO within a stream; there is no ordering between stdout and stderr handlers (two independent pumps). On the consuming verbs, all handler calls happen-before the awaited future resolves — finalize a progress bar the moment the call returns. (One documented exception: a leaked pipe held open past the child's death is cut off after a bounded teardown grace.)
  • Handlers are hermetically testable: ScriptedRunner replays canned output through them — see Testing → scripting replies.
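The panic-isolation part of that contract can be sketched with std::panic::catch_unwind. `Pump` is a hypothetical stand-in for the read pump, not the crate's type. The sketch assumes the default panic = "unwind" strategy, because catch_unwind cannot catch a panic in a build using panic = "abort".

```rust
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical read pump: a panicking handler is caught and disabled for the
/// rest of the run, and every line is still captured.
struct Pump<F: FnMut(&str)> {
    handler: Option<F>,
    captured: Vec<String>,
}

impl<F: FnMut(&str)> Pump<F> {
    fn deliver(&mut self, line: &str) {
        if let Some(h) = self.handler.as_mut() {
            // AssertUnwindSafe is acceptable because a handler that panicked is
            // dropped and never called again.
            if panic::catch_unwind(AssertUnwindSafe(|| h(line))).is_err() {
                self.handler = None;
            }
        }
        self.captured.push(line.to_owned());
    }
}

fn main() {
    panic::set_hook(Box::new(|_| {})); // keep the demo output quiet
    let mut calls = 0;
    let mut pump = Pump {
        handler: Some(|line: &str| {
            calls += 1;
            if line == "boom" {
                panic!("handler bug");
            }
        }),
        captured: Vec::new(),
    };
    for line in ["a", "boom", "b"] {
        pump.deliver(line);
    }
    println!("captured {} lines", pump.captured.len());
    drop(pump);
    println!("handler ran {calls} times");
}
```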

For a ready-made tee to an async sink (a file, a socket, or any tokio::io::AsyncWrite), reach for stdout_tee / stderr_tee instead of hand-writing a handler. Each decoded line is written to the sink (plus a \n) as it is produced. The write is awaited on the pump, so a slow sink applies backpressure (the pump slows, the pipe fills, the child blocks) rather than blocking the runtime; a write error disables the tee with a tracing warn instead of being swallowed. The tee runs independently of on_stdout_line: set both and both fire per line.

Timeouts and retries

use processkit::{Command, Error};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let out = Command::new("flaky-network-tool")
        .timeout(Duration::from_secs(30))                 // kill the tree at the deadline
        .retry(3, Duration::from_millis(200), |e| {       // up to 3 attempts total
            matches!(e, Error::Timeout { .. })            // …but only retry timeouts
        })
        .run()
        .await?;
    println!("{out}");
    Ok(())
}
  • timeout kills the whole process tree at the deadline. On the capturing verbs the expiry is captured (ProcessResult::timed_out), on the success-checking verbs it raises Error::Timeout — the full decision table lives in Timeouts, retries & cancellation.
  • retry applies to the success-checking verbs only (run, exit_code, probe, and ProcessRunnerExt::checked); the classifier sees the typed error and decides. The non-erroring output_string path never retries.
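The retry semantics (a total attempt count, with a classifier gating each retry) fit in a short loop. This is a synchronous, std-only sketch with a hypothetical `retry` helper and a made-up `FakeError`. The crate's version is async and hands the classifier the typed processkit::Error.

```rust
use std::thread;
use std::time::Duration;

/// `attempts` counts total tries (3 = 1 initial + 2 retries); only errors the
/// classifier accepts are retried, and the last error is returned as-is.
fn retry<T, E>(
    attempts: u32,
    delay: Duration,
    classify: impl Fn(&E) -> bool,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut tried = 0;
    loop {
        tried += 1;
        match op() {
            Ok(value) => return Ok(value),
            Err(e) if tried < attempts && classify(&e) => thread::sleep(delay),
            Err(e) => return Err(e),
        }
    }
}

#[derive(Debug, PartialEq)]
enum FakeError {
    Timeout,
    Exit(i32),
}

fn main() {
    let mut calls = 0;
    let result: Result<&str, FakeError> = retry(
        3,
        Duration::from_millis(10),
        |e: &FakeError| matches!(e, FakeError::Timeout),
        || {
            calls += 1;
            if calls < 3 { Err(FakeError::Timeout) } else { Ok("done") }
        },
    );
    println!("{result:?} after {calls} attempts");
}
```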

Privileges and spawn flags

Spawn-time controls for sandboxing and service launch:

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    // Unix: drop privileges (uid + gid + supplementary groups) and detach.
    Command::new("worker")
        .gid(1000)            // applied before uid (a gid change needs privilege)
        .groups([1000])       // replace the inherited (often root's) supplementary groups
        .uid(1000)            // dropped last
        .setsid()             // new session: survives the controlling terminal
        .run().await?;

    // Windows: no console window flashing up from a GUI app.
    Command::new("helper").create_no_window().run().await?;

    // Hardening: take the direct child down even if THIS process is SIGKILLed
    // (Drop never runs). Windows has this for free; Linux arms PDEATHSIG.
    // Keep the handle bound: dropping it kills the tree.
    let _worker = Command::new("worker").kill_on_parent_death().start().await?;
    Ok(())
}

uid / gid / groups / setsid are POSIX-only — on Windows the run fails with Error::Unsupported rather than silently skipping a privilege drop. A correct drop sets all three of uid/gid/groups: dropping the uid alone leaves the child holding the parent's (often root's) supplementary groups. create_no_window is a harmless no-op outside Windows. kill_on_parent_death is best-effort by design: guaranteed on Windows (regardless of the knob), direct-child-only on Linux, unavailable on macOS/BSD — the graceful-exit guarantee via Drop holds everywhere either way. Containment is preserved in every combination; the platform fine print (the Linux cgroup × uid interaction, setsid × process-group coordination, the pdeathsig thread caveat) is collected in Platform support.

Interactive auth / TTY. processkit wires pipes, not a pseudo-terminal, so a tool that demands a tty — an ssh/sudo password prompt, some credential helpers — won't get one (PTY support is not implemented; the trade-off is recorded in decisions/permissions-privileges-pty-network.md). Drive such tools non-interactively instead: key-based auth, ssh -o BatchMode=yes, GIT_SSH_COMMAND / GIT_TERMINAL_PROMPT=0, or feed a known answer over interactive stdin. Conversational tools that read stdin without needing a tty already work today via keep_stdin_open + stdout_lines.

Consuming verbs

| Verb | Returns | Non-zero exit | Timeout | Use when |
|---|---|---|---|---|
| output_string() | ProcessResult<String> | captured | captured (timed_out) | You want to inspect the outcome yourself |
| output_bytes() | ProcessResult<Vec<u8>> | captured | captured | Binary stdout (images, archives, …) |
| run() | trimmed stdout String | Error::Exit | Error::Timeout | "Give me the answer or fail" |
| exit_code() | i32 | the code, Ok | Error::Timeout | The code is the answer |
| probe() | bool | 0 → true, 1 → false, else Error::Exit | Error::Timeout | Predicate commands: git diff --quiet, grep -q |
| first_line(pred) | Option<String> | n/a (stream-based) | Error::Timeout | Grab one matching line, kill the rest |
| start() | live RunningProcess |  | bounds the stream | Streaming, interactive I/O, probes |
use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    // probe(): the exit code as a boolean.
    let clean = Command::new("git").args(["diff", "--quiet"]).probe().await?;

    // first_line(): stop as soon as the interesting line appears.
    let first_match = Command::new("git")
        .args(["log", "--oneline"])
        .first_line(|l| l.contains("fix:"))
        .await?;
    println!("clean: {clean}, first fix: {first_match:?}");
    Ok(())
}

first_line returns Ok(None) when stdout closes without a match, and kills the (private-group) child once it has its answer — you never wait out a long log for one line. If the command's cancel_on token has fired, it returns Error::Cancelled instead of Ok(None), so a readiness probe with a shutdown token can't misread cancellation as "the line never appeared".
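The decision rules of these two verbs are small enough to state as code. Both functions are hypothetical std-only models: `probe_from_code` maps an exit code to the probe() result, and `first_line` stops at the first match over an in-memory iterator instead of a live stdout stream (it models neither the kill nor the cancellation check).

```rust
/// probe(): 0 means true, 1 means false, anything else is a real failure.
fn probe_from_code(code: i32) -> Result<bool, String> {
    match code {
        0 => Ok(true),
        1 => Ok(false),
        other => Err(format!("exited with code {other}")),
    }
}

/// first_line(pred): the first matching line, or None if the input ends first.
fn first_line<I, P>(lines: I, mut pred: P) -> Option<String>
where
    I: IntoIterator<Item = String>,
    P: FnMut(&str) -> bool,
{
    // `find` stops pulling lines as soon as the predicate matches.
    lines.into_iter().find(|l| pred(l.as_str()))
}

fn main() {
    println!("{:?} {:?} {:?}", probe_from_code(0), probe_from_code(1), probe_from_code(2));
    let log = vec!["a1 feat: x".to_string(), "b2 fix: y".to_string(), "c3 fix: z".to_string()];
    println!("{:?}", first_line(log, |l| l.contains("fix:")));
}
```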

Results and errors

The capturing verbs hand back a ProcessResult:

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let result = Command::new("git").args(["merge", "feature"]).output_string().await?;

    result.code();         // Option<i32>: None = killed (timeout/signal), no code
    result.signal();       // Option<i32>: the signal number (Unix), else None
    result.is_success();   // code in ok_codes (default {0})
    result.timed_out();    // the run's own deadline expired
    result.outcome();      // the explicit three-way enum behind the accessors above
    result.stdout();       // &str (or &[u8] from output_bytes)
    result.stderr();       // &str
    result.combined();     // stdout + stderr concatenated
    result.diagnostic();   // stderr if non-empty, else stdout: the human-facing line
                           // (git/jj put "CONFLICT …" on stdout!)

    // Opt into erroring whenever you're ready:
    let ok = result.ensure_success()?; // Exit / Timeout / Signalled (signal-kill) as typed errors
    Ok(())
}

When the three-way distinction matters, match on Outcome instead of mentally decoding the code()/timed_out() pair:

use processkit::{Command, Outcome};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let result = Command::new("git").args(["merge", "feature"]).output_string().await?;

    match result.outcome() {
        Outcome::Exited(0) => println!("clean"),
        Outcome::Exited(code) => println!("failed with {code}"),
        Outcome::Signalled(signal) => println!("killed by signal {signal:?}"),
        Outcome::TimedOut => println!("hit its deadline"),
        _ => {} // non_exhaustive: future dispositions
    }
    Ok(())
}

For a single query you usually don't need the match (and its #[non_exhaustive] wildcard): Outcome carries the same code() / signal() / timed_out() accessors as ProcessResult, so a bare Outcome (from RunningProcess::wait or Finished::outcome) answers directly — outcome.code(), outcome.signal(), outcome.timed_out(). There is no Outcome::is_success (success is ok_codes-aware — use ProcessResult::is_success).
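A hypothetical mirror of the three-way outcome shows why the accessors make a match unnecessary for a single query, and why success needs the ok_codes set that only ProcessResult carries. This `Outcome` is a local stand-in, not the crate's #[non_exhaustive] enum.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Exited(i32),
    Signalled(Option<i32>), // signal number on Unix, None elsewhere
    TimedOut,
}

impl Outcome {
    fn code(self) -> Option<i32> {
        match self {
            Outcome::Exited(c) => Some(c),
            _ => None, // killed: no exit code
        }
    }

    fn signal(self) -> Option<i32> {
        match self {
            Outcome::Signalled(s) => s,
            _ => None,
        }
    }

    fn timed_out(self) -> bool {
        matches!(self, Outcome::TimedOut)
    }
}

/// Success depends on the command's ok_codes, not on the outcome alone.
fn is_success(outcome: Outcome, ok_codes: &[i32]) -> bool {
    outcome.code().map_or(false, |c| ok_codes.contains(&c))
}

fn main() {
    for o in [Outcome::Exited(0), Outcome::Signalled(Some(9)), Outcome::TimedOut] {
        println!(
            "{o:?}: code={:?} signal={:?} timed_out={} success={}",
            o.code(),
            o.signal(),
            o.timed_out(),
            is_success(o, &[0])
        );
    }
}
```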

The error enum is structured and #[non_exhaustive]:

| Variant | Meaning |
|---|---|
| Error::Spawn { program, source } | The program was located but the OS couldn't start it (permissions, a bad working directory, a Windows .cmd/.bat needing cmd.exe, …); not is_not_found() |
| Error::NotFound { program, searched } | The program couldn't be located. This is the single "not found" representation (is_not_found() is true); searched is Some(dirs) for a bare-name PATH lookup, None otherwise |
| Error::Exit { program, code, stdout, stderr } | Non-zero exit, with both streams attached in full (the Display message is bounded, but the fields carry the complete captured text for classification) |
| Error::Signalled { program, signal, stdout, stderr } | The process was killed by a signal (no exit code); signal carries the number on Unix, None elsewhere. The partial streams captured before the kill are attached (reach them via diagnostic()) |
| Error::OutputTooLarge { program, line_limit, byte_limit, total_lines, total_bytes } | A fail_loud buffer's line or byte ceiling was exceeded |
| Error::Timeout { program, timeout, stdout, stderr } | The run's own deadline killed it. Whatever the run captured before the kill is attached; a hung tool's last stderr line tails the Display and is reachable via diagnostic() |
| Error::NotReady { program, timeout } | A readiness probe gave up |
| Error::Parse { program, message } | A try_parse parser (on Command, ProcessRunnerExt, CliClient, or Pipeline) rejected the output (the Display/Debug of message is bounded to a 200-byte preview; the field carries the full text) |
| Error::Stdin { program, source } | Feeding the child's stdin failed for a non-broken-pipe reason on an otherwise-successful run (a louder failure, such as exit, signal, or timeout, wins instead); a routine broken pipe never surfaces |
| Error::CassetteMiss { program } | (record feature) A cassette replay found no matching recording (stale or incomplete cassette). Kept distinct from a missing program, so is_not_found() is false |
| Error::Unsupported { operation } | The platform can't do what was asked (and silently skipping would be wrong) |
| Error::Cancelled { program } | The run's cancel_on token was cancelled |
| Error::ResourceLimit { message } | (limits feature) A requested cap couldn't be enforced |
| Error::Io(source) | A low-level I/O error from the crate's own machinery (driving a child, group control, cassette files), never an arbitrary foreign io::Error (there is no blanket From) |

Error::diagnostic() returns the most useful human-facing line out of a failure that captured output — Exit, Timeout, and Signalled (the partial streams of a hung-then-killed or crashed tool). Each of those variants' one-line Display also appends a bounded excerpt of that diagnostic (the last non-empty line, capped at 200 bytes), so a bare eprintln!("{e}") reads `git` exited with code 2: fatal: boom — actionable in a log line without dumping multi-KiB streams into it.
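The diagnostic selection and the bounded Display excerpt can be sketched in a few lines. Both helpers are hypothetical. The sketch assumes the 200-byte cap keeps the start of the line and backs off to a UTF-8 character boundary, which the text does not specify.

```rust
/// stderr if non-empty, else stdout (git/jj report conflicts on stdout).
fn diagnostic<'a>(stdout: &'a str, stderr: &'a str) -> &'a str {
    if stderr.is_empty() { stdout } else { stderr }
}

/// The last non-empty line, capped at `max` bytes without splitting a char.
fn excerpt(diag: &str, max: usize) -> &str {
    let line = diag.lines().rev().find(|l| !l.is_empty()).unwrap_or("");
    let mut end = line.len().min(max);
    while !line.is_char_boundary(end) {
        end -= 1;
    }
    &line[..end]
}

fn main() {
    let stderr = "warning: x\nfatal: boom\n";
    let d = diagnostic("", stderr);
    println!("`git` exited with code 2: {}", excerpt(d, 200)); // prints: `git` exited with code 2: fatal: boom
}
```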


Next: Streaming & interactive I/O · Timeouts, retries & cancellation · Process groups

Process groups


A ProcessGroup ties the lifetime of a whole child-process tree to a Rust value: every process spawned into the group — and everything those processes spawn — is killed when the group is dropped. An exiting, panicking, or ?-returning owner never leaks subprocesses; the kernel object enforcing this (Job Object / cgroup / POSIX process group) catches even grandchildren you never knew about. (Killing grandchildren is the problem duct.py's gotchas list files under "currently unsolved" for pipe-based designs — kernel containment is the solution, and the reason this crate exists.)

Creating a group

use processkit::{ProcessGroup, ProcessGroupOptions};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    // Defaults: 2s graceful-shutdown grace, escalate to SIGKILL.
    let group = ProcessGroup::new()?;

    // Tuned:
    let tuned = ProcessGroup::with_options(
        ProcessGroupOptions::default()
            .shutdown_timeout(Duration::from_secs(10))
            .escalate_to_kill(true),
    )?;

    // Which kernel mechanism is actually containing the tree?
    println!("{:?}", group.mechanism()); // JobObject | CgroupV2 | ProcessGroup
    println!("{:?}", tuned.mechanism());
    Ok(())
}

mechanism() reports what you actually got: CgroupV2 quietly falls back to ProcessGroup on Linux hosts without cgroup delegation (see Platform support).

You rarely create a group explicitly for one-shot runs: every Command::run()-style call makes a private group automatically. Reach for an explicit group when several children should share one fate, or when you need the group verbs below.

Putting processes in

Three doors, in order of preference:

use processkit::{Command, ProcessGroup};

// tokio's `spawn()` returns `std::io::Result`, and processkit's Error has no
// blanket `From<io::Error>`, so this example returns `Box<dyn Error>`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let group = ProcessGroup::new()?;

    // 1. start(): the full Command experience (capture, streaming, timeouts) in a
    //    SHARED group. The handle does not own the group: dropping the handle
    //    kills that child, dropping the group kills everyone.
    let server = group.start(&Command::new("dev-server")).await?;

    // 2. spawn(): the raw escape hatch for a tokio::process::Command you already
    //    have. You get the bare Child back; pipes and reaping are your problem.
    //    spawn() takes the command BY VALUE (reuse would stack pre-exec hooks).
    let raw = tokio::process::Command::new("background-helper");
    let child = group.spawn(raw)?;

    // 3. adopt(): contain a child that was spawned OUTSIDE the group.
    let external = tokio::process::Command::new("legacy-launcher").spawn()?;
    group.adopt(&external)?;
    Ok(())
}

adopt moves only the named process: descendants it already has keep their old containment, while its future forks are captured (on the Windows and cgroup mechanisms). A few sharp edges worth knowing:

  • A child that already exited but has not been reaped (no wait() yet — a zombie whose pid/handle is still valid) is a successful no-op: there is nothing left to contain, so adopt returns Ok on the containment backends.
  • A child that already exited and was reaped (wait()ed) has no pid left — adopt returns an error rather than silently tracking nothing.
  • On the POSIX process-group mechanism, a child that has already exec'd can't be re-grouped (POSIX forbids it), so it is tracked individually: the child itself is signalled/killed with the group, but its future forks are not. The caller keeps the Child handle and is responsible for reaping.

Tearing down: drop, terminate, shutdown

| Verb | What happens | When |
|---|---|---|
| drop(group) | Immediate hard kill of the whole tree (kill-on-close) | The safety net, always on |
| group.terminate_all() | The same hard kill; the group stays usable (cgroup-kill / Job Object / process-group backends). On a pre-5.14 Linux kernel lacking cgroup.kill, the per-pid SIGKILL fallback returns Err if the tree doesn't drain (a fork bomb still out-spawning, or D-state zombies) | Explicit teardown mid-flight; idempotent |
| group.shutdown().await | Unix: SIGTERM → wait shutdown_timeout → SIGKILL survivors (if escalate_to_kill); Windows: atomic job kill. Consumes the group | Graceful service stop |
use processkit::{Command, ProcessGroup, ProcessGroupOptions};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::with_options(
        ProcessGroupOptions::default()
            .shutdown_timeout(Duration::from_secs(5))
            .escalate_to_kill(true),
    )?;
    let _service = group.start(&Command::new("my-service")).await?;

    // SIGTERM, give it 5s to flush and exit, SIGKILL stragglers:
    group.shutdown().await?;
    Ok(())
}

A child that handles SIGTERM ends the grace early: shutdown returns as soon as the tree is empty, not after the full timeout. One subtlety: on the process-group backends the liveness probe sees an exited-but-unreaped child (a zombie) as alive, so keep wait()ing your handles concurrently if you want the early return. Drop can't await, which is why the graceful tier lives in this async method; dropping without calling it performs only the hard kill.
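The graceful tier's timing (signal, poll until the tree is empty or the grace runs out, then escalate) can be modeled without any processes. `shutdown` here is a hypothetical synchronous sketch. Caller-supplied closures stand in for SIGTERM, the liveness probe, and SIGKILL, and the sketch assumes escalate_to_kill(true) and an arbitrary 10 ms poll interval.

```rust
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum Ending {
    ExitedGracefully,
    Killed,
}

fn shutdown(
    grace: Duration,
    term: impl FnOnce(),
    mut is_empty: impl FnMut() -> bool,
    kill: impl FnOnce(),
) -> Ending {
    term(); // the polite signal to every member
    let deadline = Instant::now() + grace;
    loop {
        if is_empty() {
            return Ending::ExitedGracefully; // ends the grace early
        }
        if Instant::now() >= deadline {
            break;
        }
        thread::sleep(Duration::from_millis(10)); // poll interval (arbitrary)
    }
    kill(); // hard-kill the survivors
    Ending::Killed
}

fn main() {
    let mut polls = 0;
    let ending = shutdown(
        Duration::from_secs(5),
        || println!("SIGTERM sent"),
        || {
            polls += 1;
            polls > 3 // pretend the tree drains on the fourth check
        },
        || println!("SIGKILL survivors"),
    );
    println!("{ending:?}");
}
```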

Signalling the whole tree

signal, suspend, resume, and members (this section and the two below), as well as adopt, require the default-on process-control feature. The teardown verbs above are core and always present.

use processkit::{Command, ProcessGroup, Signal};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::new()?;
    let _server = group.start(&Command::new("my-server")).await?;

    group.signal(Signal::Hup)?;        // "reload your configuration"
    group.signal(Signal::Usr1)?;       // whatever the tool defines
    group.signal(Signal::Other(34))?;  // raw signal number escape hatch
    Ok(())
}
| Platform | Deliverable signals |
|---|---|
| Linux (cgroup or pgroup), macOS/BSD | Any: Term, Kill, Int, Hup, Quit, Usr1, Usr2, Other(n) |
| Windows | Kill only (maps to the Job Object terminate); anything else → Error::Unsupported |

Signal::Kill always takes the same atomic whole-tree kill path as terminate_all (cgroup.kill / killpg / job terminate), so it cannot miss a process forked mid-broadcast. Other signals are a per-member broadcast — best-effort against a tree that is forking at that exact moment. An empty group accepts any deliverable signal trivially. On the cgroup mechanism a real per-member delivery failure (e.g. EPERM from a member that changed uid, or a seccomp/container restriction) is surfaced as an Err rather than swallowed — an ESRCH race (the member already exited) is still success; the pgroup (macOS/BSD, Linux-without-cgroup) backend remains purely best-effort.

Suspending and resuming

Freeze a tree (to snapshot it, to starve a runaway while you investigate, to pause background work), then thaw it:

use processkit::{Command, ProcessGroup};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::new()?;
    let _cruncher = group.start(&Command::new("cpu-hog")).await?;

    group.suspend()?;   // the whole tree stops consuming CPU
    // … inspect, snapshot, wait for the user …
    group.resume()?;
    Ok(())
}

Per-platform machinery — and its visible differences:

| Platform | Mechanism | Notes |
|---|---|---|
| Linux cgroup | one cgroup.freeze write | Atomic over the subtree; freeze is group state |
| Linux pgroup, macOS/BSD | SIGSTOP / SIGCONT broadcast | Idempotent (level-triggered) |
| Windows | per-thread SuspendThread walk | Counted: N suspends need N resumes; best-effort against mid-walk thread churn |

Two caveats that bite in practice:

  • Spawning into a suspended group diverges. Under the cgroup mechanism a child spawned or adopted while the group is frozen starts frozen — and start() may never return until resume (the forked child joins the cgroup before exec, so it can freeze before completing the spawn handshake). Windows and the pgroup backends freeze only members present at the call. Rule of thumb: resume before starting new work.
  • A suspended tree can still be hard-killed (drop / terminate_all / Signal::Kill all act on frozen processes), but a graceful shutdown starts with a SIGTERM the frozen tree can't act on — it would wait out the whole grace. Resume first for a clean shutdown.

Listing members

use processkit::{Command, ProcessGroup};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::new()?;
    let _a = group.start(&Command::new("worker-a")).await?;
    let _b = group.start(&Command::new("worker-b")).await?;

    let pids: Vec<u32> = group.members()?;
    println!("live members: {pids:?}");
    Ok(())
}

What "members" means depends on the mechanism: Windows and Linux-cgroup list the whole tree (every descendant pid); the POSIX process-group backends list the tracked group leaders (one pid per started/adopted child) — their descendants are contained but not enumerated. An exited child still counts until it is reaped. The snapshot is point-in-time: a tree that is forking races it.

To wait on members rather than list them, race the handles with wait_any.

Resource limits

Requires the limits feature. Caps are a property of the group, set once at creation and enforced by the same kernel object that contains the tree:

use processkit::{Command, ProcessGroup, ProcessGroupOptions};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::with_options(
        ProcessGroupOptions::default()
            .memory_max(512 * 1024 * 1024) // bytes, whole tree
            .max_processes(64)             // fork-bomb ceiling
            .cpu_quota(0.5),               // half of one core
    )?;
    let _sandboxed = group.start(&Command::new("untrusted-tool")).await?;
    Ok(())
}
| Capability | Windows Job Object | Linux cgroup v2 | pgroup / macOS / BSD |
|---|---|---|---|
| Memory cap | ✅ whole-tree | ✅ whole-tree (memory.max) | ❌ |
| Process-count cap |  | ✅ (pids.max) | ❌ |
| CPU quota | 🟡 approximate (rate vs. total CPU) | ✅ (cpu.max) | ❌ |

cpu_quota is a fraction of a single core (2.0 = two cores). Limits need a real container; when a requested cap can't be enforced — no Job Object/cgroup, or a Linux cgroup whose controllers can't be enabled — with_options returns Error::ResourceLimit instead of handing back a silently-unbounded group. On Linux this needs the process to run at the real cgroup-v2 root: the crate enables the controllers in this process's own cgroup, which cgroup v2's "no internal processes" rule allows only for the real hierarchy root — not a cgroup-namespace root (so an ordinary container fails too), not under systemd — and the crate doesn't migrate your process. See the limits prerequisites in Platform support. The uid()-drop interaction lives under its Caveats.
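For intuition about the CPU quota row, here is how a fraction of one core could map onto the two kernel knobs. Both functions are hypothetical. The 100 ms cgroup period is the kernel's default for cpu.max, not a documented processkit choice. The Windows helper assumes the Job Object rate is expressed in hundredths of a percent of all logical processors, which is presumably what makes that column approximate.

```rust
/// Assumed cgroup period in microseconds (the kernel's default for cpu.max).
const CGROUP_PERIOD_US: u64 = 100_000;

/// cgroup v2 cpu.max takes "$QUOTA $PERIOD" in microseconds:
/// 0.5 cores → "50000 100000", 2.0 cores → "200000 100000".
fn cpu_max_line(cores: f64) -> String {
    let quota = (cores * CGROUP_PERIOD_US as f64).round() as u64;
    format!("{} {}", quota, CGROUP_PERIOD_US)
}

/// A share of ALL processors in 1/100 of a percent (10_000 = every core),
/// so a per-core quota must be rescaled by the core count.
fn job_cpu_rate(cores: f64, logical_cpus: u32) -> u32 {
    let rate = cores / logical_cpus as f64 * 10_000.0;
    rate.round().clamp(1.0, 10_000.0) as u32
}

fn main() {
    println!("cpu.max: {}", cpu_max_line(0.5));
    println!("job rate on 8 CPUs: {}", job_cpu_rate(0.5, 8));
}
```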

Stats and sampling

Requires the opt-in stats feature (features = ["stats"], or limits).

use processkit::{Command, ProcessGroup, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::new()?;
    let _worker = group.start(&Command::new("worker")).await?;

    // Point-in-time:
    let snap = group.stats()?;
    println!(
        "procs={} cpu={:?} peak_rss={:?}",
        snap.active_process_count, snap.total_cpu_time, snap.peak_memory_bytes,
    );

    // …or a series: first sample immediate, then every 250ms; missed ticks are
    // skipped; the stream ends when the group can no longer report.
    let mut samples = group.sample_stats(Duration::from_millis(250));
    while let Some(s) = samples.next().await {
        println!("peak memory: {:?}", s.peak_memory_bytes);
    }
    Ok(())
}

CPU time and peak memory are available where the kernel accounts for the whole tree (Windows, Linux cgroup); the process-group backends report the member count only — the Option fields stay None. The sampler borrows the group, so it can neither outlive it nor keep it (and the kill-on-drop guarantee) alive. For a single run's end-to-end summary, see profile.


Next: Streaming & interactive I/O · Platform support · Supervision

Streaming & interactive I/O


The one-shot verbs in Running commands buffer the whole output. For long-running or conversational children, Command::start() returns a live RunningProcess you drive yourself: stream stdout as it arrives, write stdin incrementally, probe for readiness, race several children, or profile a run.

Lifecycle

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let mut run = Command::new("dev-server").start().await?;

    run.pid();        // Option<u32>: None once the child is reaped
    run.elapsed();    // time since spawn

    // Consume the handle exactly one way:
    //   output_string() / output_bytes()  → capture everything (same as the one-shot verbs)
    //   wait()                            → just the Outcome; output is discarded
    //   finish()                          → after streaming stdout (below)
    //   profile(every)                    → capture + resource samples (stats feature)
    let outcome = run.wait().await?;   // Outcome: Exited(code) / Signalled(sig) / TimedOut
    println!("{outcome:?}");
    Ok(())
}

start() puts the child in a private group the handle owns: dropping the RunningProcess kills the whole tree, exactly like dropping a one-shot run's future. The shared-group variant — group.start(&cmd) — gives the same handle but the group controls the tree's fate (see Process groups).

There is also an explicit run.start_kill() for "stop it now, I'll wait() for the code myself".

Streaming stdout

stdout_lines() yields decoded lines as the child produces them — no waiting for exit, no full-output buffering. StreamExt (re-exported from tokio-stream) provides .next():

use processkit::{Command, Outcome, StreamExt, Finished};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let mut run = Command::new("cargo")
        .args(["build", "--release"])
        .start()
        .await?;

    let mut lines = run.stdout_lines()?;
    while let Some(line) = lines.next().await {
        println!("build: {line}");
    }

    // The stream ended (stdout closed). Collect the outcome and stderr —
    // stderr was drained in the background the whole time, so a noisy child
    // could never block on a full pipe.
    let Finished { outcome, stderr, .. } = run.finish().await?;
    if outcome != Outcome::Exited(0) {
        eprintln!("build failed ({outcome:?}):\n{stderr}");
    }
    Ok(())
}

Things to know:

  • Call stdout_lines() once. It is fallible: a second stdout_lines / output_events call (stdout is consumed once), or a non-piped stdout (StdioMode::Inherit/Null), returns Err rather than a silently-empty stream.
  • The command's timeout bounds the stream on an own-group handle: at the deadline the tree is killed, the pipes close, and the stream ends, so a streamed run can't hang past its deadline. A cancel_on token ends it the same way; the following finish then reports Error::Cancelled. Details in Timeouts, retries & cancellation.
  • Line counters tick live: run.stdout_line_count() / stderr_line_count() are cheap progress gauges even while you stream.
  • The buffer policy and line handlers apply to streamed runs too — a handler sees each line on the pump, in addition to your loop.
  • The whole streaming surface is hermetically testable: a ScriptedRunner's start() returns a handle whose canned lines flow through the same pump machinery — stdout_lines, the readiness probes, and finish behave identically with no subprocess. See Testing → scripted streaming.

Interactive stdin

Conversational tools — write a request, read the response, repeat. Keep stdin open with keep_stdin_open(), take the writer with take_stdin():

use processkit::{Command, Outcome, StreamExt, Finished};

// `ProcessStdin`'s writer methods return `std::io::Result`; `Box<dyn Error>`
// mixes them with the crate's `Result` (or `.map_err(processkit::Error::Io)?`).
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `bc` evaluates each stdin line and prints the result.
    let mut run = Command::new("bc").keep_stdin_open().start().await?;
    let mut stdin = run.take_stdin().expect("stdin was kept open");
    let mut answers = run.stdout_lines()?;

    stdin.write_line("2 + 2").await?;             // writes "2 + 2\n", flushed
    println!("= {}", answers.next().await.unwrap());

    stdin.write_line("6 * 7").await?;
    println!("= {}", answers.next().await.unwrap());

    stdin.finish().await?;                        // send EOF — bc exits
    let Finished { outcome, .. } = run.finish().await?;
    assert_eq!(outcome, Outcome::Exited(0));
    Ok(())
}

ProcessStdin offers write(&[u8]), write_line(&str) (newline + flush), flush(), and finish() (EOF). Dropping the writer — or the whole RunningProcess — closes stdin too; finish() just makes the EOF explicit and awaitable.

Avoid the full-duplex deadlock. A child's stdout pipe has a finite OS buffer; once it fills, the child blocks writing stdout until something reads it. If you push a large interactive stdin while nothing drains the child's stdout, the child stops reading stdin (blocked on stdout), your write parks waiting for stdin buffer space, and neither side progresses. The bc example above is safe because it interleaves one write with one read; when you both feed a sizable stdin and the child produces output, drain stdout_lines from one task while writing stdin from another. (The non-interactive Stdin::from_* sources are safe — the crate writes them on a background task that runs concurrently with the output pumps.)
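The rule is not specific to processkit. Here is an illustration with plain std::process, not processkit's API. It assumes a Unix host with `cat` on PATH, and `echo_through_cat` is a hypothetical helper. A writer thread feeds a megabyte into the child's stdin while the calling thread drains its stdout. Writing everything first and reading afterwards on one thread would stall as soon as both pipe buffers filled.

```rust
use std::io::{self, Read, Write};
use std::process::{Command, Stdio};
use std::thread;

/// Streams `input` through `cat` and returns what it echoed back.
/// The writer runs on its own thread while this thread drains stdout,
/// so a full pipe on one side can never stall both processes.
fn echo_through_cat(input: Vec<u8>) -> io::Result<Vec<u8>> {
    let mut child = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    let mut stdin = child.stdin.take().expect("stdin is piped");
    let mut stdout = child.stdout.take().expect("stdout is piped");

    // Writer thread: it may block on a full stdin pipe, but the read below
    // keeps draining stdout, so `cat` can always make progress.
    let writer = thread::spawn(move || -> io::Result<()> {
        stdin.write_all(&input)?;
        drop(stdin); // close the pipe so `cat` sees EOF and exits
        Ok(())
    });

    let mut output = Vec::new();
    stdout.read_to_end(&mut output)?;
    writer.join().expect("writer thread panicked")?;
    child.wait()?;
    Ok(output)
}
```

With processkit the shape is the same: one task owns the ProcessStdin writer, another consumes stdout_lines.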

For one-directional streamed input (a channel, a file tail) you don't need interactivity — give the command Stdin::from_lines(stream) / Stdin::from_reader(reader) and let the background writer feed it; see the stdin source table.

Readiness probes

"Start a server, then use it" needs ready, not merely started. Three probes replace the arbitrary sleep, each bounded by its own deadline:

use processkit::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let mut run = Command::new("my-server").start().await?;

    // 1. A line on stdout (returns the matching line):
    let banner = run
        .wait_for_line(|l| l.contains("listening on"), Duration::from_secs(10))
        .await?;
    println!("server says: {banner}");

    // 2. A TCP port accepting connections:
    run.wait_for_port("127.0.0.1:8080".parse().unwrap(), Duration::from_secs(10))
        .await?;

    // 3. Any async predicate (an HTTP /health endpoint, a file appearing, …).
    //    `health_check` stands in for your own async check; it is not part of processkit.
    run.wait_for(|| async { health_check().await }, Duration::from_secs(10))
        .await?;

    // Ready: use the server…
    Ok(())
}

Probe semantics, deliberately uniform:

  • A probe that can't pass within its deadline fails with Error::NotReady — distinct from Error::Timeout, which is the run's own deadline.
  • A probe also fails fast once readiness can no longer happen: the child exits, or (for wait_for_line) its stdout closes — no waiting out a 30s deadline on a dead server.
  • A failed probe never kills the child. You decide: retry, log and continue, or tear down.
  • wait_for_line consumes stdout up to (and including) the match — continue with finish or further streaming. wait_for_port / wait_for don't touch the pipes at all.
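The polling idea behind wait_for_port can be sketched with the standard library alone. This is a hypothetical, blocking stand-in, not processkit's code. processkit's probe is async and also fails fast when the child exits, which this sketch does not model. The sketch retries a TCP connect until one succeeds or the deadline passes. It then reports a NotReady-style error instead of killing anything.

```rust
use std::net::{SocketAddr, TcpStream};
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Hypothetical error mirroring the "deadline passed, still not ready" case.
#[derive(Debug, PartialEq)]
enum ProbeError {
    NotReady,
}

/// Polls `addr` until a TCP connect succeeds or `within` elapses.
fn wait_for_port(addr: SocketAddr, within: Duration) -> Result<(), ProbeError> {
    let deadline = Instant::now() + within;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return Err(ProbeError::NotReady);
        }
        // Never spend longer on one attempt than the time that is left.
        let attempt = remaining.min(Duration::from_millis(100));
        if TcpStream::connect_timeout(&addr, attempt).is_ok() {
            return Ok(());
        }
        // Short pause between attempts, also capped by the deadline.
        sleep(remaining.min(Duration::from_millis(20)));
    }
}
```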

Racing children with wait_any

The free function wait_any races several running processes and reports whichever exits first — the natural primitive for "restart whatever died" or "first answer wins":

use processkit::{wait_any, Command, ProcessGroup};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::new()?;
    let mut a = group.start(&Command::new("replica-a")).await?;
    let mut b = group.start(&Command::new("replica-b")).await?;

    let (index, outcome) = wait_any(&mut [&mut a, &mut b]).await?;
    println!("contender #{index} exited first with {outcome:?}");

    // wait_any only borrows, so the loser is still usable.
    let _survivor = if index == 0 { &mut b } else { &mut a };
    Ok(())
}

wait_any takes &mut borrows, applies no timeout of its own (wrap it in tokio::time::timeout to bound the race), and does no output pumping — drain chatty children first or give them bounded buffer policies.
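The race itself follows the ordinary first-completion pattern. The sketch below uses threads and a channel to build intuition. `first_to_finish` is hypothetical and synchronous, and it is not how processkit implements the async wait_any. Every contender reports on a shared channel, and the first message wins. As with wait_any, the losers are neither killed nor timed out.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs each job on its own thread and returns (index, result) of the first
/// one to finish. The others keep running; nothing is cancelled.
fn first_to_finish(jobs: Vec<Box<dyn FnOnce() -> i32 + Send>>) -> Option<(usize, i32)> {
    let (tx, rx) = mpsc::channel();
    for (index, job) in jobs.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // The receiver may be gone once the race is decided; ignore that.
            let _ = tx.send((index, job()));
        });
    }
    drop(tx); // with no jobs at all, recv() returns Err instead of blocking
    rx.recv().ok()
}
```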

Per-run telemetry

With the opt-in stats feature, a running child reports its own resource usage, and profile() turns a whole run into a summary:

// Requires the `stats` feature.
use processkit::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let run = Command::new("crunch").start().await?;
    let cpu = run.cpu_time();           // Option<Duration>: user + kernel so far
    let peak = run.peak_memory_bytes(); // Option<u64>
    println!("so far: cpu={cpu:?} peak={peak:?}");

    // …or capture and sample on an interval until exit:
    let profile = Command::new("crunch")
        .start().await?
        .profile(Duration::from_millis(100))
        .await?;

    println!(
        "exit={:?} wall={:?} cpu={:?} peak_rss={:?} avg_cpu={:?} ({} samples)",
        profile.exit_code,
        profile.duration,
        profile.cpu_time,
        profile.peak_memory_bytes,
        profile.avg_cpu(),          // cpu / wall, e.g. Some(1.7) ≈ 1.7 cores busy
        profile.samples,
    );
    Ok(())
}

These read the child process itself (not a whole tree — that's ProcessGroup::stats), and availability follows the platform: full CPU/memory on Windows and Linux, None where the kernel doesn't account per-process cheaply — see Platform support.
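The avg_cpu() figure in the example is CPU time divided by wall time. The hypothetical helper below shows the arithmetic. It returns None when CPU data is missing or the run took no measurable time; that edge handling is an assumption, not documented behavior. For example, 1.7 s of user+kernel time over 1 s of wall time averages about 1.7 busy cores.

```rust
use std::time::Duration;

/// Average busy cores over a run: CPU time (user + kernel) divided by wall time.
/// Returns None if CPU time was unavailable or the wall time is zero.
fn avg_cpu(cpu_time: Option<Duration>, wall: Duration) -> Option<f64> {
    let cpu = cpu_time?;
    if wall.is_zero() {
        return None;
    }
    Some(cpu.as_secs_f64() / wall.as_secs_f64())
}
```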


Next: Pipelines · Timeouts, retries & cancellation · Supervision

Pipelines

a | b | c without a shell. Each stage's stdout feeds the next stage's stdin through an in-process relay (a tokio::io::copy task per boundary) — there is no shell string anywhere, so no quoting rules, no word splitting, no injection surface. All stages spawn into one shared kill-on-drop process group, so the chain lives and dies as a unit. (The relay is an implementation detail, not a kernel splice: a producer whose consumer exits early stops on a broken pipe when the relay's next write fails, rather than instantly via SIGPIPE.)

Building and running

Command::pipe(next) starts a Pipeline; chain more stages with Pipeline::pipe; drive it with output_string() or run():

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    // git log --format=%an | sort | uniq -c
    let authors = Command::new("git").args(["log", "--format=%an"])
        .pipe(Command::new("sort"))
        .pipe(Command::new("uniq").arg("-c"))
        .run()                         // require every stage to succeed
        .await?;
    println!("{authors}");
    Ok(())
}

The verbs mirror Command's, each operating on the pipefail outcome:

Verb | Returns | A failing stage is…
--- | --- | ---
output_string() | ProcessResult<String> | reported in the result (code/stderr/program of the first unclean stage)
output_bytes() | ProcessResult<Vec<u8>> | reported the same way, with the last stage's stdout captured raw (binary pipes)
run() | trimmed final stdout | raised as that stage's Error::Exit; fails loud on a truncated capture
checked() | full ProcessResult<String> | raised as Error::Exit (untrimmed stdout)
run_unit() | () | raised as Error::Exit (output discarded)
exit_code() | i32 | reported as its attributed code (no code → Error::Timeout/Signalled)
probe() | bool | 0 → true, 1 → false, anything else → Err
parse(…) / try_parse(…) | T | raised as Error::Exit; fails loud on a truncated capture

Err from output_string itself means a stage couldn't be started or driven at all (spawn failure, broken plumbing) — never a mere non-zero exit.

The streaming first_line probe is deliberately not a pipeline verb: a chain consumes its last stage in full to fold the pipefail outcome. To capture the first matching line of a finished chain, add a | head -n1 (Unix) / grep -m1 / findstr stage and capture. This does not cover a streaming readiness probe over a chain that must keep running (e.g. wait for a banner line, then leave the chain alive) — | head would tear it down; use a single Command with first_line for that.

The | operator is sugar for the same thing: a | b | c is equivalent to a.pipe(b).pipe(c). Parenthesize the chain before a terminal verb, since method calls bind tighter than |:

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let authors = (Command::new("git").args(["log", "--format=%an"])
        | Command::new("sort")
        | Command::new("uniq").arg("-c"))
        .run()
        .await?;
    println!("{authors}");
    Ok(())
}

Semantics: pipefail and the ends

The outcome is pipefail, like set -o pipefail in a shell:

  • stdout is always the last stage's output — that's what the chain produced.
  • code, stderr, and the reported program come from the first stage that didn't exit cleanly (non-zero, signal-killed, or timed out) — or from the last stage when every stage succeeded.

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let result = Command::new("cat").arg("data.txt")
        .pipe(Command::new("grep").arg("ERROR"))      // suppose grep exits 2 (bad pattern)
        .pipe(Command::new("wc").arg("-l"))
        .output_string()
        .await?;

    // Diagnostics point at grep (the first unclean stage), while stdout is
    // whatever wc managed to print:
    assert_eq!(result.code(), Some(2));
    println!("blamed: {}", result.ensure_success().unwrap_err()); // names `grep`
    Ok(())
}

The ends of the chain behave like a single Command:

  • The first stage's configured stdin source is honored — feed the whole pipeline from a string, file, or stream.
  • Inner stages read from the pipe, full stop: any stdin source or keep_stdin_open configured on them is overridden.
  • Inner stages' stderr is captured per-stage for pipefail diagnostics; only the last stage's stdout reaches you.

use processkit::{Command, Stdin};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let unique_count = Command::new("sort")
        .stdin(Stdin::from_iter_lines(["b", "a", "b", "c"]))
        .pipe(Command::new("uniq"))
        .pipe(Command::new("wc").arg("-l"))
        .run()
        .await?;
    assert_eq!(unique_count.trim(), "3");
    Ok(())
}

Unchecked stages

Strict pipefail has one classic false positive: a consumer that legitimately stops reading early. In producer | head -1 the consumer exits 0 after one line and closes the pipe; the producer then stops on a broken pipe — its next write fails once the relay's downstream is gone (a broken-pipe write error, or SIGPIPE where the OS delivers it) — a perfectly normal death that strict pipefail would blame the chain for. Mark that stage unchecked_in_pipe():

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    // seq 1 1000000 | head -n 1: the producer's broken-pipe death is expected.
    let first = (Command::new("seq").args(["1", "1000000"]).unchecked_in_pipe()
        | Command::new("head").args(["-n", "1"]))
        .run()
        .await?;
    assert_eq!(first.trim(), "1");
    Ok(())
}

The rules:

  • An unchecked stage's unclean exit — a non-zero code, a broken-pipe write failure (or SIGPIPE where the OS delivers it) from a consumer that closed early, or its own per-stage timeout kill — is skipped when the chain decides what to report.
  • A checked failure always trumps an unchecked one, regardless of position: unchecked never shields another stage's real failure.
  • A chain whose only failures are unchecked reports success (the last stage's stdout, code 0).
  • unchecked forgives exit status only — never a whole-chain Pipeline::timeout, and it has no effect on a Command run outside a pipeline (a single run's status is already plain data in its ProcessResult).
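The reporting rule from the pipefail and unchecked-stage rules can be written as a small pure function. This is a simplified model for illustration, not processkit's source. The Exit and Stage types are hypothetical, and a producer's broken-pipe death is modeled simply as Signalled. The function only decides whose status is reported, since stdout always comes from the last stage. A whole-chain Pipeline::timeout is not modeled.

```rust
/// How one stage ended (simplified: a real result also carries stderr, program, …).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Exit {
    Code(i32),
    Signalled,
    TimedOut, // the stage's own per-stage deadline fired
}

/// One stage: how it ended and whether it counts (false = unchecked_in_pipe()).
#[derive(Debug, Clone, Copy)]
struct Stage {
    exit: Exit,
    checked: bool,
}

/// Returns (index of the stage to report, status to report).
/// The first *checked* unclean stage wins wherever it sits; if every failure
/// was unchecked (or nothing failed), the chain reports the last stage as code 0.
fn pipefail(stages: &[Stage]) -> Option<(usize, Exit)> {
    let last = stages.len().checked_sub(1)?; // an empty chain reports nothing
    match stages
        .iter()
        .position(|s| s.checked && s.exit != Exit::Code(0))
    {
        Some(i) => Some((i, stages[i].exit)),
        None => Some((last, Exit::Code(0))),
    }
}
```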

Timeouts

Two scopes, deliberately distinct:

use processkit::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let out = Command::new("producer")
        .timeout(Duration::from_secs(10))      // per-STAGE: kills just `producer`
        .pipe(Command::new("consumer"))
        .timeout(Duration::from_secs(30))      // whole-CHAIN: Pipeline::timeout
        .output_string()
        .await?;
    println!("chain timed out: {}", out.timed_out());
    Ok(())
}

  • Pipeline::timeout bounds the whole chain: at the deadline the shared group is torn down and the result reports timed_out (no partial stdout — unlike a single command's captured timeout).
  • A per-stage Command::timeout kills just that stage. Every stage is evaluated by the same pipefail rule: a stage that hit its own deadline — inner or last — surfaces on run() as that stage's Error::Timeout, reporting that stage's own deadline (not the chain's, and never 0ns).

Cancellation has two forms. Pipeline::cancel_on(token) is the chain-level control: the token is applied to every stage, so firing it tears the whole chain down and the run resolves to Error::Cancelled. (A cancel_on token on an individual stage Command also cancels that stage and errors the pipeline, but the pipeline-level builder is the clearer authority.) See Timeouts & cancellation.

Re-running a pipeline

A Pipeline is Clone and re-runnable — stages are re-cloned per run. The one caveat is inherited from Command: a one-shot stdin source on the first stage (Stdin::from_reader / from_lines) is consumed by the first run; re-running then fails loud (an Error::Io at launch) rather than silently feeding empty stdin. Use the reusable sources (from_string / from_bytes / from_iter_lines / from_file) when a chain runs more than once.


Next: Timeouts, retries & cancellation · Running commands · Process groups

Timeouts, retries & cancellation

Three ways a run ends early, with three different philosophies:

  • a timeout is data — the deadline was part of the run's contract, so its expiry is captured in the result (and only the success-checking verbs turn it into an error);

  • a retry is a policy — the success-checking verbs replay the run while your classifier says the failure is transient;

  • a cancellation is an abandonment — the caller changed its mind, so every path reports an error; there is no result worth inspecting.

Timeouts

Command::timeout(d) kills the whole process tree at the deadline — not just the direct child, so a wrapper script's grandchildren die too.

use processkit::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    // Captured: inspect the flag yourself.
    let result = Command::new("slow-tool")
        .timeout(Duration::from_secs(5))
        .output_string()
        .await?;
    if result.timed_out() {
        println!("partial output before the kill: {}", result.stdout());
    }

    // Raised: the checking verbs convert the flag into a typed error.
    let err = Command::new("slow-tool")
        .timeout(Duration::from_secs(5))
        .run()
        .await
        .unwrap_err();
    assert!(matches!(err, processkit::Error::Timeout { .. }));
    Ok(())
}

Where each verb lands:

Verb | Deadline expiry becomes
--- | ---
output_string() / output_bytes() | Ok result with timed_out() == true, code() == None, partial output kept
run() / exit_code() / probe() / checked() | Error::Timeout { program, timeout, stdout, stderr }, with the partial output captured before the kill attached (err.diagnostic() surfaces a hung tool's last words)
first_line(pred) | Error::Timeout (the line never arrived in time)
start() + streaming | the stream ends at the deadline (tree killed, pipes closed); finish then reports the kill (outcome == Outcome::TimedOut)
ensure_success() on a captured result | Error::Timeout, checked before the exit code
Pipeline | chain deadline → timed_out result; per-stage deadlines fold into pipefail
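The split between "captured" and "raised" rows comes down to one ordering rule: the timeout flag is checked before the exit code, because a timed-out run has no code. The minimal model below uses simplified hypothetical types, not processkit's ProcessResult and Error. It also treats only exit code 0 as success and ignores ok_codes.

```rust
use std::time::Duration;

/// Hypothetical, simplified error type for the checking verbs.
#[derive(Debug, PartialEq)]
enum CheckError {
    Timeout { timeout: Duration },
    Exit { code: i32 },
    Signalled,
}

/// Simplified captured result: an expired deadline is recorded as data.
struct Captured {
    timed_out: bool,
    code: Option<i32>, // None when the process was killed rather than exiting
    deadline: Duration,
}

impl Captured {
    /// Turns the captured data into an error, timeout first, then exit code.
    fn ensure_success(&self) -> Result<(), CheckError> {
        if self.timed_out {
            return Err(CheckError::Timeout { timeout: self.deadline });
        }
        match self.code {
            Some(0) => Ok(()),
            Some(code) => Err(CheckError::Exit { code }),
            None => Err(CheckError::Signalled),
        }
    }
}
```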

Two distinct deadline families to keep apart:

  • Command::timeout — the run's own contract, this section.
  • The readiness probes' within parameter — gives Error::NotReady and never kills the child.

Graceful timeout

By default the deadline hard-kills at once. Add timeout_grace(d) to give the tree a chance to clean up: at the deadline it is sent SIGTERM (or the signal chosen with timeout_signal, which needs the process-control feature), given up to the grace window to exit, then SIGKILLed. This is the same SIGTERM → wait → SIGKILL tier as ProcessGroup::shutdown. If the child handles the signal and exits, the grace period ends early.

use processkit::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let result = Command::new("slow-tool")
        .timeout(Duration::from_secs(30))
        .timeout_grace(Duration::from_secs(5)) // SIGTERM, wait up to 5s, then SIGKILL
        .output_string()
        .await?;
    println!("timed out: {}", result.timed_out());
    Ok(())
}

timed_out() is true regardless of whether the child exited on the signal or was SIGKILLed after the grace — the deadline is what fired. Windows has no signal tier: timeout_grace is accepted but the deadline kills the job atomically.

The explicit RunningProcess::shutdown(grace) verb (stop a started handle on demand) composes with a Command::timeout: its own SIGTERM → grace → SIGKILL is the single teardown (it does not also fire the run's timeout teardown), and if the deadline has already elapsed when you call shutdown, the outcome is reported as Outcome::TimedOut — the grace you pass governs the teardown timing.

Retries

retry(max_attempts, backoff, classifier) replays a failed run — up to max_attempts total attempts, sleeping backoff between tries, retrying only while the classifier accepts the error:

use processkit::{Command, Error};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let out = Command::new("curl")
        .args(["-fsS", "https://example.com/api"])
        .timeout(Duration::from_secs(10))
        .retry(3, Duration::from_millis(250), |e| {
            // transient: network timeouts and curl's "couldn't connect" (exit 7)
            matches!(e, Error::Timeout { .. })
                || matches!(e, Error::Exit { code: 7, .. })
        })
        .run()
        .await?;
    println!("{out}");
    Ok(())
}

Ground rules:

  • Retries apply to the success-checking paths only (run, exit_code, probe, ProcessRunnerExt::checked — and everything built on them, e.g. CliClient). The non-erroring output_string capture never retries: it didn't fail.
  • The classifier sees the typed error — match on variants, codes, even the captured stderr.
  • Each attempt re-runs the same Command: a one-shot stdin source (see the stdin source table) is consumed by attempt #1, so attempt #2 fails loud with an Error::Io (InvalidInput) at launch rather than silently feeding empty stdin. Use reusable sources for retried commands.
  • A Cancelled error is never retried, classifier or not — the token stays cancelled forever, so another attempt could only fail the same way.
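The ground rules fit in a few lines. The synchronous sketch below rests on stated assumptions. `retry` and `Error` are hypothetical, the sleep between attempts is fixed, and at least one attempt runs even when max_attempts is 0. processkit's own retry is async and is configured on the Command.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical error type standing in for processkit::Error.
#[derive(Debug, Clone, PartialEq)]
enum Error {
    Timeout,
    Exit { code: i32 },
    Cancelled,
}

/// Runs `attempt` up to `max_attempts` times in total, sleeping `backoff`
/// between tries, and retries only while `is_transient` accepts the error.
/// A Cancelled error is returned at once, whatever the classifier says.
fn retry<T>(
    max_attempts: u32,
    backoff: Duration,
    is_transient: impl Fn(&Error) -> bool,
    mut attempt: impl FnMut() -> Result<T, Error>,
) -> Result<T, Error> {
    let mut tries = 0;
    loop {
        tries += 1;
        match attempt() {
            Ok(value) => return Ok(value),
            Err(e) => {
                let retryable = !matches!(e, Error::Cancelled) && is_transient(&e);
                if !retryable || tries >= max_attempts {
                    return Err(e);
                }
                sleep(backoff);
            }
        }
    }
}
```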

For "keep it alive" (restart a service whenever it exits) rather than "replay this one operation", use a Supervisor — same backoff shape, different loop condition.

Cancellation

Hand any command a CancellationToken (re-exported at the crate root); cancelling the token kills the run's tree and makes every consuming path report Error::Cancelled:

use processkit::{CancellationToken, Command};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let shutdown = CancellationToken::new();

    // Wire the same parent token into many jobs via child tokens:
    let job = tokio::spawn({
        let token = shutdown.child_token();
        async move {
            Command::new("long-export").cancel_on(token).run().await
        }
    });

    // Ctrl-C handler, sibling failure, UI button, …
    shutdown.cancel();

    assert!(matches!(
        job.await.unwrap(),
        Err(processkit::Error::Cancelled { .. })
    ));
    Ok(())
}

The contract, path by path:

Situation | Behavior
--- | ---
Cancel during run / output_string / output_bytes / wait / profile / exit_code / probe | tree killed, Error::Cancelled { program }
Cancel during streaming (stdout_lines) | the stream ends; the following finish reports Error::Cancelled
Token already cancelled before the run | short-circuits before spawning; no process is ever created
Cancel on a shared-ProcessGroup handle | kills the child itself, leaves the group's siblings alone (same scope as a timeout)
A Pipeline stage's token cancels | that stage dies; the cancellation errors the whole pipeline and the private group reaps the other stages
Under retry | terminal: never retried, whatever the classifier says
Under a Supervisor | terminal: supervision returns Err(Cancelled) instead of restarting into a still-cancelled token
wait_any mid-run | the raw primitive doesn't synthesize the error; the race just resolves (a pre-cancelled token still hits the pre-spawn short-circuit)
first_line mid-run | surfaces Error::Cancelled once the token fires; a cancelled stream that closes without a match is reported as cancellation, not Ok(None)

Client-level default

A typed wrapper built on CliClient usually constructs and consumes its Commands internally — there is no place to chain a per-call cancel_on. Set the token once on the client; every command it builds carries it:

#![allow(unused)]
fn main() {
use processkit::{CancellationToken, CliClient};

let token = CancellationToken::new();
let gh = CliClient::new("gh").default_cancel_on(token.child_token());
// ... controller cancels `token` → every in-flight command of THIS client
// dies (whole tree), surfacing Error::Cancelled to the awaiting call.
}

Clients are cheap — scope cancellation by building one client per cancellable scope with its own (child) token, instead of threading tokens through call signatures. cli_client!-generated wrappers re-emit the builder, so Git::new().default_cancel_on(t) works for downstream crates too.

Precedence: a per-command cancel_on chained on a built command replaces the client default (explicit beats default, like a per-command timeout after default_timeout). To honor both sources, wire it explicitly — CancellationToken has no built-in merge: derive a child of the default (let c = default.child_token()), hand the command cancel_on(c.clone()), and have the second source call c.cancel(). Or simpler: build a dedicated client per scope.

Precedence and interactions

Timeout vs. cancellation. A timeout is captured; a cancellation is always an error. When both land on the same run, cancellation wins — you asked the run to stop mattering, so no result is synthesized:

use processkit::{CancellationToken, Command};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let token = CancellationToken::new();
    token.cancel();

    let err = Command::new("tool")
        .timeout(Duration::from_millis(1))   // would have been a Timeout…
        .cancel_on(token)                    // …but cancellation takes priority
        .run()
        .await
        .unwrap_err();
    assert!(matches!(err, processkit::Error::Cancelled { .. }));
    Ok(())
}

Which knob for which job:

You want | Reach for
--- | ---
"This run may not take longer than X" | Command::timeout
"This operation is flaky, try a few times" | Command::retry
"Stop everything when the app shuts down" | cancel_on + one shared token
"Keep this service alive across crashes" | Supervisor
"Tell me when it's ready, don't kill it" | readiness probes

Next: Supervision · Streaming & interactive I/O · Running commands

Supervision

Where retry answers "run this once, replaying on failure", a Supervisor answers the different question "keep this alive": restart a child per policy whenever it exits, with bounded restarts, exponential backoff, and jitter — a minimal runit/systemd-style keeper, platform-agnostic because it sits entirely on the ProcessRunner seam.

The shape

use processkit::{Command, RestartPolicy, Supervisor};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let outcome = Supervisor::new(Command::new("my-server").args(["--port", "8080"]))
        .restart(RestartPolicy::OnCrash)           // default
        .max_restarts(5)                           // default: unlimited
        .backoff(Duration::from_millis(200), 2.0)  // default: 200ms × 2.0
        .max_backoff(Duration::from_secs(30))      // default: 30s cap
        .jitter(true)                              // default: on
        .stop_when(|res| res.code() == Some(0))    // optional exit condition
        .run()
        .await?;

    println!(
        "ended after {} restarts, reason: {:?}, last exit: {:?}",
        outcome.restarts, outcome.stopped, outcome.final_result.code(),
    );
    Ok(())
}

Each incarnation is one full captured run of the command (so the command's own timeout, stdin, env, … all apply per run — with the usual one-shot-stdin caveat for the second run onward).

Policies: what counts as a crash

A crash is any run that is not a success (ProcessResult::is_success, which honors the command's ok_codes): an exit code outside the accepted set (default {0}), a timeout, a signal-kill, or a spawn failure. A command with ok_codes([0, 2]) that exits 2 is a success, so OnCrash treats it as clean, not a crash.

RestartPolicy | Restarts after…
--- | ---
OnCrash (default) | crashes only; a clean exit ends supervision (PolicySatisfied)
Always | every completed run, clean or not; pair it with stop_when/max_restarts or it loops forever
Never | nothing: one run, reported as-is

Backoff and jitter

The n-th restart (0-based) sleeps

delay(n) = min(base × factor^n, max_backoff) × jitter

with jitter drawn uniformly from [0.5, 1.5) per restart. Jitter is on by default so a fleet of supervised workers restarted by the same incident doesn't stampede back in lockstep; jitter(false) gives deterministic delays (useful in tests with a paused tokio clock). A non-finite or < 1.0 factor is treated as 1.0 — constant delay, never a shrinking one.

base=200ms, factor=2.0, cap=30s:
restart #0 → ~200ms   #1 → ~400ms   #2 → ~800ms … #7 → ~25.6s   #8+ → 30s (cap)
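The formula translates directly into code. In this minimal sketch, the jitter factor is passed in rather than drawn at random, so the numbers are reproducible. `restart_delay` is a hypothetical helper, not processkit's internals. Read literally, the formula applies jitter after the cap, so a capped delay can land anywhere in [15 s, 45 s) for a 30 s max_backoff. With a jitter of 1.5, a capped delay comes out at 45 s.

```rust
use std::time::Duration;

/// Delay before the n-th restart (0-based): min(base × factor^n, cap) × jitter.
/// `jitter` is supplied by the caller (the text draws it from [0.5, 1.5));
/// passing 1.0 gives the deterministic delays of jitter(false).
fn restart_delay(n: u32, base: Duration, factor: f64, cap: Duration, jitter: f64) -> Duration {
    // A non-finite or shrinking factor degrades to a constant delay.
    let factor = if factor.is_finite() && factor >= 1.0 { factor } else { 1.0 };
    let grown = base.as_secs_f64() * factor.powf(f64::from(n));
    let capped = grown.min(cap.as_secs_f64());
    Duration::from_secs_f64(capped * jitter)
}
```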

Failure storms

Backoff spaces individual restarts; max_restarts is a lifetime cap. Neither distinguishes a service that fails once a day from one that is suddenly crash-looping. The opt-in storm guard does (a design borrowed from Go's suture supervisor — the idea, not the code):

use processkit::{Command, Supervisor};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let outcome = Supervisor::new(Command::new("worker"))
        .storm_pause(Duration::from_secs(15))     // master switch: off by default
        .failure_decay(Duration::from_secs(30))   // score half-life (default 30s)
        .failure_threshold(5.0)                   // trip point (default 5.0)
        .run()
        .await?;

    println!("storm pauses taken: {}", outcome.storm_pauses);
    Ok(())
}

Each failed run adds 1 to a score that halves every failure_decay:

score = score × 0.5^(Δt / failure_decay) + 1
  • Fails rarely: the score decays back toward 1 between failures and never reaches the threshold — the guard stays out of the way.
  • Failure storm: failures arrive faster than the half-life drains them, the score climbs past failure_threshold, and the supervisor takes one collective pause of storm_pause (jittered into [0.5, 1.5) like the backoff), resets the score, and resumes.

Only failures feed the score — crashes and spawn errors — not clean exits restarted under RestartPolicy::Always. The pause stacks with (runs before) the per-restart backoff, and the max_restarts budget is checked first, so a storm pause never extends an exhausted budget. Pauses taken are reported in SupervisionOutcome::storm_pauses.
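The model of the score below is hypothetical and meant only for intuition. It makes three assumptions the text does not state: the score resets to 0 after a trip, the trip test is a strict "greater than", and time comes from a caller-supplied clock. With a 30 s half-life, failures one second apart trip a threshold of 5.0 on the sixth failure. Failures a minute apart settle near 4/3 (the fixed point of s = s/4 + 1) and never trip.

```rust
use std::time::Duration;

/// Hypothetical failure-storm guard: each failure first decays the old score
/// by 0.5^(elapsed / half_life), then adds 1.
struct StormGuard {
    half_life: Duration,
    threshold: f64,
    score: f64,
    last_failure: Option<Duration>, // time of the previous failure on the caller's clock
}

impl StormGuard {
    fn new(half_life: Duration, threshold: f64) -> Self {
        StormGuard { half_life, threshold, score: 0.0, last_failure: None }
    }

    /// Records a failure at time `now`. Returns true when the guard trips,
    /// i.e. the caller should take one storm pause; the score then resets.
    fn record_failure(&mut self, now: Duration) -> bool {
        if let Some(prev) = self.last_failure {
            let dt = now.saturating_sub(prev).as_secs_f64();
            self.score *= 0.5_f64.powf(dt / self.half_life.as_secs_f64());
        }
        self.score += 1.0;
        self.last_failure = Some(now);
        if self.score > self.threshold {
            self.score = 0.0;
            true
        } else {
            false
        }
    }
}
```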

Stopping

Three gates, checked in this order after every completed run:

  1. stop_when(predicate) — sees the run's ProcessResult; returning true ends supervision regardless of policy (→ StopReason::Predicate). "Exit 0 is done, anything else is a crash" is the classic: stop_when(|res| res.code() == Some(0)) under RestartPolicy::Always.
  2. The policy: OnCrash stops on a clean exit (→ PolicySatisfied).
  3. max_restarts(n) — at most n restarts = n + 1 total runs; an exhausted budget reports the last result (→ RestartsExhausted). max_restarts(0) means exactly one run.
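The gate order can be expressed as one decision function. This is a hypothetical model, not processkit's source. Never is left out because the text does not say which StopReason it reports. One consequence of the ordering is that a clean exit under OnCrash reports PolicySatisfied even when the restart budget is already spent.

```rust
/// Restart policies modeled here (Never omitted; see the note above).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Policy {
    OnCrash,
    Always,
}

#[derive(Debug, PartialEq)]
enum StopReason {
    Predicate,
    PolicySatisfied,
    RestartsExhausted,
}

#[derive(Debug, PartialEq)]
enum Decision {
    Stop(StopReason),
    Restart,
}

/// Applies the three gates in order after a completed run.
/// `predicate_says_stop` is stop_when's verdict; `success` is the run's is_success.
fn after_run(
    predicate_says_stop: bool,
    success: bool,
    policy: Policy,
    restarts_so_far: u32,
    max_restarts: Option<u32>, // None = unlimited (the default)
) -> Decision {
    // Gate 1: stop_when ends supervision regardless of policy.
    if predicate_says_stop {
        return Decision::Stop(StopReason::Predicate);
    }
    // Gate 2: OnCrash is satisfied by a clean exit.
    if policy == Policy::OnCrash && success {
        return Decision::Stop(StopReason::PolicySatisfied);
    }
    // Gate 3: max_restarts(n) allows n restarts, i.e. n + 1 runs in total.
    if let Some(max) = max_restarts {
        if restarts_so_far >= max {
            return Decision::Stop(StopReason::RestartsExhausted);
        }
    }
    Decision::Restart
}
```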

Outcomes

run() resolves to a SupervisionOutcome:

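use processkit::{Command, Supervisor};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let outcome = Supervisor::new(Command::new("job")).run().await?;

    let last = &outcome.final_result;   // ProcessResult<String> of the LAST run
    let restarts = outcome.restarts;    // how many restarts happened (not counting run #1)
    let stopped = &outcome.stopped;     // StopReason::{Predicate, PolicySatisfied, RestartsExhausted}
    let pauses = outcome.storm_pauses;  // failure-storm pauses taken (0 unless storm_pause is set)

    println!("{restarts} restarts, {pauses} storm pauses, stopped: {stopped:?}, last exit: {:?}", last.code());
    Ok(())
}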

Note run() returning Ok does not mean the child succeeded — it means supervision concluded. Inspect final_result (or ensure_success() it) for the child's own verdict.

Supervising inside a shared group

The supervisor runs through any ProcessRunner. The headline production variant injects a ProcessGroup so every incarnation — and everything it spawns — lives in one kill-on-drop container:

use processkit::{Command, ProcessGroup, RestartPolicy, Supervisor};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::new()?;

    let outcome = Supervisor::new(Command::new("worker"))
        .with_runner(&group)                 // &group is itself a ProcessRunner
        .restart(RestartPolicy::OnCrash)
        .max_restarts(10)
        .run()
        .await?;
    println!("supervision ended: {:?}", outcome.stopped);

    // The group outlives supervision: drop it (or shut it down) to reap any strays.
    Ok(())
}

Mind one interaction: don't supervise into a group you've suspended — under the cgroup mechanism the restarted child would start frozen (and the spawn itself can block). Resume first.

The same injection point makes supervision logic hermetically testable — script a sequence of fake results and assert the restart/stop behavior with no real process; see Testing your code.

Errors and cancellation

A run that produces no result at all (spawn/IO failure) can't be judged by stop_when; the policy treats it as a crash and restarts (with backoff) unless the policy is Never or the budget is exhausted — then the error itself surfaces as run()'s Err.

A cancelled incarnation is terminal: run() returns Err(Error::Cancelled) immediately. The token never un-cancels, so a restart could only produce another instantly-cancelled run — the supervisor refuses the futile loop.


Next: Testing your code · Timeouts, retries & cancellation · Process groups

Testing your code

Code that shells out is miserable to test — unless the subprocess is behind a seam. In processkit that seam is one small trait. Only output_string is required; output_bytes (raw-byte stdout) and start (a live handle for streaming/probes) are defaulted, so a minimal double implements just output_string:

#[async_trait]
pub trait ProcessRunner: Send + Sync {
    async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>>;
    // Defaulted (route through `start`); override for byte/streaming support:
    async fn output_bytes(&self, command: &Command) -> Result<ProcessResult<Vec<u8>>>;
    async fn start(&self, command: &Command) -> Result<RunningProcess>;
}

Production code takes a runner (generically or as &dyn ProcessRunner); tests hand it a double. Four doubles ship with the crate, plus a macro that makes whole CLI wrappers testable for free.

The ProcessRunner seam

JobRunner is the real implementation (each run in a fresh private group); a ProcessGroup is also a runner (runs land in that shared group); and impl ProcessRunner for &R means a borrowed runner works wherever an owned one does — inject &group or &recording without giving ownership away.

Every runner — real or double — gets the convenience helpers of ProcessRunnerExt for free: run (trimmed stdout, success required), run_unit, exit_code, probe (exit code as a boolean), checked (success-checked full result), and parse/try_parse (feed stdout to a closure; like first_line, generic over the closure so unavailable on a &dyn ProcessRunner). Retry policies work through the seam too, so a double exercises your retry handling hermetically.

The seam covers streaming as well as bulk runs: ProcessRunner::start returns a live RunningProcess, and a ScriptedRunner's start hands back a scripted handle whose canned lines flow through the same pump machinery a real child uses — stdout_lines, wait_for_line, and finish behave identically, with no subprocess (see Scripted streaming below). An output_string-only custom runner keeps compiling: start is defaulted to Error::Unsupported.

use processkit::{Command, ProcessRunner, ProcessRunnerExt, Result};

// Production code: generic over the runner.
async fn current_branch(runner: &impl ProcessRunner) -> Result<String> {
    runner
        .run(&Command::new("git").args(["branch", "--show-current"]))
        .await
}

Scripting replies

ScriptedRunner, the workhorse double, returns canned Reply values for matched commands:

use processkit::{Command, ProcessRunnerExt};
use processkit::testing::{Reply, ScriptedRunner};

#[tokio::test]
async fn detects_the_branch() {
    let runner = ScriptedRunner::new()
        // Match by program + argument PREFIX (element-wise; the first element
        // is the program name). Rules are tried in registration order.
        .on(["git", "branch", "--show-current"], Reply::ok("main\n"))
        // …or by any predicate over the full Command:
        .when(
            |cmd| cmd.working_dir().is_some(),
            Reply::fail(128, "fatal: not a git repository"),
        )
        // …with an optional catch-all:
        .fallback(Reply::ok(""));

    // `current_branch` is the production function from the previous example.
    assert_eq!(current_branch(&runner).await.unwrap(), "main");
}

The pieces:

  • Reply::ok(stdout) — exit 0. Reply::fail(code, stderr) — non-zero with stderr. Reply::lines(["a", "b"]) — exit 0 with the lines joined (and streamed one by one on a scripted start). Reply::timeout() — a timed-out run (the checking helpers raise Error::Timeout from it, carrying the command's own configured deadline). On a scripted start it resolves immediately as timed-out; to exercise a real deadline race, use Reply::pending() + a Command::timeout. .with_stdout(text) — attach stdout to any of them (e.g. the CONFLICT … text git prints on a failing merge). .with_line_delay(d) — pace a scripted stream's lines.
  • Reply::pending() — parks the call until the command's cancellation token (per-command cancel_on or the client-level default_cancel_on) fires, then resolves with Error::Cancelled — so a test can prove an orchestration actually cancels a blocked call, not just that it formats a canned error. With no token it parks forever, like a hung child.
  • Rules are tried in registration order, and the first match wins. Prefix matching is element-wise: the first element is compared with the program name and the rest with the arguments. For example, on(["git", "foo"]) matches git foo bar but not git foobar (and not rm foo). Use on_sequence to serve an ordered sequence of replies (each once, then the last repeats) for a fail-then-succeed scenario.
  • No match and no fallback is a loud error (Error::Spawn, not-found) — an unexpected invocation can't slip through a test silently.
  • Bulk runs also replay the canned lines through the command's on_stdout_line/on_stderr_line handlers, so a wrapper's progress-reporting path is exercised without a subprocess.
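The matching rules listed above fit in a few lines: element-wise prefix, first registered match wins, then the fallback, and a loud error when nothing matches. This is a hypothetical model for reasoning about rule order, not ScriptedRunner's implementation. `prefix_matches` and `Rules` are illustrative names.

```rust
// Hypothetical model of the rule matching described above; not ScriptedRunner's code.

/// Element-wise prefix match: pattern[0] against the program, the rest against the args.
fn prefix_matches<'s>(pattern: &[&str], program: &'s str, args: &[&'s str]) -> bool {
    let mut actual = std::iter::once(program).chain(args.iter().copied());
    pattern.iter().all(|want| actual.next() == Some(*want))
}

/// Rules in registration order, plus an optional catch-all.
struct Rules<'a> {
    rules: Vec<(Vec<&'a str>, &'a str)>,
    fallback: Option<&'a str>,
}

impl<'a> Rules<'a> {
    /// First matching rule wins; then the fallback; otherwise a loud error.
    fn reply(&self, program: &str, args: &[&str]) -> Result<&'a str, String> {
        self.rules
            .iter()
            .find(|(pattern, _)| prefix_matches(pattern, program, args))
            .map(|(_, reply)| *reply)
            .or(self.fallback)
            .ok_or_else(|| format!("no rule matches `{} {}`", program, args.join(" ")))
    }
}

fn main() {
    assert!(prefix_matches(&["git", "foo"], "git", &["foo", "bar"]));
    assert!(!prefix_matches(&["git", "foo"], "git", &["foobar"]));
    assert!(!prefix_matches(&["git", "foo"], "rm", &["foo"]));

    let rules = Rules {
        rules: vec![
            (vec!["git", "foo"], "foo-reply"),
            (vec!["git"], "any-git"),
        ],
        fallback: None,
    };
    // The more specific rule was registered first, so it wins.
    assert_eq!(rules.reply("git", &["foo", "bar"]), Ok("foo-reply"));
    assert_eq!(rules.reply("git", &["status"]), Ok("any-git"));
    // No match and no fallback is an error, not a silent success.
    assert!(rules.reply("rm", &["-rf"]).is_err());
}
```

If the two rules were registered in the opposite order, `["git"]` would shadow `["git", "foo"]` entirely. This is why specific rules go first.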

Scripted streaming

ScriptedRunner::start returns a live RunningProcess backed by the canned reply instead of an OS child. The canned stdout/stderr feed the same pump machinery a real child uses, so the whole streaming surface works hermetically — stdout_lines yields the lines, wait_for_line probes them, finish reports the canned outcome and stderr:

use processkit::{Command, Outcome, ProcessRunner, StreamExt, Finished};
use processkit::testing::{Reply, ScriptedRunner};
use std::time::Duration;

#[tokio::test]
async fn server_becomes_ready() {
    let runner = ScriptedRunner::new()
        .on(["server", "serve"], Reply::lines(["booting", "listening on 8080"]));

    let mut run = runner.start(&Command::new("server").arg("serve")).await.unwrap();
    run.wait_for_line(|l| l.contains("listening"), Duration::from_secs(5))
        .await
        .unwrap(); // satisfied by the canned banner, no subprocess

    let Finished { outcome, .. } = run.finish().await.unwrap();
    assert_eq!(outcome, Outcome::Exited(0));
}

Reply::lines([...]) scripts the stdout lines; .with_line_delay(d) paces them (deterministic under #[tokio::test(start_paused = true)]), and the scripted run "exits" after the last line. The honest boundaries: a scripted handle has no OS identity (pid() is None, profile reports empty samples), does not compose into a real Pipeline, and does not model interactive stdin. Reply::pending() scripts a run that never exits on its own — cancel or time it out through the command's own knobs. A command timeout does bound a scripted stream (it ends at the deadline and reports Outcome::TimedOut, like a real child), but a scripted handle has no signal tier, so — like on Windows — it ignores timeout_grace and ends at once.

Asserting invocations

RecordingRunner wraps another runner and records every Invocation — what was asked — so a test asserts inputs, not just outputs:

use processkit::{Command, ProcessRunnerExt};
use processkit::testing::{RecordingRunner, Reply, ScriptedRunner};

#[tokio::test]
async fn passes_the_right_flags() {
    let runner = RecordingRunner::new(
        ScriptedRunner::new().fallback(Reply::ok("done")),
    );

    runner
        .run(&Command::new("gh").args(["pr", "create", "--draft"]).current_dir("/repo"))
        .await
        .unwrap();

    let call = runner.only_call(); // panics unless exactly one call
    assert_eq!(call.args_str(), ["pr", "create", "--draft"]);
    assert!(call.has_flag("--draft"));
    assert_eq!(call.cwd.as_deref().map(|c| c.to_str().unwrap()), Some("/repo"));
    assert!(!call.has_stdin);
}

An Invocation captures the routing knobs — program, args, cwd, envs (explicit overrides, None = removal), has_stdin — not the I/O-shaping ones (timeout, encodings, buffer policy); assert those through a when predicate over the Command itself. calls() returns the full list when more than one run is expected.
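A recording double is a thin decorator: it captures the routing knobs, then delegates to the wrapped runner. The sketch below uses hypothetical, synchronous `Cmd`, `Invocation` and `Runner` types. Like the Invocation described above, it deliberately leaves the timeout (an I/O-shaping knob) out of the record. Recording before delegating is this sketch's own choice, not a documented detail of RecordingRunner.

```rust
use std::cell::RefCell;
use std::path::PathBuf;

// Hypothetical, simplified types; not processkit's Command or Invocation.
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct Cmd {
    program: String,
    args: Vec<String>,
    cwd: Option<PathBuf>,
    timeout_secs: Option<u64>, // I/O-shaping knob: not part of the record
}

/// What was asked: the routing knobs only.
#[derive(Clone, Debug, PartialEq)]
struct Invocation {
    program: String,
    args: Vec<String>,
    cwd: Option<PathBuf>,
}

trait Runner {
    fn run(&self, cmd: &Cmd) -> Result<String, String>;
}

/// Decorator: records each invocation, then delegates to the wrapped runner.
struct Recording<R> {
    inner: R,
    calls: RefCell<Vec<Invocation>>,
}

impl<R> Recording<R> {
    fn new(inner: R) -> Self {
        Recording { inner, calls: RefCell::new(Vec::new()) }
    }

    /// Panics unless exactly one call was recorded.
    fn only_call(&self) -> Invocation {
        let calls = self.calls.borrow();
        assert_eq!(calls.len(), 1, "expected exactly one call, got {}", calls.len());
        calls[0].clone()
    }
}

impl<R: Runner> Runner for Recording<R> {
    fn run(&self, cmd: &Cmd) -> Result<String, String> {
        // This sketch records before delegating, so failed runs are captured too.
        self.calls.borrow_mut().push(Invocation {
            program: cmd.program.clone(),
            args: cmd.args.clone(),
            cwd: cmd.cwd.clone(),
        });
        self.inner.run(cmd)
    }
}

// A hypothetical inner double that always succeeds.
struct AlwaysOk;

impl Runner for AlwaysOk {
    fn run(&self, _cmd: &Cmd) -> Result<String, String> {
        Ok("done".to_string())
    }
}

fn main() {
    let runner = Recording::new(AlwaysOk);
    let cmd = Cmd {
        program: "gh".into(),
        args: vec!["pr".into(), "create".into(), "--draft".into()],
        cwd: Some(PathBuf::from("/repo")),
        timeout_secs: Some(30),
    };
    runner.run(&cmd).unwrap();

    let call = runner.only_call();
    assert_eq!(call.args, ["pr", "create", "--draft"]);
    assert_eq!(call.cwd, Some(PathBuf::from("/repo")));
}
```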

Expectation-style: MockRunner

With the mock feature, mockall generates a MockRunner for expectation-style tests (call counts, argument matchers, ordered expectations) — the right tool when the interaction is the contract.

Note: MockRunner's expect_* surface is generated by mockall and is exempt from this crate's semver guarantees — it tracks the mockall dependency, not a frozen API. For a stable double, prefer ScriptedRunner (canned replies) or RecordingRunner (input assertions) above.

use processkit::testing::MockRunner;

let mut mock = MockRunner::new();
mock.expect_output_string()
    .times(1)
    .returning(|_cmd| /* build a Result<ProcessResult<String>> */ …);

MockRunner does not inherit the defaults. Unlike a hand-written runner (where output_bytes/start are defaulted), mockall::automock replaces every method with an expectation — so a verb that routes through start or output_bytes needs its own expect_start() / expect_output_bytes(), or the unset call panics ("no expectation"). ScriptedRunner provides the defaults and the streaming seam out of the box.

For most tests ScriptedRunner/RecordingRunner read better; reach for the mock when you need mockall's matching machinery.

Record/replay cassettes

With the record feature, RecordReplayRunner closes the loop: record real runs to a JSON cassette once, then replay them deterministically — fast, hermetic, byte-stable, no subprocess in CI:

use processkit::{Command, JobRunner, ProcessRunnerExt, Result};
use processkit::testing::RecordReplayRunner;

async fn record_then_replay() -> Result<()> {
    // Record once against the real tool (an opt-in `--record` test run, say):
    let runner = RecordReplayRunner::record("fixtures/git.json", JobRunner::new());
    let version = runner.run(&Command::new("git").arg("--version")).await?;
    runner.save()?;                                  // the error-surfacing flush
                                                     // (best-effort on drop too)

    // Replay everywhere else:
    let runner = RecordReplayRunner::replay("fixtures/git.json")?;
    assert_eq!(runner.run(&Command::new("git").arg("--version")).await?, version);
    Ok(())
}

Semantics worth knowing before you commit a cassette:

| Aspect | Behavior |
|---|---|
| Match key | program + args + cwd + a stdin source digest (hashed, never persisted: in-memory bytes hash their content, a from_file source hashes its path); no stdin (absent or Stdin::empty()) keys distinctly; lossy UTF-8 on the text parts |
| Environment | values never reach the file, only sorted variable names, so env secrets can't leak through a committed fixture; env is not matched, so env differences can't cause spurious misses |
| Duplicates of one key | replay in capture order, then the last entry repeats: a recorded sequence (git rev-parse HEAD before/after a commit) replays faithfully, while retry/probe loops keep getting a stable final answer |
| Miss | strict Error::CassetteMiss (distinct from a missing program: is_not_found() is false); replay never spawns a surprise subprocess, and a stale cassette fails loudly |
| Timeouts | a recorded timed-out run replays as one, surfacing Error::Timeout with the replaying command's deadline |
| Format | pretty-printed JSON with a version field; unknown versions, corrupt files, an entry with a contradictory outcome, or a file over 64 MiB are Error::Io(InvalidData); a missing file keeps NotFound |
| Err results | not recorded; only completed runs are (non-zero exits and captured timeouts are results, so they are recorded) |
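The key and replay rules in this table can be modeled with a hash map. This is a hypothetical sketch, not the crate's cassette code or JSON format. `Key`, `key`, `env_names` and `Replay` are illustrative names. Stdin is reduced to a std `DefaultHasher` digest; the real digest algorithm is not specified here. Only stdout is stored per entry.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Hypothetical model of the match key and replay order; not the real cassette.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Key {
    program: String,
    args: Vec<String>,
    cwd: Option<String>,
    stdin_digest: Option<u64>, // only a hash of stdin is kept, never the bytes
}

fn key(program: &str, args: &[&str], cwd: Option<&str>, stdin: Option<&[u8]>) -> Key {
    let stdin_digest = stdin.map(|bytes| {
        let mut h = DefaultHasher::new();
        bytes.hash(&mut h);
        h.finish()
    });
    Key {
        program: program.to_string(),
        args: args.iter().map(|a| a.to_string()).collect(),
        cwd: cwd.map(str::to_string),
        stdin_digest,
    }
}

/// What would be persisted for the environment: sorted names, no values.
fn env_names(env: &[(&str, &str)]) -> Vec<String> {
    let mut names: Vec<String> = env.iter().map(|(name, _value)| name.to_string()).collect();
    names.sort();
    names
}

#[derive(Default)]
struct Replay {
    entries: HashMap<Key, Vec<String>>, // stdout per key, in capture order
    served: HashMap<Key, usize>,
}

impl Replay {
    fn record(&mut self, k: Key, stdout: &str) {
        self.entries.entry(k).or_default().push(stdout.to_string());
    }

    /// Capture order first, then the last entry repeats; an unknown key is a miss.
    fn replay(&mut self, k: &Key) -> Result<String, String> {
        let list = self.entries.get(k).ok_or_else(|| format!("cassette miss: {:?}", k.args))?;
        let i = self.served.entry(k.clone()).or_insert(0);
        let out = list.get(*i).or(list.last()).cloned().unwrap_or_default();
        *i += 1;
        Ok(out)
    }
}

fn main() {
    let head = key("git", &["rev-parse", "HEAD"], Some("/repo"), None);
    let mut cassette = Replay::default();
    cassette.record(head.clone(), "aaa111");
    cassette.record(head.clone(), "bbb222");

    assert_eq!(cassette.replay(&head).unwrap(), "aaa111");
    assert_eq!(cassette.replay(&head).unwrap(), "bbb222");
    assert_eq!(cassette.replay(&head).unwrap(), "bbb222"); // last repeats

    // Different stdin gives a different key, so a strict miss.
    let with_stdin = key("git", &["rev-parse", "HEAD"], Some("/repo"), Some("x".as_bytes()));
    assert!(cassette.replay(&with_stdin).is_err());

    assert_eq!(env_names(&[("TOKEN", "s3cret"), ("HOME", "/h")]), ["HOME", "TOKEN"]);
}
```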

Only env values are redacted. program, args, cwd, stdout, and stderr are stored verbatim and can carry secrets (a --password=… flag, a token echoed to output), so review a fixture before committing it. On Unix the file is written 0600 and the write refuses to follow a symlink at the cassette path (O_NOFOLLOW, so a planted link can't redirect the secret-bearing write — it fails loud instead). On Windows the file inherits the containing directory's ACL, so restrict that directory (or use a per-user temp dir, not a world-writable shared one) for secret-bearing fixtures.

A neat trick: in tests, record against a ScriptedRunner instead of JobRunner — the whole record→save→replay round trip is then itself hermetic.

Wrapping a CLI tool

CliClient is the foundation for typed wrappers around external tools (git, jj, gh, kubectl, …): it owns the program name, per-client defaults, and the runner; your wrapper contributes only commands and parsers. The cli_client! macro generates the boilerplate:

use processkit::{cli_client, Error, ProcessRunner, Result};
use std::path::Path;
use std::time::Duration;

cli_client!(
    /// A typed `git` client.
    pub struct Git => "git"
);

impl<R: ProcessRunner> Git<R> {
    /// HEAD's commit id.
    pub async fn head(&self, repo: &Path) -> Result<String> {
        self.core.run(self.core.command_in(repo, ["rev-parse", "HEAD"])).await
    }

    /// Is the work tree clean? (exit code IS the answer)
    pub async fn is_clean(&self, repo: &Path) -> Result<bool> {
        self.core.probe(self.core.command_in(repo, ["diff", "--quiet"])).await
    }

    /// Branch list, parsed. The parser is fallible and returns the crate's
    /// `Result`, typically an `Error::Parse` naming the program.
    pub async fn branches(&self, repo: &Path) -> Result<Vec<String>> {
        self.core
            .try_parse(
                self.core.command_in(repo, ["branch", "--format=%(refname:short)"]),
                |out| {
                    let list: Vec<String> = out.lines().map(str::to_owned).collect();
                    if list.is_empty() {
                        Err(Error::Parse {
                            program: "git".into(),
                            message: "no branches".into(),
                        })
                    } else {
                        Ok(list)
                    }
                },
            )
            .await
    }
}

// Production: the real runner, with per-client defaults.
async fn production_example() -> Result<()> {
    let git = Git::new().default_timeout(Duration::from_secs(30));
    let head = git.head(Path::new(".")).await?;
    println!("HEAD is {head}");
    Ok(())
}

The generated type is Git<R: ProcessRunner = JobRunner> with Git::new(), Git::with_runner(runner), default_timeout / default_env / default_env_remove builders, and a public core: CliClient<R> whose helpers speak the crate-wide verb vocabulary: run (trimmed stdout), output_string (full result), run_unit (success only), exit_code, probe, plus parse (infallible) and try_parse (fallible → Error::Parse).
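Without async and without the macro, the generated shape is a struct with a defaulted type parameter. With it, `Git::new()` picks the real runner and tests call `Git::with_runner(double)`. The following synchronous sketch only approximates what cli_client! expands to. The `Runner`, `RealRunner` and `Scripted` types are hypothetical. std's `HashMap::new()` relies on the same trick: it is defined only for the default hasher type parameter.

```rust
use std::time::Duration;

// Hypothetical, synchronous sketch; not the code cli_client! actually generates.
trait Runner {
    fn run(&self, program: &str, args: &[&str], timeout: Option<Duration>) -> Result<String, String>;
}

/// Stand-in for the real runner; in this sketch it refuses to spawn anything.
struct RealRunner;

impl Runner for RealRunner {
    fn run(&self, program: &str, _args: &[&str], _timeout: Option<Duration>) -> Result<String, String> {
        Err(format!("sketch: would spawn `{program}`"))
    }
}

/// The defaulted type parameter makes plain `Git` mean `Git<RealRunner>`.
struct Git<R: Runner = RealRunner> {
    runner: R,
    default_timeout: Option<Duration>,
}

impl Git {
    fn new() -> Self {
        Self::with_runner(RealRunner)
    }
}

impl<R: Runner> Git<R> {
    fn with_runner(runner: R) -> Self {
        Git { runner, default_timeout: None }
    }

    /// Builder-style per-client default, applied to every command.
    fn default_timeout(mut self, timeout: Duration) -> Self {
        self.default_timeout = Some(timeout);
        self
    }

    /// The wrapper contributes only the arguments and the parser (here: trim).
    fn head(&self) -> Result<String, String> {
        self.runner
            .run("git", &["rev-parse", "HEAD"], self.default_timeout)
            .map(|out| out.trim().to_string())
    }
}

// A hypothetical test double returning canned stdout.
struct Scripted(&'static str);

impl Runner for Scripted {
    fn run(&self, _program: &str, _args: &[&str], _timeout: Option<Duration>) -> Result<String, String> {
        Ok(self.0.to_string())
    }
}

fn main() {
    let prod = Git::new().default_timeout(Duration::from_secs(30));
    assert_eq!(prod.default_timeout, Some(Duration::from_secs(30)));

    let git = Git::with_runner(Scripted("abc123\n"));
    assert_eq!(git.head().unwrap(), "abc123");
}
```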

And the payoff — the wrapper tests hermetically with any double:

use processkit::testing::{Reply, ScriptedRunner};
use std::path::Path;

// `Git` is the client defined with cli_client! above.
#[tokio::test]
async fn head_is_trimmed() {
    let git = Git::with_runner(
        ScriptedRunner::new().on(["git", "rev-parse", "HEAD"], Reply::ok("abc123\n")),
    );
    assert_eq!(git.head(Path::new("/repo")).await.unwrap(), "abc123");
}

…or with a cassette recorded against the real tool once.



Platform support


processkit supports Unix and Windows only — it requires tokio::process and OS job / process-group primitives that have no equivalent on bare targets like wasm. Building for such a target fails at compile time (a compile_error! guard, or earlier in tokio's own dependencies). Within the supported set, it treats platform support as first-class: every capability is either fully implemented, honestly partial (documented and typed), or refused with Error::Unsupported — never silently skipped. This page collects all the matrices and fine print in one place.

Containment mechanisms

ProcessGroup::mechanism() reports which one you actually got:

| Mechanism | Platform | How containment works |
|---|---|---|
| JobObject | Windows | A Job Object with kill-on-close; children are created suspended, assigned to the job, then resumed, so even a grandchild forked in the first instant is contained |
| CgroupV2 | Linux (with delegation) | A private cgroup; children join in pre_exec, before exec, so descendants can never escape; teardown is cgroup.kill |
| ProcessGroup | macOS, BSDs, Linux fallback | POSIX process groups (setpgid); teardown is killpg; tracked per started/adopted child |

On Linux the cgroup backend requires controller delegation, and resource limits specifically need this process to run at the real cgroup-v2 root. The crate creates the limit cgroup under this process's own cgroup and enables the controllers in that cgroup's subtree_control, which cgroup v2's "no internal processes" rule allows only for the real hierarchy root (the one exempt cgroup). A cgroup namespace root does not qualify — it only virtualizes the view — so an ordinary (private-cgroupns) container fails EBUSY just like a systemd session/scope/service. The crate does not migrate your process into a sub-cgroup to work around it, so in practice limits apply only at a minimal non-systemd init sitting at the real root. Without a usable cgroup it quietly falls back to ProcessGroup — unless you requested resource limits, which fail fast instead (Error::ResourceLimit), because an unapplied cap is no protection.
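The fallback rule comes down to a small decision table. In the hypothetical sketch below, `select`, `Mechanism` and `SelectError` are illustrative names, not processkit APIs. The flag `cgroup_usable` stands for "a cgroup that can do what was asked": delegation for plain containment, and the real root for resource limits.

```rust
// Hypothetical model of the documented fallback rule; not processkit's code.
#[derive(Debug, PartialEq)]
enum Mechanism {
    CgroupV2,
    ProcessGroup,
}

#[derive(Debug, PartialEq)]
enum SelectError {
    /// Limits were requested but no usable cgroup exists to enforce them.
    ResourceLimit,
}

/// `cgroup_usable`: a cgroup exists that can do what was asked.
fn select(cgroup_usable: bool, limits_requested: bool) -> Result<Mechanism, SelectError> {
    match (cgroup_usable, limits_requested) {
        (true, _) => Ok(Mechanism::CgroupV2),
        // Quiet fallback: the tree is still contained, just by a process group.
        (false, false) => Ok(Mechanism::ProcessGroup),
        // An unapplied cap is no protection, so fail fast rather than degrade.
        (false, true) => Err(SelectError::ResourceLimit),
    }
}

fn main() {
    assert_eq!(select(true, true), Ok(Mechanism::CgroupV2));
    assert_eq!(select(false, false), Ok(Mechanism::ProcessGroup));
    assert_eq!(select(false, true), Err(SelectError::ResourceLimit));
}
```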

Capability matrices

Teardown & containment

CapabilityWindows JobObjectLinux cgroupLinux pgroupmacOS/BSD
Kill-on-drop, whole tree✅ groups-based✅ groups-based
Graceful shutdown (TERM → grace → KILL)🟡 atomic kill only
adopt an external child✅ (future forks contained)✅ (future forks contained)🟡 exec'd child tracked individually🟡 same

Signals & freezing

CapabilityWindowsLinux cgroupLinux pgroupmacOS/BSD
Arbitrary signal (Hup, Usr1, Other(n), …)Kill only
suspend / resume🟡 per-thread countscgroup.freezeSIGSTOP/CONTSIGSTOP/CONT

On the cgroup mechanism, a non-Kill signal (and the SIGSTOP/SIGCONT fallback used for suspend/resume on pre-5.2 kernels without cgroup.freeze) surfaces a real per-member delivery failure (e.g. EPERM) as an Err rather than swallowing it — consistent with the "never silently skipped" philosophy; an ESRCH race (the member already exited) is still success.

Inspection & accounting (stats feature)

CapabilityWindowsLinux cgroupLinux pgroupmacOS/BSD
members()✅ whole tree✅ whole tree🟡 leaders only🟡 leaders only
Group CPU / peak memory❌ count only❌ count only
Per-run cpu_time / peak_memory_bytes / profile✅ (/proc)None

Resource limits (limits feature)

CapabilityWindowsLinux cgroupLinux pgroupmacOS/BSD
memory_max (whole tree)
max_processes
cpu_quota🟡 approximate

Spawn-time controls

CapabilityWindowsUnix (all)
inherit_env allow-list
uid / gid dropUnsupported
setsidUnsupported
create_no_windowno-op
kill_on_parent_death✅ always on (kernel)Linux: direct child; macOS/BSD: no-op

Everything not listed — capture, streaming, interactive stdin, encodings, buffer policies, timeouts, retry, pipelines, supervision, readiness probes, the test doubles, cassettes, cancellation — is platform-agnostic and behaves identically everywhere.

Caveats

The honest fine print, mostly consequences of OS semantics:

Windows: termination is an exit code, never Signalled (D18). Windows has no signal abstraction, so a killed process reports Outcome::Exited, not Outcome::Signalled. TerminateProcess / TerminateJobObject(_, 1) is Exited(1) — indistinguishable from a voluntary exit(1) — and Ctrl-C surfaces as Exited(-1073741510) (STATUS_CONTROL_C_EXIT as a signed i32). The crate reports the platform truth rather than fabricating a Signalled from an NTSTATUS code (that mapping would be a lossy guess). When you need to know the run was killed, use a ProcessGroup deadline or a cancellation token (which surface as TimedOut / Error::Cancelled on every platform). Outcome::Signalled is therefore Unix-only.
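The Ctrl-C exit code is plain bit reinterpretation. STATUS_CONTROL_C_EXIT is the NTSTATUS value 0xC000013A. Reading those 32 bits as a signed i32 (the type std's `ExitStatus::code()` returns) gives the number quoted above. The following std-only check uses `as_signed_exit_code`, a hypothetical helper name:

```rust
/// STATUS_CONTROL_C_EXIT as a raw 32-bit Windows status value.
const STATUS_CONTROL_C_EXIT: u32 = 0xC000_013A;

/// Hypothetical helper: reinterpret a 32-bit exit status as a signed i32.
fn as_signed_exit_code(status: u32) -> i32 {
    status as i32 // same bits; values >= 0x8000_0000 become negative
}

fn main() {
    let code = as_signed_exit_code(STATUS_CONTROL_C_EXIT);
    assert_eq!(code, -1_073_741_510);
    // A job or process terminated with exit code 1 reports plain 1, which
    // no arithmetic can distinguish from a voluntary exit(1).
    assert_eq!(as_signed_exit_code(1), 1);
}
```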

Linux cgroup delegation. Creating the per-group cgroup needs write access to the cgroup v2 hierarchy. Dev boxes typically lack it and get the pgroup fallback; CI inside containers usually has it. Check mechanism() when behavior must not silently degrade.

uid()/gid() × the cgroup mechanism. The OS applies the uid drop before pre_exec hooks, and the cgroup join runs in pre_exec — as the already-dropped user, who can't write the root-owned cgroup.procs. The spawn fails with a permission error (never an uncontained child). Privilege drop composes cleanly with the process-group mechanism.

setsid() × process groups. A new session implies a new process group; the crate coordinates the two (the containment tracking follows the new session's group), so setsid keeps the kill-on-drop guarantee instead of breaking out of it.

kill_on_parent_death() is thread-scoped on Linux. PR_SET_PDEATHSIG fires when the spawning thread dies, not only the process. On a multi-threaded tokio runtime a retired worker thread could kill the child early; spawn from a current-thread runtime for the strongest guarantee. It covers the direct child only — with the parent SIGKILLed, nothing tears the cgroup/pgroup down, so grandchildren survive. The parent-died-before-arming race is closed by re-checking getppid() in the child against the spawner's pid captured before the fork — which stays correct when the spawner itself is PID 1 (a container entrypoint).
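The race check compares against a pid captured before the fork instead of testing for 1. That is what keeps it correct when the spawner is PID 1. It also stays correct when a Linux orphan is re-parented to a subreaper rather than to init. The sketch below is a hypothetical pure-function model of that comparison, not the crate's pre_exec code. On Unix, the two inputs would come from `std::process::id()` in the parent and `std::os::unix::process::parent_id()` in the child.

```rust
// Hypothetical model of the parent-died check; not processkit's code.

/// Naive check: "re-parented to init means my parent died".
fn parent_died_naive(ppid_now: u32) -> bool {
    ppid_now == 1
}

/// Robust check: compare against the spawner's pid captured before the fork.
fn parent_died(spawner_pid: u32, ppid_now: u32) -> bool {
    ppid_now != spawner_pid
}

fn main() {
    // Ordinary spawner (pid 4242): alive, then dead and re-parented to init.
    assert!(!parent_died(4242, 4242));
    assert!(parent_died(4242, 1));

    // Container entrypoint: the spawner IS pid 1 and still alive.
    assert!(parent_died_naive(1)); // false alarm: would kill a healthy child
    assert!(!parent_died(1, 1)); // correct: the parent is alive

    // Re-parented to a subreaper (pid 777) instead of init.
    assert!(!parent_died_naive(777)); // missed death
    assert!(parent_died(4242, 777));
}
```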

Windows: the suspended-spawn handshake. Children are created CREATE_SUSPENDED, assigned to the job, then resumed — closing the classic race where a fast child forks before it's in the job. A consequence: a raw ProcessGroup::spawn caller passing its own creation flags gets them OR'd with CREATE_SUSPENDED (the Command-driven paths handle this for you, incl. create_no_window).

Windows: nested suspends. SuspendThread keeps per-thread counts — two suspend() calls need two resume()s. The POSIX backends are level-triggered (idempotent). Suspension is also best-effort against a tree that is spawning threads mid-walk.
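The difference between counted and level-triggered suspension can be modeled with two tiny state machines. Both types are hypothetical illustrations, not processkit APIs. `Counted` mimics the per-thread suspend count kept by SuspendThread/ResumeThread. `Level` mimics the idempotent POSIX backends.

```rust
// Hypothetical models of the two suspension semantics; not processkit's code.

/// Windows-style: every suspend must be matched by a resume.
struct Counted {
    count: u32,
}

impl Counted {
    fn suspend(&mut self) {
        self.count += 1;
    }
    fn resume(&mut self) {
        // Resuming an already-running thread leaves the count at zero.
        self.count = self.count.saturating_sub(1);
    }
    fn running(&self) -> bool {
        self.count == 0
    }
}

/// POSIX-style (SIGSTOP/SIGCONT, cgroup.freeze): a single on/off state.
struct Level {
    frozen: bool,
}

impl Level {
    fn suspend(&mut self) {
        self.frozen = true;
    }
    fn resume(&mut self) {
        self.frozen = false;
    }
    fn running(&self) -> bool {
        !self.frozen
    }
}

fn main() {
    let mut win = Counted { count: 0 };
    win.suspend();
    win.suspend();
    win.resume();
    assert!(!win.running()); // one resume is not enough
    win.resume();
    assert!(win.running());

    let mut posix = Level { frozen: false };
    posix.suspend();
    posix.suspend();
    posix.resume();
    assert!(posix.running()); // one resume undoes any number of suspends
}
```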

Spawning into a suspended cgroup group. The freeze is group state: a child spawned or adopted while suspended joins frozen — the forked child joins the cgroup before exec, so it can freeze before completing the spawn handshake and start() may never return until resume. Resume before starting new work; details in Process groups.

Frozen trees and graceful shutdown. Hard kills penetrate a frozen tree (SIGKILL / cgroup.kill / job terminate), but a graceful shutdown leads with a SIGTERM the frozen processes can't handle — it waits out the full grace. Resume first.

pgroup backends: leaders, zombies, pid reuse. members() lists tracked group leaders only; an exited-but-unreaped child (zombie) still probes as alive (keep wait()ing handles if you need prompt liveness, e.g. for shutdown's early return); and pid-based signalling is inherently best-effort against pid reuse — the crate prunes dead entries on every probe to keep the window minimal.



What's next


ProcessKit is a Rust library today, published as processkit on crates.io. The plan is to bring the same approach — kernel-backed whole-tree containment, honest error semantics, and testable seams — to other ecosystems: a Go package, an F# library, a Kotlin library, and a Python wrapper. Each implementation will follow the same philosophy and be documented here as it ships.