ProcessKit


ProcessKit is an async child-process management library for Rust (tokio). It solves the orphan-process problem at the kernel level and packages a full set of tools around it: streaming I/O, shell-free pipelines, supervision, and hermetic testing seams.

Running external programs is part of everyday software: compiling code, querying version control, invoking CLI tools, managing background servers. Every major runtime makes it easy to start a child process. What none of them makes easy is cleaning up after one.

The orphan problem

When a build tool spawns compiler workers, when an integration test starts a local database, when a wrapper script calls the real binary — those grandchildren exist outside your program's awareness. If your code panics, times out, or drops a future mid-flight, the direct child may receive a signal. But everything deeper in the tree keeps running as orphans: ports stay bound, temp files stay open, CPU keeps spinning. The next test run tries to bind the same port and fails with "address already in use."

This is not an edge case. It is the default behavior of every process-spawning API that works at the level of a single pid — including std::process and tokio::process.

Whole-tree containment

ProcessKit solves this at the kernel level. Every process you start lives inside an operating-system containment object: a Job Object on Windows, a cgroup v2 on Linux (with a POSIX process-group fallback on hosts without cgroup delegation), or a POSIX process group on macOS and BSDs.

When the Rust value that owns the container is dropped — by normal flow, by an error propagating through ?, or by a panic — the kernel kills every member of the tree. Grandchildren included. This is not a best-effort signal sent to a list of pids. It is one atomic kernel operation. A child cannot escape the container by forking quickly; a signal cannot be missed because a descendant already changed its session.
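The ownership rule can be illustrated with a std-only sketch. `Teardown`, `KillOnDrop`, and `FakeTree` are hypothetical names, not ProcessKit types, and the `Child` implementation reaches only the direct child; containing descendants is what the kernel objects described above add. The sketch shows only why tying cleanup to `Drop` also covers an early return through `?`.

```rust
use std::cell::Cell;
use std::process::Child;
use std::rc::Rc;

/// Anything that can be torn down when its owner goes away.
trait Teardown {
    fn teardown(&mut self);
}

/// For a real child: kill it, then reap it so no zombie is left behind.
/// This reaches only the direct child, not its descendants.
impl Teardown for Child {
    fn teardown(&mut self) {
        let _ = self.kill();
        let _ = self.wait();
    }
}

/// Hypothetical guard: owning it means owning the cleanup.
struct KillOnDrop<T: Teardown>(T);

impl<T: Teardown> Drop for KillOnDrop<T> {
    fn drop(&mut self) {
        self.0.teardown();
    }
}

/// Test double that records whether teardown ran.
struct FakeTree(Rc<Cell<bool>>);

impl Teardown for FakeTree {
    fn teardown(&mut self) {
        self.0.set(true);
    }
}

/// Fails halfway through; the `?` returns early and drops the guard.
fn work_that_fails(killed: Rc<Cell<bool>>) -> Result<(), std::num::ParseIntError> {
    let _guard = KillOnDrop(FakeTree(killed));
    let _n: i32 = "not a number".parse()?;
    Ok(())
}
```

Calling `work_that_fails` returns an error, and the shared flag is set by the time the call returns: the guard ran its teardown on the way out even though no cleanup code was written on the error path.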

The library reports the mechanism it actually got — mechanism() returns JobObject, CgroupV2, or ProcessGroup — so you can verify the guarantee rather than assume it.

Getting started

[dependencies]
processkit = "1"

ProcessKit requires a tokio runtime and Rust 1.88 or later (the MSRV). The crate is stable at 1.0; breaking changes land only in a new major version.

The simplest case — run a command, get its trimmed stdout, fail on error:

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let branch = Command::new("git")
        .args(["branch", "--show-current"])
        .run()
        .await?;
    println!("on branch: {branch}");
    Ok(())
}

When you need more than "success or error" — the exit code, both streams, whether the run timed out — output_string() returns the full result without raising on a non-zero exit:

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let result = Command::new("cargo").arg("test").output_string().await?;
    if result.timed_out() {
        eprintln!("tests hung; partial output:\n{}", result.stdout());
    } else if !result.is_success() {
        // Debug-print code() rather than unwrap it: unwrapping would panic
        // if the run ended without an exit code.
        eprintln!("exit {:?}: {}", result.code(), result.stderr());
    }
    Ok(())
}

The key design choice: a non-zero exit is captured data until you explicitly ask for success. Timeouts are captured in the result. Only cancellation is always an error — because an abandoned run has no result worth inspecting.
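The split between "captured data" and "checked success" can be modelled in a few lines of plain Rust. This is a sketch of the idea only: `Captured`, `RunError`, and `require_success` are hypothetical names chosen for illustration and are not ProcessKit's types.

```rust
/// Hypothetical stand-in for a captured run; field names are illustrative.
#[derive(Debug, Clone, PartialEq)]
struct Captured {
    code: Option<i32>, // None when the run ended without an exit code
    stdout: String,
    stderr: String,
    timed_out: bool,
}

#[derive(Debug, PartialEq)]
enum RunError {
    Timeout { partial_stdout: String },
    Exit { code: Option<i32>, stderr: String },
}

impl Captured {
    /// Inspecting never fails: a non-zero exit is just data here.
    fn is_success(&self) -> bool {
        !self.timed_out && self.code == Some(0)
    }

    /// Asking for success is the point where data turns into an error.
    fn require_success(self) -> Result<String, RunError> {
        if self.timed_out {
            Err(RunError::Timeout { partial_stdout: self.stdout })
        } else if self.code == Some(0) {
            Ok(self.stdout.trim().to_string())
        } else {
            Err(RunError::Exit { code: self.code, stderr: self.stderr })
        }
    }
}
```

The design choice this models is that the caller, not the library, decides when a non-zero exit stops being information and starts being a failure.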

Feature flags

Each flag is additive. The kill-on-drop guarantee is unconditional in every configuration.

| Feature | Default | Adds |
|---|---|---|
| process-control | | Signals, suspend/resume, members(), adopt() |
| stats | | Resource measurement: CPU time, peak memory, sample_stats, profile |
| limits | | Whole-tree resource caps (implies stats) |
| record | | Record/replay cassettes (adds serde) |
| mock | | mockall-generated MockRunner (semver-exempt surface) |
| tracing | | Lifecycle events via the tracing crate (never logs argv/env values) |

How it compares

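| | whole-tree kill-on-drop | async | limits / stats | streaming · pipelines · supervision |
|---|---|---|---|---|
| std::process | | | | |
| tokio::process | | | | |
| command-group | | | | |
| async-process | | ✓ (smol) | | |
| duct | | | | pipelines only |
| processkit | | ✓ (tokio) | | |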

The first column is the differentiator: descendants are contained and reaped as a unit, not just the direct child.

Consuming verbs

Every run begins with the same Command builder; the verb you end with determines what you receive:

| What you want | Verb | What you get |
|---|---|---|
| stdout, success required | run() | trimmed String; non-zero / timeout / kill → typed error |
| full outcome, exit as data | output_string() | ProcessResult: code, stdout, stderr, timed_out |
| just the exit code | exit_code() | i32; a timed-out run errors instead of returning -1 |
| a yes/no answer | probe() | bool; 0 → true, 1 → false, anything else errors |
| a typed value from stdout | parse(\|s\| …) | T; success required |
| typed value, non-zero ok | try_parse(\|s\| …) | Option<T>; None on non-zero |
| first matching output line | first_line(\|l\| …) | Option<String> |
| a live handle for streaming | start() | RunningProcess |

The same vocabulary is available on every layer: ProcessRunner, ProcessGroup, CliClient.
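The exit-code conventions in the table can be written down as two small pure functions. This is a sketch of the mapping the table describes, not the library's implementation; `probe_from_code` and `try_parse_from` are hypothetical helpers. The three-way split matches tools such as `grep -q`, which exits 0 on a match, 1 on no match, and 2 on an error.

```rust
/// Maps an exit code to a yes/no answer the way the table describes `probe()`:
/// 0 is yes, 1 is no, and any other code is a failure, not an answer.
fn probe_from_code(code: i32) -> Result<bool, String> {
    match code {
        0 => Ok(true),
        1 => Ok(false),
        other => Err(format!("unexpected exit code {other}")),
    }
}

/// `try_parse`-style: a non-zero exit yields None, a zero exit runs the parser.
fn try_parse_from<T>(code: i32, stdout: &str, parse: impl FnOnce(&str) -> T) -> Option<T> {
    if code == 0 {
        Some(parse(stdout))
    } else {
        None
    }
}
```

Keeping "no" (exit 1) distinct from "the tool itself broke" (any other code) is what stops a crashed probe from being read as a negative answer.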

Streaming and interactive I/O

For commands that produce large or incremental output, start() returns a live handle you drive yourself. Stream stdout line by line as the child produces it, with no buffering and no waiting for exit:

use processkit::{Command, StreamExt, Finished, Outcome};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let mut run = Command::new("cargo")
        .args(["build", "--release"])
        .start()
        .await?;

    let mut lines = run.stdout_lines()?;
    while let Some(line) = lines.next().await {
        println!("{line}");
    }
    // Stderr was drained in the background the whole time.
    let Finished { outcome, stderr, .. } = run.finish().await?;
    if outcome != Outcome::Exited(0) {
        eprintln!("build failed:\n{stderr}");
    }
    Ok(())
}

For conversational tools — send a request, read the response, repeat — keep_stdin_open() gives you an async writer you can interleave with reads. The library handles the background stderr drain so the child can never block on a full pipe while you're busy with stdout.

Readiness probes solve "start a server, then use it" without guessing at an arbitrary sleep:

use processkit::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let mut run = Command::new("my-server").start().await?;

    // Wait for the startup banner on stdout:
    run.wait_for_line(|l| l.contains("listening on"), Duration::from_secs(10))
        .await?;

    // Or wait for a TCP port to accept connections:
    run.wait_for_port("127.0.0.1:8080".parse().unwrap(), Duration::from_secs(10))
        .await?;
    Ok(())
}

A probe that cannot pass — the child exited, or the deadline elapsed — fails with Error::NotReady and does not kill the child. You decide what to do next.
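For intuition, a port probe amounts to "try to connect until it works or the deadline passes". The following is a blocking, std-only sketch of that loop. It is not how ProcessKit implements `wait_for_port` (which is async and also notices when the child exits), and the 250 ms attempt cap and 25 ms pause are arbitrary illustrative values.

```rust
use std::net::{SocketAddr, TcpStream};
use std::thread;
use std::time::{Duration, Instant};

/// Polls `addr` until a TCP connection succeeds or `deadline` elapses.
/// Returns true when the port accepted a connection in time.
fn wait_for_port(addr: SocketAddr, deadline: Duration) -> bool {
    let give_up_at = Instant::now() + deadline;
    loop {
        let remaining = give_up_at.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return false;
        }
        // Cap each attempt so one slow connect cannot overrun the deadline.
        let attempt = remaining.min(Duration::from_millis(250));
        if TcpStream::connect_timeout(&addr, attempt).is_ok() {
            return true;
        }
        // Refused connections fail instantly; pause briefly before retrying.
        thread::sleep(Duration::from_millis(25).min(remaining));
    }
}
```

Like the probes described above, the sketch only reports readiness; it never touches the process it is waiting for.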

Shell-free pipelines

a | b | c without a shell string. Stages are connected in-process through a relay, so there are no quoting rules, no word-splitting, and no injection surface. All stages share one kill-on-drop group.

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let authors = (Command::new("git").args(["log", "--format=%an"])
        | Command::new("sort")
        | Command::new("uniq").arg("-c"))
        .run()
        .await?;
    println!("{authors}");
    Ok(())
}

The outcome follows pipefail semantics: stdout is always the last stage's output, but the exit code, stderr, and program name are attributed to the first stage that failed. A stage that legitimately stops reading early — the classic producer | head -n1 shape — can be marked .unchecked_in_pipe() so its broken-pipe exit is not counted as a failure.
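The attribution rule can be stated as a single search over the stage outcomes. This is a minimal sketch under the assumption that each stage records its exit code and whether it was marked unchecked; `Stage` and `first_failure` are hypothetical names, not part of ProcessKit.

```rust
/// Outcome of one pipeline stage; all names here are illustrative.
#[derive(Debug, Clone, PartialEq)]
struct Stage {
    program: &'static str,
    code: i32,
    stderr: &'static str,
    unchecked: bool, // true when this stage's exit status should not count
}

/// Returns the first stage whose failure counts, or None if the chain passed.
fn first_failure(stages: &[Stage]) -> Option<&Stage> {
    stages.iter().find(|s| s.code != 0 && !s.unchecked)
}
```

With a failing first stage followed by two clean ones, the search returns the first stage, so the reported program name and stderr point at the real culprit rather than at whichever stage happened to run last.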

Timeouts, retries, and cancellation

Command::timeout(d) kills the whole process tree at the deadline. For the one-shot capture verbs the expiry is part of the result; for the success-checking verbs it becomes a typed Error::Timeout that carries the partial output captured before the kill — useful for diagnosing what a hung tool's last words were.

Command::retry(attempts, backoff, classifier) replays the run on transient failure. The classifier sees the typed error — you can match on the exit code, an Error::Timeout, or the captured stderr. A cancelled run is never retried: the token stays cancelled.
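The retry contract (bounded attempts, a classifier that sees the typed error, cancellation never retried) can be sketched synchronously with std alone. `RunError` and `retry` below are hypothetical and simplified; backoff between attempts is left out to keep the control flow visible.

```rust
#[derive(Debug, Clone, PartialEq)]
enum RunError {
    Exit { code: i32, stderr: String },
    Timeout,
    Cancelled,
}

/// Runs `op` at least once and at most `attempts` times. `is_transient`
/// decides whether a failure is worth another try; a cancelled run is
/// returned immediately, whatever the classifier would say.
fn retry<T>(
    attempts: u32,
    is_transient: impl Fn(&RunError) -> bool,
    mut op: impl FnMut(u32) -> Result<T, RunError>,
) -> Result<T, RunError> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(RunError::Cancelled) => return Err(RunError::Cancelled),
            Err(err) if attempt < attempts && is_transient(&err) => attempt += 1,
            Err(err) => return Err(err),
        }
    }
}
```

A classifier such as `|e| matches!(e, RunError::Timeout)` retries only timeouts, while an `Exit` error with a permanent cause is returned on the first attempt.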

CancellationToken (re-exported from tokio-util) is the coordinated shutdown primitive. Wire the same parent token into many jobs via child tokens; cancelling the parent kills every process tree and every consuming path reports Error::Cancelled.

use processkit::{CancellationToken, Command};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let shutdown = CancellationToken::new();

    let job = tokio::spawn({
        let token = shutdown.child_token();
        async move {
            Command::new("long-job")
                .timeout(Duration::from_secs(30))
                .cancel_on(token)
                .run()
                .await
        }
    });

    // Signal from anywhere (Ctrl-C, sibling failure, UI button):
    shutdown.cancel();

    // Wait for the task so the cancelled run has finished before main returns.
    let _ = job.await;
    Ok(())
}

Keeping a service alive

Where retry answers "replay this one operation until it succeeds," a Supervisor answers "keep this running." It restarts the command on exit according to a policy, with bounded restarts, exponential backoff, and jitter (on by default) so that a restarted fleet doesn't pile back in lockstep:

use processkit::{Command, RestartPolicy, Supervisor};
use std::time::Duration;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let outcome = Supervisor::new(Command::new("my-server").args(["--port", "8080"]))
        .restart(RestartPolicy::OnCrash)
        .max_restarts(10)
        .backoff(Duration::from_millis(200), 2.0)
        .storm_pause(Duration::from_secs(15))  // crash-loop guard
        .run()
        .await?;

    println!("ended after {} restarts: {:?}", outcome.restarts, outcome.stopped);
    Ok(())
}
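To make `backoff(Duration::from_millis(200), 2.0)` concrete: exponential backoff multiplies a base delay by a factor on every restart, and jitter perturbs the result so that many instances do not retry at the same instant. The sketch below is an assumption about shape only. The cap, the "scale down by up to half" jitter rule, and the name `backoff_delay` are illustrative; the source does not say how ProcessKit computes its jitter.

```rust
use std::time::Duration;

/// Delay before restart number `restart` (0-based): base * factor^restart,
/// capped at `max`. `jitter` in [0.0, 1.0] scales the delay down by up to
/// half so that instances restarted together drift apart.
fn backoff_delay(base: Duration, factor: f64, restart: u32, max: Duration, jitter: f64) -> Duration {
    let raw = base.as_secs_f64() * factor.powi(restart as i32);
    let capped = raw.min(max.as_secs_f64());
    let spread = 1.0 - 0.5 * jitter.clamp(0.0, 1.0);
    Duration::from_secs_f64(capped * spread)
}
```

With a 200 ms base and factor 2.0, the un-jittered delays run 200 ms, 400 ms, 800 ms, 1.6 s, and so on until they reach the cap. In real use the `jitter` argument would come from a random number generator; it is a parameter here so that the function stays deterministic.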

The optional storm guard distinguishes "fails occasionally" from "crash-looping": it maintains a half-life score that grows with each failure and decays between them. When the score exceeds a threshold, the supervisor takes one collective pause rather than hammering the restart timer at backoff speed.
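A half-life score of this kind fits in a few lines. The sketch below assumes each failure adds 1.0 and the score halves for every `half_life` of quiet time; those constants, and the `StormGuard` type itself, are hypothetical and chosen only to show the mechanism.

```rust
use std::time::Duration;

/// Illustrative crash-loop detector: each failure adds 1.0 to a score that
/// halves every `half_life` of quiet time.
struct StormGuard {
    score: f64,
    half_life: Duration,
    threshold: f64,
}

impl StormGuard {
    fn new(half_life: Duration, threshold: f64) -> Self {
        StormGuard { score: 0.0, half_life, threshold }
    }

    /// Records a failure that happened `since_last` after the previous one.
    /// Returns true when the supervisor should take a storm pause.
    fn record_failure(&mut self, since_last: Duration) -> bool {
        let half_lives = since_last.as_secs_f64() / self.half_life.as_secs_f64();
        self.score = self.score * 0.5_f64.powf(half_lives) + 1.0;
        self.score > self.threshold
    }
}
```

With a 10-second half-life and a threshold of 3.0, failures one second apart cross the threshold on the fourth failure, while failures a minute apart never do, because the score decays back to roughly 1.0 between them.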

Resource limits

With the limits feature, a ProcessGroup can cap the whole tree's memory, process count, and CPU at creation time — enforced by the same kernel object that provides kill-on-drop:

use processkit::{Command, ProcessGroup, ProcessGroupOptions};

#[tokio::main]
async fn main() -> processkit::Result<()> {
    let group = ProcessGroup::with_options(
        ProcessGroupOptions::default()
            .memory_max(512 * 1024 * 1024)  // 512 MiB across the whole tree
            .max_processes(64)
            .cpu_quota(0.5),                 // half of one core
    )?;
    let _job = group.start(&Command::new("untrusted-tool")).await?;
    Ok(())
}

A limit that cannot be enforced — no cgroup delegation, no Job Object — is a hard error at group creation time, not a silently unapplied cap. An unapplied cap is no protection.
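On Linux, limits like the ones above correspond to cgroup v2 interface files: `memory.max` (bytes), `pids.max` (a count), and `cpu.max` (a quota and a period, both in microseconds). The helper below shows what values would express the example's three caps. The file names and formats are the kernel's; whether ProcessKit writes exactly these values is an assumption, and `cgroup_v2_limits` is a hypothetical function.

```rust
/// Values that would express the given caps in a cgroup v2 directory.
/// Returns (file name, file content) pairs; nothing is written to disk.
fn cgroup_v2_limits(memory_bytes: u64, max_processes: u32, cpu_cores: f64) -> Vec<(&'static str, String)> {
    const PERIOD_US: u64 = 100_000; // the kernel's default cpu.max period
    let quota_us = (cpu_cores * PERIOD_US as f64).round() as u64;
    vec![
        ("memory.max", memory_bytes.to_string()),
        ("pids.max", max_processes.to_string()),
        ("cpu.max", format!("{} {}", quota_us, PERIOD_US)),
    ]
}
```

For 512 MiB, 64 processes, and half a core this yields `536870912`, `64`, and `50000 100000`. Writing those files requires the cgroup to be delegated to the calling user, which is why a host without delegation cannot honor the caps at all.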

Testing code that shells out

Subprocess behavior is notoriously difficult to test. ProcessKit exposes a single trait — ProcessRunner — that decouples "what to run" from "how to run it." Production code takes a runner generically; tests inject a double.

use processkit::{Command, ProcessRunner, ProcessRunnerExt, Result};

async fn current_branch(runner: &impl ProcessRunner) -> Result<String> {
    runner.run(&Command::new("git").args(["branch", "--show-current"])).await
}

The ScriptedRunner returns canned replies for matched commands. The RecordingRunner captures every invocation for assertion. With the record feature, RecordReplayRunner records real runs to a JSON cassette and replays them in CI — fast, hermetic, byte-stable, no subprocess. The seam covers streaming too: a scripted start() feeds canned lines through the same pump machinery the real child uses, so stdout_lines, wait_for_line, and finish all behave identically in tests.
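The seam itself is an ordinary trait-plus-double pattern. The sketch below reproduces it synchronously with std only, to show why code written against the trait needs no subprocess in tests. `Runner`, `RealRunner`, and `Scripted` are hypothetical stand-ins and deliberately simpler than ProcessKit's ProcessRunner and ScriptedRunner.

```rust
use std::collections::HashMap;

/// Hypothetical, synchronous stand-in for a runner seam.
trait Runner {
    fn run(&self, program: &str, args: &[&str]) -> Result<String, String>;
}

/// Production implementation: actually spawns the program.
struct RealRunner;

impl Runner for RealRunner {
    fn run(&self, program: &str, args: &[&str]) -> Result<String, String> {
        let out = std::process::Command::new(program)
            .args(args)
            .output()
            .map_err(|e| e.to_string())?;
        if out.status.success() {
            Ok(String::from_utf8_lossy(&out.stdout).trim().to_string())
        } else {
            Err(String::from_utf8_lossy(&out.stderr).into_owned())
        }
    }
}

/// Test double: canned replies keyed by the full command line.
#[derive(Default)]
struct Scripted {
    replies: HashMap<String, Result<String, String>>,
}

impl Scripted {
    fn on(mut self, command_line: &str, reply: Result<&str, &str>) -> Self {
        let reply = reply.map(|s| s.to_string()).map_err(|e| e.to_string());
        self.replies.insert(command_line.to_string(), reply);
        self
    }
}

impl Runner for Scripted {
    fn run(&self, program: &str, args: &[&str]) -> Result<String, String> {
        let key = std::iter::once(program)
            .chain(args.iter().copied())
            .collect::<Vec<_>>()
            .join(" ");
        // An unmatched command is an error, so a test cannot pass by accident.
        self.replies
            .get(&key)
            .cloned()
            .unwrap_or_else(|| Err(format!("unscripted command: {key}")))
    }
}

/// Code under test depends only on the trait.
fn current_branch(runner: &impl Runner) -> Result<String, String> {
    runner.run("git", &["branch", "--show-current"])
}
```

In a test, `current_branch(&Scripted::default().on("git branch --show-current", Ok("main")))` returns `Ok("main")` without git being installed; production code passes `&RealRunner` instead.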

The cli_client! macro generates typed wrappers around external tools (git, gh, kubectl, …) that are injectable for free:

use processkit::{cli_client, ProcessRunner, Result};
use std::path::Path;

cli_client!(pub struct Git => "git");

impl<R: ProcessRunner> Git<R> {
    pub async fn head(&self, dir: &Path) -> Result<String> {
        self.core.run(self.core.command_in(dir, ["rev-parse", "HEAD"])).await
    }
}

// In production: Git::new().head(Path::new(".")).await
// In tests:      Git::with_runner(ScriptedRunner::new().on([…], Reply::ok("abc\n")))

Guides

The Cookbook maps "I want to do X" directly to a working snippet — the fastest way in. The individual guides go deeper on each topic:

| Guide | Covers |
|---|---|
| Cookbook | Task-to-snippet recipes for every capability |
| Running commands | The full Command builder, every verb, error semantics |
| Process groups | Containment, teardown, signals, suspend/resume, limits, stats |
| Streaming & interactive I/O | Line streaming, interactive stdin, readiness probes, profiling |
| Pipelines | Shell-free chains, pipefail attribution, chain timeouts |
| Timeouts, retries & cancellation | Deadlines, retry classifiers, CancellationToken |
| Supervision | Restart policies, backoff & jitter, storm guard, outcomes |
| Testing your code | ProcessRunner seam, scripted/recording/cassette doubles, CliClient |
| Platform support | Mechanisms, capability matrices, platform caveats |
| Upgrading | Per-version migration notes |

API reference: docs.rs/processkit.

What's next

ProcessKit is a Rust library today, published as processkit on crates.io. The plan is to bring the same approach — kernel-backed whole-tree containment, honest error semantics, and testable seams — to other ecosystems: a Go package, an F# library, a Kotlin library, and a Python wrapper. Each implementation will follow the same philosophy and be documented here as it ships.

A note on development. This project was built with significant assistance from AI tools throughout the design and implementation process. That said, every line of code was read, understood, and deliberately chosen — this is not generated output dropped into a repository unchecked. The author takes full responsibility for correctness, API design, and the published result.