The Quipu-Log Book
Part 7 · The write & read paths

29 · The async pipeline: non-blocking emit and backpressure

If handling an HTTP request takes tens of microseconds, you can't afford to spend hundreds of microseconds writing an audit log to disk. This chapter walks through how Quipu-Log turns emit into a non-blocking path that queues the event and returns immediately.

In one sentence

emit pushes an event into a bounded channel and returns right away. The actual disk write is handled by a dedicated writer thread.

The producer-consumer pattern

This is the classic producer-consumer queue. A single channel sits between the producers (the app threads handling requests) and the consumer (the writer thread that writes to disk). A producer's job ends the moment it drops the event into the channel; the consumer picks events out one by one and writes them to disk in order.

That channel is sync_channel from Rust's standard library (std::sync::mpsc). Bounded means there's a cap on how many events can wait inside it (4096 by default). Without a cap, a writer that falls behind would let the queue, and its memory usage, grow without limit.

[Figure: request threads A, B, and C each call handle.emit(), which uses try_send to push the event into a bounded channel (capacity: 4096). The writer thread (audit-writer) drains the channel and performs the AuditStore disk write. When the channel is full, emit returns QueueFull.]
Producers (request threads) drop events into the channel and return immediately. Only the dedicated writer thread ever touches AuditStore.
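
The pattern in the figure can be tried out with nothing but the standard library. The following is a minimal sketch, not Quipu-Log's code: run_pipeline is a hypothetical helper, events are plain strings, and the "disk write" is only a counter.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Pushes `total` events through a bounded channel of size `capacity`.
/// Returns (accepted, rejected, written). Hypothetical helper for illustration.
fn run_pipeline(capacity: usize, total: usize) -> (usize, usize, usize) {
    let (tx, rx) = sync_channel::<String>(capacity);

    // Consumer: the only thread that "writes". Here it just counts events.
    let writer = thread::spawn(move || {
        let mut written = 0;
        for _event in rx {
            written += 1;
        }
        written
    });

    // Producer: try_send never blocks; a full channel is reported as an error.
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..total {
        match tx.try_send(format!("event-{i}")) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    // Dropping the last sender closes the channel, which ends the writer loop
    // once the remaining buffered events have been received.
    drop(tx);
    let written = writer.join().expect("writer thread panicked");
    (accepted, rejected, written)
}
```

How many events are rejected depends on timing between the two threads, but every accepted event is eventually written, and the producer never waits on the writer.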

The core code: what emit actually does

AuditHandle is the cloneable handle that app code holds directly. Looking at emit_unchecked shows just how short the hot path really is.

crates/quipu-middleware/src/pipeline.rs
pub fn emit_unchecked(&self, event: AuditEvent) -> Result<(), MiddlewareError> {
    if self.reject_emit_when_disk_full && self.health.disk_full() { ... }
    self.metrics.queue_inc();
    match self.tx.try_send(Command::Emit(Box::new(event))) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(Command::Emit(ev))) => {
            self.metrics.rejected_queue_full();
            Err(MiddlewareError::QueueFull(ev))
        }
        Err(_) => Err(MiddlewareError::WorkerGone),
    }
}

try_send returns Err immediately — without blocking — when the channel is full. That produces a QueueFull error, and the event comes back to the caller (so the caller can drop it or route it to a separate fallback). This is backpressure — the system's way of saying "I'm too busy right now, I can't take any more."
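
To make the "event comes back to the caller" point concrete, here is a small sketch of one possible caller-side policy. Everything in it is an assumption for illustration: Event and EmitError are simplified stand-ins for AuditEvent and MiddlewareError, and emit_or_fallback with its in-memory Vec is a hypothetical fallback, not something Quipu-Log provides.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

#[derive(Debug, PartialEq)]
struct Event(u32); // hypothetical stand-in for AuditEvent

#[derive(Debug, PartialEq)]
enum EmitError {
    QueueFull(Event), // carries the rejected event back to the caller
    WorkerGone,
}

/// Non-blocking emit: mirrors the shape of the match in emit_unchecked.
fn emit(tx: &SyncSender<Event>, event: Event) -> Result<(), EmitError> {
    match tx.try_send(event) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(ev)) => Err(EmitError::QueueFull(ev)),
        Err(TrySendError::Disconnected(_)) => Err(EmitError::WorkerGone),
    }
}

/// One possible policy: park rejected events in a fallback buffer.
/// Returns true only if the event reached the queue.
fn emit_or_fallback(tx: &SyncSender<Event>, event: Event, fallback: &mut Vec<Event>) -> bool {
    match emit(tx, event) {
        Ok(()) => true,
        Err(EmitError::QueueFull(ev)) => {
            fallback.push(ev); // we still own the event, so nothing is lost
            false
        }
        Err(EmitError::WorkerGone) => false,
    }
}
```

Because the error variant owns the event, the decision to drop, retry, or reroute stays with the caller instead of being made silently inside the pipeline.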

Analogy

Think of a restaurant handing out numbered tickets. The guest (producer) takes a ticket and immediately goes to find a seat — no waiting. The kitchen (writer thread) processes orders in ticket order. When the tickets run out (channel full), no more can be issued — that's QueueFull.

AuditPipeline and AuditHandle

AuditPipeline owns the writer thread. Exactly one exists for the lifetime of the server; at shutdown you call shutdown() to drain whatever's left in the queue and clean up the thread. AuditHandle is the lightweight clone you get by calling handle() on the pipeline. It wraps an Arc<SyncSender>, so cloning is essentially free — pass it wherever you need it.
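
The split between an owning pipeline and cheap handles can be sketched as follows. This is an illustration under stated assumptions, not the real implementation: Pipeline, Handle, and the Shutdown command are hypothetical, events are strings, and the handle clones a SyncSender directly rather than wrapping it in an Arc.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

enum Command {
    Emit(String),
    Shutdown, // assumed here as the way to stop the writer
}

/// Cheap, cloneable handle: only holds a sender.
#[derive(Clone)]
struct Handle {
    tx: SyncSender<Command>,
}

impl Handle {
    /// Non-blocking; returns false if the queue is full or the writer is gone.
    fn emit(&self, event: String) -> bool {
        self.tx.try_send(Command::Emit(event)).is_ok()
    }
}

/// Owns the writer thread; exactly one per server in this sketch.
struct Pipeline {
    tx: SyncSender<Command>,
    writer: JoinHandle<Vec<String>>,
}

impl Pipeline {
    fn start(capacity: usize) -> Self {
        let (tx, rx) = sync_channel(capacity);
        let writer = thread::spawn(move || {
            let mut written = Vec::new();
            // The channel is FIFO, so everything queued before Shutdown
            // is written before the loop ends.
            while let Ok(Command::Emit(event)) = rx.recv() {
                written.push(event);
            }
            written
        });
        Pipeline { tx, writer }
    }

    fn handle(&self) -> Handle {
        Handle { tx: self.tx.clone() }
    }

    /// Drains the queue, stops the writer, and returns what was written.
    fn shutdown(self) -> Vec<String> {
        // A blocking send is acceptable here: shutdown is not on the hot path,
        // and the command must not be rejected by a full queue.
        let _ = self.tx.send(Command::Shutdown);
        self.writer.join().expect("writer thread panicked")
    }
}
```

Cloning a handle only copies a sender, so request handlers can each keep their own copy while the pipeline value stays in one place and is consumed by shutdown.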

crates/quipu-middleware/src/pipeline.rs
// Start the pipeline once, at server initialization
let pipeline = AuditPipeline::start(store, root, permissions, cfg, fallback)?;

let handle = pipeline.handle(); // cloneable AuditHandle
// pass handle anywhere via axum State, Arc, etc.
handle.emit(&role, event)?;   // non-blocking

Why this design?

quipu-core's AuditStore follows the single-writer principle (Ch. 13). If multiple request threads called AuditStore directly in a multi-threaded server, you'd get lock contention or consistency issues. Serializing through a channel to a single thread lets the store keep its single-writer guarantee while keeping the request path non-blocking.
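
A small experiment shows why the channel removes the need for a lock. In this sketch, Store is a hypothetical stand-in for AuditStore whose append takes &mut self; it is moved into the writer thread, so no Mutex is involved even though several producer threads emit concurrently. The channel is sized to fit every event, purely so the demo never hits a full queue.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for AuditStore: appending requires exclusive access.
struct Store {
    next_seq: u64,
    entries: Vec<(u64, String)>,
}

impl Store {
    fn append(&mut self, event: String) {
        self.entries.push((self.next_seq, event));
        self.next_seq += 1;
    }
}

/// Runs `producers` threads that each emit `per_producer` events,
/// and returns the entries the single writer recorded.
fn run(producers: usize, per_producer: usize) -> Vec<(u64, String)> {
    let (tx, rx) = sync_channel::<String>(producers * per_producer);

    // The store lives entirely inside the writer thread: one owner, no lock.
    let writer = thread::spawn(move || {
        let mut store = Store { next_seq: 0, entries: Vec::new() };
        for event in rx {
            store.append(event);
        }
        store.entries
    });

    let workers: Vec<_> = (0..producers)
        .map(|p| {
            let tx = tx.clone();
            thread::spawn(move || {
                for i in 0..per_producer {
                    tx.try_send(format!("p{p}-e{i}"))
                        .expect("queue is sized to fit all events");
                }
            })
        })
        .collect();

    drop(tx); // only the producers' clones keep the channel open now
    for worker in workers {
        worker.join().expect("producer thread panicked");
    }
    writer.join().expect("writer thread panicked")
}
```

The interleaving of events from different producers varies from run to run, but the sequence numbers are always gap-free and strictly increasing, because only one thread ever assigns them.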

Check yourself

① What would go wrong if send (blocking send) were used instead of try_send?
② Why does a QueueFull error return the event to the caller rather than simply dropping it?
③ Why is AuditHandle designed to implement Clone?