Skip to main content

Setup

use picrust::mcp::{MCPServerManager, MCPToolProvider};
use picrust::tools::ToolRegistry;
use rmcp::transport::StreamableHttpClientTransport;
use rmcp::ServiceExt;
use std::sync::Arc;

// Create transport
let transport = StreamableHttpClientTransport::from_uri("http://localhost:8005/mcp");
let service = ().serve(transport).await?;

// Add to MCP manager
let mcp_manager = Arc::new(MCPServerManager::new());
mcp_manager.add_service("filesystem", service).await?;

// Create tool provider
let mcp_provider = Arc::new(MCPToolProvider::new(mcp_manager));
let mut tool_registry = ToolRegistry::new();
tool_registry.add_provider(mcp_provider).await?;

let tools = Arc::new(tool_registry);

With Auth Headers

let transport = StreamableHttpClientTransport::from_uri("http://localhost:8005/mcp")
    .with_header("Authorization", "Bearer token")
    .with_header("X-Api-Key", "key");

let service = ().serve(transport).await?;
mcp_manager.add_service("filesystem", service).await?;

JWT Refresh

use std::time::{Duration, Instant};

let last_refresh = Arc::new(RwLock::new(Instant::now()));
let jwt_provider = Arc::new(YourJwtProvider::new());

let refresher = {
    let last_refresh = last_refresh.clone();
    let jwt = jwt_provider.clone();

    move || {
        let last_refresh = last_refresh.clone();
        let jwt = jwt.clone();

        async move {
            let mut last = last_refresh.write().await;
            if last.elapsed() < Duration::from_secs(50 * 60) {
                return Ok(None); // Still valid
            }

            let token = jwt.get_fresh_token().await?;
            let transport = StreamableHttpClientTransport::from_uri("https://backend/mcp")
                .with_header("Authorization", format!("Bearer {}", token));
            let service = ().serve(transport).await?;

            *last = Instant::now();
            Ok(Some(service)) // Replace service
        }
    }
};

// Initial service
let token = jwt_provider.get_fresh_token().await?;
let transport = StreamableHttpClientTransport::from_uri("https://backend/mcp")
    .with_header("Authorization", format!("Bearer {}", token));
let service = ().serve(transport).await?;

mcp_manager.add_service_with_refresher("remote", service, refresher).await?;

Tool Namespacing

MCP tools are namespaced with server_id__tool_name:
Server ID: filesystem
Tool: read_file
Exposed as: filesystem__read_file

Automatic Reconnection

Health check before every tool call (5s timeout). On failure:
  1. Drop old service
  2. Call refresher
  3. Retry with new service (up to 3 attempts)

Multiple Servers

let mcp_manager = Arc::new(MCPServerManager::new());

// Add server 1
mcp_manager.add_service("filesystem", fs_service).await?;

// Add server 2
mcp_manager.add_service("database", db_service).await?;

// All tools available
let mcp_provider = Arc::new(MCPToolProvider::new(mcp_manager));

Thread Safety

  • Service wrapped in Arc<RwLock<...>>
  • Multiple agents can use concurrently
  • Refresh waits for in-flight calls to complete