use axum::{ extract::State, response::sse::{Event, KeepAlive, Sse}, http::StatusCode, }; use futures::stream::{self, Stream}; use log::{debug, info}; use std::convert::Infallible; use std::time::Duration; use crate::modes::server::common::AppState; use crate::modes::server::mcp::KeepMcpServer; #[utoipa::path( get, path = "/mcp/sse", operation_id = "mcp_sse", summary = "MCP SSE endpoint", description = "Server-Sent Events for Model Context Protocol. Enables AI tools to interact with Keep's storage and retrieval functions.", responses( (status = 200, description = "SSE stream established"), (status = 401, description = "Unauthorized"), (status = 500, description = "Internal server error") ), security( ("bearerAuth" = []) ), tag = "mcp" )] pub async fn handle_mcp_sse( State(state): State, ) -> Result>>, StatusCode> { debug!("MCP: Starting SSE endpoint"); let _mcp_server = KeepMcpServer::new(state); // Create a simple message channel for SSE communication let (tx, rx) = tokio::sync::mpsc::unbounded_channel::(); // Send initial connection message let _ = tx.send("data: {\"type\":\"connection\",\"status\":\"connected\"}\n\n".to_string()); // For now, create a simple stream that sends periodic keep-alive messages // In a full implementation, this would integrate with the rmcp transport layer let stream = stream::unfold((rx, tx), |(mut rx, tx)| async move { tokio::select! { msg = rx.recv() => { match msg { Some(data) => { let event = Event::default().data(data); Some((Ok(event), (rx, tx))) } None => None, } } _ = tokio::time::sleep(Duration::from_secs(30)) => { let event = Event::default() .event("keep-alive") .data("ping"); Some((Ok(event), (rx, tx))) } } }); info!("MCP: SSE endpoint established"); Ok(Sse::new(stream).keep_alive( KeepAlive::new() .interval(Duration::from_secs(30)) .text("keep-alive"), )) }