Cookbook

‹ docs index

Task-oriented recipes: find the thing you're trying to do, copy the snippet, follow the link when you need the fine print. Every snippet assumes a tokio runtime and use processkit::Command; unless shown otherwise.

Run a command and get its output

#![allow(unused)]
fn main() {
let head = Command::new("git").args(["rev-parse", "HEAD"]).run().await?;
}

run() requires a zero exit and returns stdout with trailing whitespace trimmed; a non-zero exit, spawn failure, or timeout is a typed Error. For a one-liner without the builder: processkit::run("git", ["rev-parse", "HEAD"]).

Fine print: Running commands → consuming verbs.

Inspect a failure instead of erroring

#![allow(unused)]
fn main() {
let result = Command::new("git").args(["merge", "topic"]).output_string().await?;
if !result.is_success() {
    eprintln!("merge exited {:?}: {}", result.code(), result.stderr());
}
}

output_string() (and output_bytes() for raw bytes) treats the exit code as data — Err means the run couldn't happen at all. Call result.ensure_success()? later to convert a stored failure into the same typed error run() would have produced.

Fine print: Running commands → results and errors.

Ask a yes/no question

#![allow(unused)]
fn main() {
let dirty = !Command::new("git").args(["diff", "--quiet"]).probe().await?;
}

probe() maps exit 0 → true, exit 1 → false, and anything else to an error — the git diff --quiet / grep -q convention without manual code matching.

Accept non-zero exit codes as success

#![allow(unused)]
fn main() {
// `grep` exits 1 when it finds no match — not a failure for this call.
let found = Command::new("grep")
    .args(["needle", "haystack.txt"])
    .ok_codes([0, 1])
    .output_string()
    .await?;
let matched = found.code() == Some(0); // 0 = matched, 1 = no match (both "success")
}

