use std::net::SocketAddr; use std::sync::Arc; use anyhow::Result; use clap::Parser; use futures_util::{SinkExt, StreamExt}; use tokio::net::{TcpListener, TcpStream}; use tokio_tungstenite::tungstenite::Message; use tracing::{error, info, warn}; use tracing_subscriber::FmtSubscriber; mod config; mod engine; mod pixelutil; use config::Args; use engine::VideoEngine; #[tokio::main] async fn main() -> Result<()> { // 1. Initialize logging let subscriber = FmtSubscriber::builder() .with_max_level(tracing::Level::INFO) .finish(); tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); // 2. Parse arguments let args = Args::parse(); let addr = args.addr; // 3. Start Video Engine let engine = Arc::new(VideoEngine::new(args.clone())); engine.start(); // 4. Start TCP Listener let listener = TcpListener::bind(&addr).await?; info!("Listening on: {}", addr); // 5. Connection handling loop while let Ok((stream, peer_addr)) = listener.accept().await { let engine = engine.clone(); tokio::spawn(async move { if let Err(e) = accept_connection(stream, peer_addr, engine).await { error!("Connection error from {}: {:?}", peer_addr, e); } }); } Ok(()) } async fn accept_connection(stream: TcpStream, peer_addr: SocketAddr, engine: Arc) -> Result<()> { info!("New connection from: {}", peer_addr); let ws_stream = tokio_tungstenite::accept_async(stream).await?; info!("WebSocket handshake success: {}", peer_addr); let (mut write, mut read) = ws_stream.split(); // Subscribe to video feed let mut rx = engine.subscribe(); // Task to send video frames to client let mut send_task = tokio::spawn(async move { loop { match rx.recv().await { Ok(packet) => { let mut payload = Vec::with_capacity(packet.data.len() + 1); // 1 = keyframe, 0 = delta payload.push(if packet.is_key { 1 } else { 0 }); payload.extend_from_slice(&packet.data); if let Err(e) = write.send(Message::Binary(payload.into())).await { // Client likely disconnected warn!("Failed to send to {}: {}", peer_addr, e); break; } } Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { warn!("Client {} lagged, skipped {} frames", peer_addr, skipped); } Err(tokio::sync::broadcast::error::RecvError::Closed) => { break; } } } }); // Task to read messages (keep-alive / close) let mut recv_task = tokio::spawn(async move { while let Some(msg) = read.next().await { match msg { Ok(Message::Close(_)) => break, Ok(_) => {}, // Ignore other messages for now Err(_) => break, } } }); tokio::select! { _ = (&mut send_task) => {}, _ = (&mut recv_task) => {}, }; info!("Connection closed: {}", peer_addr); send_task.abort(); recv_task.abort(); Ok(()) }