Pipelines

‹ docs index

a | b | c without a shell. Each stage's stdout feeds the next stage's stdin through an in-process relay (a tokio::io::copy task per boundary) — there is no shell string anywhere, so no quoting rules, no word splitting, no injection surface. All stages spawn into one shared kill-on-drop process group, so the chain lives and dies as a unit. (The relay is an implementation detail, not a kernel splice: a producer whose consumer exits early stops on a broken pipe when the relay's next write fails, rather than instantly via SIGPIPE.)

Building and running

Command::pipe(next) starts a Pipeline; chain more stages with Pipeline::pipe; drive it with output_string() or run():

use processkit::Command;

#[tokio::main]
async fn main() -> processkit::Result<()> {
    // git log --format=%an | sort | uniq -c
    let authors = Command::new("git").args(["log", "--format=%an"])
        .pipe(Command::new("sort"))
        .pipe(Command::new("uniq").arg("-c"))
        .run()                         // require every stage to succeed
        .await?;
    println!("{authors}");
    Ok(())
}

The verbs mirror Command's, each operating on the pipefail outcome:

VerbReturnsA failing stage is…
output_string()ProcessResult<String>…reported in the result (code/stderr/program of the first unclean stage)
output_bytes()ProcessResult<Vec<u8>>…same, with the last stage's stdout captured raw (binary pipes)
run()trimmed final stdout…raised as that stage's Error::Exit; fails loud on a truncated capture
checked()full ProcessResult<String>…raised as Error::Exit (untrimmed stdout)
run_unit()()…raised as Error::Exit (output discarded)
exit_code()i32…its attributed code (no code → Error::Timeout/Signalled)
probe()bool0true, 1false, else Err
parse(|s| …) / try_parse(|s| …)T…raised as Error::Exit; fails loud on a truncated capture

Err from output_string itself means a stage couldn't be started or driven at all (spawn failure, broken plumbing) — never a mere non-zero exit.

The streaming first_line probe is deliberately not a pipeline verb: a chain consumes its last stage in full to fold the pipefail outcome. To capture the first matching line of a finished chain, add a | head -n1 (Unix) / grep -m1 / findstr stage and capture. This does not cover a streaming readiness probe over a chain that must keep running (e.g. wait for a banner line, then leave the chain alive) — | head would tear it down; use a single Command with first_line for that.

The | operator is sugar for the same thing — a | b | ca.pipe(b).pipe(c). Parenthesize the chain before a terminal verb, since method calls bind tighter than |:

#![allow(unused)]
fn main() {
use processkit::Command;

let authors = (Command::new("git").args(["log", "--format=%an"])
    | Command::new("sort")
    | Command::new("uniq").arg("-c"))
    .run()
    .await?;
}

Semantics: pipefail and the ends

The outcome is pipefail, like set -o pipefail in a shell:

  • stdout is always the last stage's output — that's what the chain produced.
  • code, stderr, and the reported program come from the first stage that didn't exit cleanly (non-zero, signal-killed, or timed out) — or from the last stage when every stage succeeded.
#![allow(unused)]
fn main() {
use processkit::Command;

let result = Command::new("cat").arg("data.txt")
    .pipe(Command::new("grep").arg("ERROR"))      // suppose grep exits 2 (bad pattern)
    .pipe(Command::new("wc").arg("-l"))
    .output_string()
    .await?;

// Diagnostics point at grep — the first unclean stage — while stdout is
// whatever wc managed to print:
assert_eq!(result.code(), Some(2));
println!("blamed: {}", result.ensure_success().unwrap_err()); // names `grep`
}

The ends of the chain behave like a single Command:

  • The first stage's configured stdin source is honored — feed the whole pipeline from a string, file, or stream.
  • Inner stages read from the pipe, full stop: any stdin source or keep_stdin_open configured on them is overridden.
  • Inner stages' stderr is captured per-stage for pipefail diagnostics; only the last stage's stdout reaches you.
#![allow(unused)]
fn main() {
use processkit::{Command, Stdin};

let unique_count = Command::new("sort")
    .stdin(Stdin::from_iter_lines(["b", "a", "b", "c"]))
    .pipe(Command::new("uniq"))
    .pipe(Command::new("wc").arg("-l"))
    .run()
    .await?;
assert_eq!(unique_count.trim(), "3");
}

Unchecked stages

Strict pipefail has one classic false positive: a consumer that legitimately stops reading early. In producer | head -1 the consumer exits 0 after one line and closes the pipe; the producer then stops on a broken pipe — its next write fails once the relay's downstream is gone (a broken-pipe write error, or SIGPIPE where the OS delivers it) — a perfectly normal death that strict pipefail would blame the chain for. Mark that stage unchecked_in_pipe():

#![allow(unused)]
fn main() {
use processkit::Command;

// seq 1 1000000 | head -1 — the producer's broken-pipe death is expected.
let first = (Command::new("seq").args(["1", "1000000"]).unchecked_in_pipe()
    | Command::new("head").args(["-n", "1"]))
    .run()
    .await?;
assert_eq!(first.trim(), "1");
}

The rules:

  • An unchecked stage's unclean exit — a non-zero code, a broken-pipe write failure (or SIGPIPE where the OS delivers it) from a consumer that closed early, or its own per-stage timeout kill — is skipped when the chain decides what to report.
  • A checked failure always trumps an unchecked one, regardless of position: unchecked never shields another stage's real failure.
  • A chain whose only failures are unchecked reports success (the last stage's stdout, code 0).
  • unchecked forgives exit status only — never a whole-chain Pipeline::timeout, and it has no effect on a Command run outside a pipeline (a single run's status is already plain data in its ProcessResult).

Timeouts

Two scopes, deliberately distinct:

#![allow(unused)]
fn main() {
use processkit::Command;
use std::time::Duration;

let out = Command::new("producer")
    .timeout(Duration::from_secs(10))      // per-STAGE: kills just `producer`
    .pipe(Command::new("consumer"))
    .timeout(Duration::from_secs(30))      // whole-CHAIN: Pipeline::timeout
    .output_string()
    .await?;
}
  • Pipeline::timeout bounds the whole chain: at the deadline the shared group is torn down and the result reports timed_out (no partial stdout — unlike a single command's captured timeout).
  • A per-stage Command::timeout kills just that stage. Every stage is evaluated by the same pipefail rule: a stage that hit its own deadline — inner or last — surfaces on run() as that stage's Error::Timeout, reporting that stage's own deadline (not the chain's, and never 0ns).

Cancellation has two forms. Pipeline::cancel_on(token) is the chain-level control: the token is applied to every stage, so firing it tears the whole chain down and the run resolves to Error::Cancelled. (A cancel_on token on an individual stage Command also cancels that stage and errors the pipeline, but the pipeline-level builder is the clearer authority.) See Timeouts & cancellation.

Re-running a pipeline

A Pipeline is Clone and re-runnable — stages are re-cloned per run. The one caveat is inherited from Command: a one-shot stdin source on the first stage (Stdin::from_reader / from_lines) is consumed by the first run; re-running then fails loud (an Error::Io at launch) rather than silently feeding empty stdin. Use the reusable sources (from_string / from_bytes / from_iter_lines / from_file) when a chain runs more than once.


Next: Timeouts, retries & cancellation · Running commands · Process groups