ok_codes widens what the checking verbs (run/run_unit) and is_success/ensure_success treat as success — for tools whose non-zero exit is a normal result (grep 1 = no match, diff 1 = differs, rsync's code families). It does not change exit_code (always the raw code) or probe (always the 0/1 convention). An empty set is ignored, so the default stays exit 0.

Bound a run with a timeout

#![allow(unused)]
fn main() {
use std::time::Duration;

let result = Command::new("slow-tool")
    .timeout(Duration::from_secs(30))
    .output_string()
    .await?;
if result.timed_out() {
    eprintln!("gave up after 30s; partial output: {}", result.stdout());
}
}

At the deadline the whole tree is killed. On the capture verbs the timeout is captured (timed_out(), partial output kept); on the success-checking verbs (run, exit_code) it surfaces as Error::Timeout.

Let a tool clean up on timeout

#![allow(unused)]
fn main() {
use std::time::Duration;

let result = Command::new("dev-server")
    .timeout(Duration::from_secs(30))
    .timeout_grace(Duration::from_secs(5)) // SIGTERM, wait up to 5s, then SIGKILL
    .output_string()
    .await?;
}

timeout_grace turns the hard deadline kill into a graceful one: SIGTERM (or the signal from timeout_signal, with the process-control feature), up to the grace window to exit, then SIGKILL. A signal-handling child exits early; timed_out() stays true. Windows has no signal tier — the deadline kills atomically.

Fine print: Timeouts → graceful timeout.

Show a useful error message

#![allow(unused)]
fn main() {
if let Err(e) = Command::new("git").args(["merge", "topic"]).run().await {
    eprintln!("merge failed: {}", e.diagnostic().unwrap_or("(no output)"));
}
}

Error::diagnostic() picks the most explanatory captured text — stderr, falling back to stdout (git writes CONFLICT … there) — so callers don't re-implement the same heuristic.

Feed the child's stdin

#![allow(unused)]
fn main() {
use processkit::Stdin;

// A string you already have:
let sorted = Command::new("sort")
    .stdin(Stdin::from_string("banana\napple\n"))
    .run()
    .await?;

// …or any async source: a reader (file, socket) or a stream of lines.
let from_file = Stdin::from_reader(tokio::fs::File::open("input.txt").await?);
let from_chan = Stdin::from_lines(tokio_stream::iter(vec!["one".to_owned()]));
}

One-shot sources (from_reader/from_lines) feed a single run; re-running the same Command afterwards fails loud (an Error::Io at launch, D10) instead of silently seeing empty stdin. For a conversation, see the next recipe but one.

Fine print: Running commands → standard input.

Stream output as it arrives

#![allow(unused)]
fn main() {
use processkit::{StreamExt, Finished}; // StreamExt re-exported; provides `.next()`

let mut run = Command::new("cargo").args(["build", "--verbose"]).start().await?;
let mut lines = run.stdout_lines()?;
while let Some(line) = lines.next().await {
    println!("build: {line}");
}
let Finished { outcome, stderr, .. } = run.finish().await?; // outcome + buffered stderr
}

No waiting for exit, no full-output buffering; stderr is drained in the background so the child can't block. A timeout on the command bounds the stream itself. Prefer a callback? .on_stdout_line(|l| …) runs one per line while any capture verb drives the run.

Fine print: Streaming & interactive I/O.

Talk to an interactive child

#![allow(unused)]
fn main() {
use processkit::StreamExt;

let mut run = Command::new("bc").keep_stdin_open().start().await?;
let mut stdin = run.take_stdin().expect("stdin was kept open");
stdin.write_line("2 + 2").await?;
stdin.finish().await?; // EOF — bc exits

let mut answers = run.stdout_lines()?;
while let Some(answer) = answers.next().await {
    println!("{answer}");
}
}

keep_stdin_open() hands you an async writer instead of closing stdin at spawn; interleave writes with reads for request/response tools. Its writer methods return std::io::Result (idiomatic for a writer) — convert with .map_err(processkit::Error::Io)? in a processkit::Result function, or use Box<dyn std::error::Error>.

Fine print: Streaming & interactive I/O → interactive stdin.

Pipe commands without a shell

#![allow(unused)]
fn main() {
let authors = Command::new("git").args(["log", "--format=%an"])
    .pipe(Command::new("sort"))
    .pipe(Command::new("uniq").arg("-c"))
    .output_string()
    .await?;
}

Native pipes — no shell string, no quoting, no injection surface. The outcome is pipefail: stdout comes from the last stage, the reported failure from the first stage that didn't exit cleanly. All stages share one kill-on-drop group. The | operator is equivalent sugar: (a | b | c).output_string().

For a consumer that legitimately stops reading early — the | head -1 shape, where the producer's broken-pipe death (its next write fails once the downstream closes, or SIGPIPE where the OS delivers it) is expected — mark the producer unchecked_in_pipe() so that death doesn't fail the chain:

#![allow(unused)]
fn main() {
let first = (Command::new("seq").args(["1", "1000000"]).unchecked_in_pipe()
    | Command::new("head").args(["-n", "1"]))
    .run()
    .await?;
}

Fine print: Pipelines → unchecked stages.

Start a server and wait until it's ready

#![allow(unused)]
fn main() {
use std::time::Duration;

let mut server = Command::new("my-server").args(["--port", "8080"]).start().await?;

// Pick the probe that matches how the server announces readiness:
server.wait_for_line(|l| l.contains("listening"), Duration::from_secs(10)).await?;
// server.wait_for_port("127.0.0.1:8080".parse().unwrap(), Duration::from_secs(10)).await?;
// server.wait_for(|| async { http_health().await }, Duration::from_secs(10)).await?;

// …use the server; dropping `server` kills its whole tree.
}

A probe that can't succeed fails fast with Error::NotReady and never kills the child — you decide what happens next. No more sleep(2) and hoping.

Fine print: Streaming & interactive I/O → readiness probes.

Tear down several children as a unit

#![allow(unused)]
fn main() {
use processkit::ProcessGroup;

let group = ProcessGroup::new()?;
let _db = group.start(&Command::new("dev-db")).await?;
let _api = group.start(&Command::new("dev-api")).await?;

// Either: graceful — SIGTERM, bounded wait, optional SIGKILL escalation…
group.shutdown().await?;
// …or just drop(group): hard kill-on-drop of everything, grandchildren included.
}

The group is the unit of fate: a panic or early return anywhere reaps every member. Configure the grace window via ProcessGroupOptions.

Fine print: Process groups.

React to whichever child exits first

#![allow(unused)]
fn main() {
use processkit::{ProcessGroup, wait_any};

let group = ProcessGroup::new()?;
let mut a = group.start(&Command::new("worker-a")).await?;
let mut b = group.start(&Command::new("worker-b")).await?;

let (idx, outcome) = wait_any(&mut [&mut a, &mut b]).await?;
println!("worker #{idx} exited first with {outcome:?}");
// `a` and `b` are only borrowed — the loser is still usable here.
}

Fine print: Streaming & interactive I/O → racing children.

Sandbox an untrusted tool

#![allow(unused)]
fn main() {
use processkit::{ProcessGroup, ProcessGroupOptions};

// Cap the whole tree (requires the `limits` feature; Windows Job / Linux cgroup):
let group = ProcessGroup::with_options(
    ProcessGroupOptions::default()
        .memory_max(512 * 1024 * 1024)
        .max_processes(64)
        .cpu_quota(0.5),
)?;

let result = group
    .start(
        &Command::new("untrusted-tool")
            .inherit_env(["PATH"]) // allow-list: everything else is cleared
            .timeout(std::time::Duration::from_secs(60)),
    )
    .await?
    .output_string()
    .await?;
}

Unenforceable limits are a hard Error::ResourceLimit, never a silently unbounded group. On Unix, add .uid(…)/.gid(…) to drop privileges (note the cgroup-mechanism caveat in the guide).

Fine print: Process groups → resource limits · Running commands → privileges.

Keep a crash-prone service running

#![allow(unused)]
fn main() {
use processkit::{RestartPolicy, Supervisor};
use std::time::Duration;

let outcome = Supervisor::new(Command::new("my-service"))
    .restart(RestartPolicy::OnCrash)
    .max_restarts(5)
    .backoff(Duration::from_millis(200), 2.0)
    .storm_pause(Duration::from_secs(15)) // crash-loop guard (off by default)
    .run()
    .await?;
println!(
    "stopped after {} restarts ({} storm pauses): {:?}",
    outcome.restarts, outcome.storm_pauses, outcome.stopped
);
}

Exponential backoff with jitter by default; stop_when(…) ends supervision on a condition; .with_runner(&group) keeps every incarnation inside one shared kill-on-drop group. storm_pause arms the failure-storm guard: failures feed a decaying score, and past the threshold the supervisor takes one collective pause instead of hammering restarts — "fails rarely" and "crash-looping" stop being the same case.

Fine print: Supervision, failure storms.

Retry a flaky command

#![allow(unused)]
fn main() {
use processkit::Error;
use std::time::Duration;

let fetched = Command::new("git")
    .args(["fetch", "--quiet"])
    .timeout(Duration::from_secs(10))
    .retry(3, Duration::from_millis(200), |e| {
        matches!(e, Error::Timeout { .. })
            || e.diagnostic().is_some_and(|m| m.contains("Could not resolve host"))
    })
    .run()
    .await?;
}

The classifier sees the typed error and decides whether this failure is worth another attempt; each attempt is a fresh process. retry replays a run to success — for keeping a process alive, use a Supervisor (previous recipe).

Fine print: Timeouts, retries & cancellation → retry.

Cancel runs on shutdown

#![allow(unused)]
fn main() {
use processkit::CancellationToken;

let token = CancellationToken::new();

let job = tokio::spawn({
    let token = token.child_token();
    async move { Command::new("long-job").cancel_on(token).run().await }
});

// On Ctrl-C / shutdown signal / sibling failure:
token.cancel(); // kills the tree; the run resolves to Error::Cancelled
let outcome = job.await; // Err(Error::Cancelled { .. }) inside
}

Cancellation is always an error (the run was abandoned, there is no result), beats a simultaneous timeout, and is terminal for retry and Supervisor alike.

For a typed wrapper whose commands never cross your code, set the token once on the client — every command it builds carries it:

#![allow(unused)]
fn main() {
use processkit::{CancellationToken, CliClient};

let token = CancellationToken::new();
let gh = CliClient::new("gh").default_cancel_on(token.child_token());
// token.cancel() → every in-flight command of THIS client dies.
}

Fine print: Timeouts, retries & cancellation → cancellation, client-level default.

Measure what a run cost

#![allow(unused)]
fn main() {
use std::time::Duration;

// One run, summarized (requires the opt-in `stats` feature):
let profile = Command::new("crunch").start().await?.profile(Duration::from_millis(100)).await?;
println!("exit={:?} took={:?} peak_rss={:?} avg_cpu={:?}",
    profile.exit_code, profile.duration, profile.peak_memory_bytes, profile.avg_cpu());
}

For a live series over a whole group, group.sample_stats(every) yields a Stream of snapshots. CPU/memory need a real container (Windows Job / Linux cgroup); elsewhere you still get process counts.

Fine print: Process groups → stats · Streaming → profiling.

Contain a process you didn't spawn

#![allow(unused)]
fn main() {
use processkit::ProcessGroup;

let child = tokio::process::Command::new("legacy-launcher").spawn()?;

let group = ProcessGroup::new()?; // `adopt` is part of `process-control` (default-on)
group.adopt(&child)?;            // from now on the group's teardown covers it
}

Adoption is best-effort by mechanism — on Windows/cgroup the whole running tree joins; on the POSIX process-group backends an exec'd child is contained individually (its future forks too, where it could be re-grouped). The guide spells out exactly what each mechanism can promise.

Fine print: Process groups → adopt · Platform support.

Test code that runs processes — without processes

#![allow(unused)]
fn main() {
use processkit::testing::{Reply, ScriptedRunner};

// Your code takes any `R: ProcessRunner`; in tests, hand it a script.
// Rules match on a prefix of the *program name followed by its arguments*
// (the first element is the program):
let runner = ScriptedRunner::new()
    .on(["git", "rev-parse"], Reply::ok("abc123\n"))
    .on(["git", "push"], Reply::fail(128, "remote: permission denied"))
    .fallback(Reply::ok(""));

// my_deploy(&runner).await? — no subprocess, fully deterministic.
}

RecordingRunner wraps any runner and captures every Invocation for assertions; MockRunner (feature mock) gives mockall expectations; and the record feature's RecordReplayRunner records real runs into a JSON cassette once and replays them hermetically in CI.

Fine print: Testing your code.

Test streaming code — without processes

#![allow(unused)]
fn main() {
use processkit::{Command, Outcome, Finished};
use processkit::testing::{Reply, ScriptedRunner};
use std::time::Duration;

let runner = ScriptedRunner::new()
    .on(["gh", "run", "watch"], Reply::lines(["queued", "in_progress", "completed"])
        .with_line_delay(Duration::from_millis(50))); // paced delivery

let mut run = runner.start(&Command::new("gh").args(["run", "watch", "123"])).await?;
run.wait_for_line(|l| l.contains("completed"), Duration::from_secs(5)).await?;
let Finished { outcome, .. } = run.finish().await?;
assert_eq!(outcome, Outcome::Exited(0));
}

A scripted start() feeds the canned lines through the same pump machinery a real child uses, so stdout_lines, the readiness probes, and finish behave identically — and with_line_delay is deterministic under #[tokio::test(start_paused = true)]. Canned output also replays through on_stdout_line/on_stderr_line handlers on the bulk verbs, so progress-reporting paths test hermetically too.

Fine print: Testing → scripted streaming.

Wrap a CLI tool behind a typed API

#![allow(unused)]
fn main() {
use processkit::{cli_client, ProcessRunner, Result};

cli_client!(pub struct Git => "git");

impl<R: ProcessRunner> Git<R> {
    pub async fn current_branch(&self) -> Result<String> {
        // A verb takes the args directly; pass a built `command(..)` only
        // when you need to customize it (per-call timeout, stdin, …).
        self.core.run(["branch", "--show-current"]).await
    }
    pub async fn is_clean(&self) -> Result<bool> {
        self.core.probe(["diff", "--quiet"]).await
    }
}
}

The generated struct carries a runner and per-client defaults (default_timeout, default_env); your methods are just argument lists and parsers — and because the runner is injectable, the whole wrapper is testable with the previous recipe's ScriptedRunner.

Fine print: Testing your code → CliClient.