Skip to main content

Communication Architecture

PiCrust uses channels for bidirectional communication between your application and running agents.

Input Messages

InputMessage Enum

Messages sent TO the agent:
/// Messages sent TO the agent.
pub enum InputMessage {
    /// Plain user input (the most common message).
    UserInput(String),
    /// Result of a tool invocation, sent back to the agent (used for async tools).
    ToolResult {
        /// Identifier of the tool use this result belongs to.
        /// NOTE(review): presumably the `id` reported when the tool started — confirm.
        tool_use_id: String,
        /// Outcome of the tool invocation.
        result: ToolResult,
    },
    /// Reply to a permission request raised by the agent.
    PermissionResponse {
        /// Name of the tool the permission decision applies to.
        tool_name: String,
        /// Whether the requested action is allowed.
        allowed: bool,
        /// NOTE(review): presumably whether the decision should be reused for
        /// later requests — confirm against the permission documentation.
        remember: bool,
    },
    /// Answers to questions the agent asked the user.
    UserQuestionResponse {
        /// The `request_id` of the question request being answered.
        request_id: String,
        /// Answers as string pairs; the usage example maps each question's
        /// header to the label of the selected option.
        answers: HashMap<String, String>,
    },
    /// Notification that a sub-agent has finished.
    SubAgentComplete {
        /// Session identifier of the sub-agent.
        session_id: String,
        /// Optional result text. NOTE(review): meaning of `None` is not shown here.
        result: Option<String>,
    },
    /// Control message: interrupt (cancel) the operation in progress.
    Interrupt,
    /// Control message: shut the agent down.
    /// NOTE(review): exact shutdown semantics are not shown here — confirm.
    Shutdown,
}

Sending Input

// User input (most common)
handle.send_input("Write a hello world program").await?;

// Permission response
handle.send_permission_response("Bash", true, false).await?;

// Tool result (for async tools)
handle.send(InputMessage::ToolResult {
    tool_use_id: "tool_123".to_string(),
    result: ToolResult::success("Done"),
}).await?;

// User question response
handle.send(InputMessage::UserQuestionResponse {
    request_id: "req_123".to_string(),
    answers: HashMap::from([
        ("Auth".to_string(), "JWT".to_string()),
    ]),
}).await?;

// Control messages
handle.send(InputMessage::Interrupt).await?;
handle.send(InputMessage::Shutdown).await?;

Output Messages

OutputChunk Enum

Messages sent FROM the agent:
/// Messages sent FROM the agent.
pub enum OutputChunk {
    /// A streamed piece of response text (one text token).
    TextDelta(String),
    /// Completed response text.
    /// NOTE(review): presumably the full text of the preceding deltas — confirm.
    TextComplete(String),
    /// A streamed piece of the agent's thinking output.
    ThinkingDelta(String),
    /// Completed thinking output.
    /// NOTE(review): presumably the full text of the preceding deltas — confirm.
    ThinkingComplete(String),
    /// Tool execution started.
    ToolStart {
        /// Identifier of this tool invocation.
        id: String,
        /// Name of the tool being used.
        name: String,
        /// JSON input for the tool. NOTE(review): presumably its arguments — confirm.
        input: serde_json::Value,
    },
    /// Output reported while a tool is running.
    /// NOTE(review): presumably correlated with `ToolStart` through `id` — confirm.
    ToolProgress {
        id: String,
        output: String,
    },
    /// Tool execution finished, carrying its result.
    ToolEnd {
        id: String,
        result: ToolResult,
    },
    /// Permission needed: the tool `tool_name` wants to perform `action`.
    /// Answered with a permission response sent to the agent.
    PermissionRequest {
        tool_name: String,
        action: String,
        input: serde_json::Value,
        /// Optional extra information about the request.
        details: Option<String>,
    },
    /// The agent has questions for the user. Answered with
    /// `InputMessage::UserQuestionResponse` carrying the same `request_id`.
    AskUserQuestion {
        request_id: String,
        questions: Vec<UserQuestion>,
    },
    /// A sub-agent was spawned.
    SubAgentSpawned {
        /// Session identifier of the new sub-agent.
        session_id: String,
        /// Type of the spawned sub-agent.
        agent_type: String,
    },
    /// An output chunk produced by a sub-agent.
    SubAgentOutput {
        /// Session identifier of the sub-agent that produced `chunk`.
        session_id: String,
        /// The sub-agent's chunk; boxed because the enum is recursive here.
        chunk: Box<OutputChunk>,
    },
    /// A sub-agent completed, optionally with a result.
    SubAgentComplete {
        session_id: String,
        result: Option<String>,
    },
    /// Agent state changed.
    StateChange(AgentState),
    /// Status text. NOTE(review): when this is emitted is not shown here.
    Status(String),
    /// An error occurred; carries the error message.
    Error(String),
    /// Agent finished; the usage examples stop their receive loop on this chunk.
    Done,
}

Receiving Output

// Subscribe to output stream
let mut rx = handle.subscribe();

while let Ok(chunk) = rx.recv().await {
    match chunk {
        OutputChunk::TextDelta(text) => {
            // Stream text token
            print!("{}", text);
        }

        OutputChunk::ToolStart { name, input, .. } => {
            // Tool execution started
            println!("\n[Using tool: {}]", name);
        }

        OutputChunk::PermissionRequest { tool_name, action, .. } => {
            // Permission needed
            println!("Permission: {} wants to {}", tool_name, action);
            handle.send_permission_response(tool_name, true, false).await?;
        }

        OutputChunk::StateChange(state) => {
            // Agent state changed
            update_ui_state(state);
        }

        OutputChunk::Error(message) => {
            // Error occurred
            eprintln!("Error: {}", message);
        }

        OutputChunk::Done => {
            // Agent finished
            break;
        }

        _ => {}
    }
}

Critical Pattern: Subscribe Before Send

Always subscribe to the output stream BEFORE sending input, or you’ll miss early output chunks!

Correct Order

// 1. Subscribe first
let mut rx = handle.subscribe();

// 2. Then send input
handle.send_input("Hello").await?;

// 3. Process output
while let Ok(chunk) = rx.recv().await {
    // Handle chunks
}

Wrong Order

// Wrong: Send first
handle.send_input("Hello").await?;

// Too late! You missed early chunks
let mut rx = handle.subscribe();

Why This Matters

Agents start processing immediately when they receive input. If you subscribe after sending, you’ll miss:
  • Early text tokens
  • Tool execution notifications
  • State changes
  • Permission requests

The Foolproof Pattern

async fn interact_with_agent(
    handle: &AgentHandle,
    user_input: &str,
) -> Result<String> {
    // 1. Subscribe
    let mut rx = handle.subscribe();

    // 2. Send
    handle.send_input(user_input).await?;

    // 3. Process
    let mut response = String::new();

    while let Ok(chunk) = rx.recv().await {
        match chunk {
            OutputChunk::TextDelta(text) => {
                response.push_str(&text);
                print!("{}", text);  // Live streaming
            }
            OutputChunk::Done => break,
            _ => {}
        }
    }

    Ok(response)
}

Multiple Subscribers

Multiple parts of your application can subscribe to the same agent:
// Frontend UI subscriber
let mut rx1 = handle.subscribe();
tokio::spawn(async move {
    while let Ok(chunk) = rx1.recv().await {
        update_ui(chunk);
    }
});

// Logger subscriber
let mut rx2 = handle.subscribe();
tokio::spawn(async move {
    while let Ok(chunk) = rx2.recv().await {
        log_output(chunk);
    }
});

// Analytics subscriber
let mut rx3 = handle.subscribe();
tokio::spawn(async move {
    while let Ok(chunk) = rx3.recv().await {
        track_event(chunk);
    }
});

// All receive the same chunks
handle.send_input("Hello").await?;

Message Processing Patterns

Simple Echo

let mut rx = handle.subscribe();
handle.send_input("Hello").await?;

while let Ok(chunk) = rx.recv().await {
    match chunk {
        OutputChunk::TextDelta(text) => print!("{}", text),
        OutputChunk::Done => break,
        _ => {}
    }
}

Collect Full Response

let mut rx = handle.subscribe();
handle.send_input("Explain Rust").await?;

let mut full_response = String::new();

while let Ok(chunk) = rx.recv().await {
    match chunk {
        OutputChunk::TextDelta(text) => {
            full_response.push_str(&text);
        }
        OutputChunk::TextComplete(text) => {
            full_response = text;
        }
        OutputChunk::Done => break,
        _ => {}
    }
}

println!("Full response: {}", full_response);

Handle All Events

let mut rx = handle.subscribe();
handle.send_input("Read file.txt").await?;

while let Ok(chunk) = rx.recv().await {
    match chunk {
        OutputChunk::TextDelta(text) => {
            ui.append_text(text);
        }

        OutputChunk::ThinkingDelta(thinking) => {
            ui.show_thinking(thinking);
        }

        OutputChunk::ToolStart { name, .. } => {
            ui.show_tool_indicator(name);
        }

        OutputChunk::ToolEnd { result, .. } => {
            ui.hide_tool_indicator();
        }

        OutputChunk::PermissionRequest { tool_name, action, .. } => {
            let allowed = ui.show_permission_dialog(tool_name, action);
            handle.send_permission_response(tool_name, allowed, false).await?;
        }

        OutputChunk::StateChange(state) => {
            ui.update_state_indicator(state);
        }

        OutputChunk::Error(message) => {
            ui.show_error(message);
        }

        OutputChunk::Done => {
            ui.set_input_enabled(true);
            break;
        }

        _ => {}
    }
}

Multi-Turn Conversations

A single user input can trigger multiple LLM calls. Handle this naturally by processing output until you receive Done:
let mut rx = handle.subscribe();
handle.send_input("Write hello.rs and run it").await?;

// Just keep processing until Done
while let Ok(chunk) = rx.recv().await {
    match chunk {
        OutputChunk::TextDelta(text) => print!("{}", text),
        OutputChunk::ToolStart { name, .. } => {
            println!("\n[Tool: {}]", name);
        }
        OutputChunk::Done => break,
        _ => {}
    }
}

Permission Flow

Request-Response Pattern

Implementation

// Subscribe before sending so the permission request cannot be missed.
let mut rx = handle.subscribe();
handle.send_input("Delete all files").await?;

// Answer every permission request until the agent reports Done.
while let Ok(chunk) = rx.recv().await {
    match chunk {
        // `details` is bound here but this example does not display it.
        OutputChunk::PermissionRequest {
            tool_name,
            action,
            input,
            details
        } => {
            // Show permission dialog
            println!("Permission needed:");
            println!("  Tool: {}", tool_name);
            println!("  Action: {}", action);
            println!("  Input: {}", input);

            // Get user decision. NOTE: `read_line` blocks the current thread
            // until a line is entered; NOTE(review): a real async application
            // would presumably collect this without blocking the runtime.
            print!("Allow? (y/n): ");
            let mut response = String::new();
            std::io::stdin().read_line(&mut response)?;

            // Only "y" (any case, surrounding whitespace ignored) allows the action.
            let allowed = response.trim().eq_ignore_ascii_case("y");

            // Send response
            handle.send_permission_response(
                tool_name,
                allowed,
                false  // remember
            ).await?;
        }

        OutputChunk::Done => break,
        _ => {}
    }
}

Ask User Questions Flow

Interactive Questions

Implementation

// Subscribe before sending so the question request cannot be missed.
let mut rx = handle.subscribe();
handle.send_input("Implement authentication").await?;

while let Ok(chunk) = rx.recv().await {
    match chunk {
        OutputChunk::AskUserQuestion { request_id, questions } => {
            println!("Agent has questions:");

            let mut answers = HashMap::new();

            for question in questions {
                println!("\n{}", question.question);
                for (i, option) in question.options.iter().enumerate() {
                    println!("  {}. {} - {}", i + 1, option.label, option.description);
                }

                // Get user selection. `print!` writes no newline, so flush
                // explicitly or the prompt may not appear before stdin is read.
                print!("Select (1-{}): ", question.options.len());
                std::io::Write::flush(&mut std::io::stdout())?;
                let mut input = String::new();
                std::io::stdin().read_line(&mut input)?;

                // Convert the 1-based choice to a 0-based index without
                // panicking: unparsable input falls back to option 1, `0`
                // saturates to index 0 instead of underflowing, and values
                // past the end clamp to the last option.
                let choice: usize = input.trim().parse().unwrap_or(1);
                let last = question.options.len().saturating_sub(1);
                let index = choice.saturating_sub(1).min(last);

                // `get` avoids an out-of-bounds panic when a question has no
                // options; such a question is left unanswered.
                if let Some(selected) = question.options.get(index) {
                    answers.insert(
                        question.header.clone(),
                        selected.label.clone(),
                    );
                }
            }

            // Send answers
            handle.send(InputMessage::UserQuestionResponse {
                request_id,
                answers,
            }).await?;
        }

        OutputChunk::Done => break,
        _ => {}
    }
}

Subagent Flow

Parent-Child Communication

Implementation

let mut rx = handle.subscribe();
handle.send_input("Research and summarize topic X").await?;

while let Ok(chunk) = rx.recv().await {
    match chunk {
        OutputChunk::SubAgentSpawned { session_id, agent_type } => {
            println!("Spawned subagent: {} ({})", session_id, agent_type);

            // Optionally subscribe to subagent
            if let Some(child_handle) = runtime.get(&session_id).await {
                let mut child_rx = child_handle.subscribe();
                tokio::spawn(async move {
                    while let Ok(child_chunk) = child_rx.recv().await {
                        println!("  [Child]: {:?}", child_chunk);
                    }
                });
            }
        }

        OutputChunk::SubAgentOutput { session_id, chunk } => {
            println!("[Subagent {}]: {:?}", session_id, chunk);
        }

        OutputChunk::SubAgentComplete { session_id, result } => {
            println!("Subagent {} completed: {:?}", session_id, result);
        }

        OutputChunk::Done => break,
        _ => {}
    }
}

Interrupt Flow

Graceful Cancellation

Implementation

let mut rx = handle.subscribe();
handle.send_input("Generate 1000 files").await?;

// User clicks cancel after 2 seconds
tokio::spawn({
    let handle = handle.clone();
    async move {
        tokio::time::sleep(Duration::from_secs(2)).await;
        handle.send(InputMessage::Interrupt).await.ok();
    }
});

while let Ok(chunk) = rx.recv().await {
    match chunk {
        OutputChunk::Done => {
            println!("Operation stopped");
            break;
        }
        _ => {}
    }
}

Channel Characteristics

Broadcast Channels

Output uses tokio::sync::broadcast:
  • Multiple subscribers: Many receivers get same messages
  • Bounded buffer: Configurable capacity (default: 1024)
  • Lagging behavior: Slow subscribers may miss messages
  • Clonable: Call subscribe() multiple times

MPSC Channels

Input uses tokio::sync::mpsc:
  • Single consumer: Only the agent receives
  • Unbounded: Won’t block senders
  • Order preserved: FIFO message delivery
  • Clonable sender: Multiple parts can send

Performance Considerations

Buffer Size

Default broadcast buffer is 1024 messages:
// Most applications don't need to change this
// But you can if needed:
const BROADCAST_CAPACITY: usize = 2048;

Subscriber Lag

If a subscriber is slow, it may miss messages:
match rx.recv().await {
    Ok(chunk) => {
        // Process chunk
    }
    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
        eprintln!("Missed {} messages due to slow processing", n);
    }
    Err(_) => {
        // Channel closed
        break;
    }
}

Backpressure

The system doesn’t implement backpressure. Fast producers continue regardless of slow consumers. Use fast async processing to keep up.

Best Practices

1. Subscribe Early

// Good
let mut rx = handle.subscribe();
handle.send_input("task").await?;

// Bad
handle.send_input("task").await?;
let mut rx = handle.subscribe();  // Missed early output

2. Handle All Cases

match chunk {
    OutputChunk::TextDelta(text) => { /* ... */ }
    OutputChunk::PermissionRequest { .. } => { /* ... */ }
    OutputChunk::Error(e) => { /* ... */ }
    OutputChunk::Done => { /* ... */ }
    _ => {}  // Don't forget default case
}

3. Don’t Block the Receiver

// Bad: Blocking operation
while let Ok(chunk) = rx.recv().await {
    expensive_sync_operation(&chunk);  // Blocks receiver
}

// Good: Spawn for expensive work
while let Ok(chunk) = rx.recv().await {
    tokio::spawn(async move {
        expensive_operation(&chunk).await;
    });
}

4. Clean Up Subscribers

// Subscribers are automatically dropped when out of scope
{
    let mut rx = handle.subscribe();
    // Process...
} // rx dropped here

5. Use Timeouts

use tokio::time::{timeout, Duration};

let mut rx = handle.subscribe();
handle.send_input("task").await?;

loop {
    match timeout(Duration::from_secs(30), rx.recv()).await {
        Ok(Ok(chunk)) => {
            // Process chunk
            if matches!(chunk, OutputChunk::Done) {
                break;
            }
        }
        Ok(Err(_)) => {
            // Channel closed
            break;
        }
        Err(_) => {
            // Timeout
            println!("No output for 30 seconds");
            break;
        }
    }
}

Next Steps

Agent States

Understand all agent states and transitions

Streaming & History

Critical dual-channel architecture pattern

OutputChunk Reference

Complete OutputChunk API documentation

InputMessage Reference

Complete InputMessage API documentation