diff --git a/backend/Cargo.toml b/backend/Cargo.toml index f96621d..8d3c8e7 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -4,7 +4,10 @@ version = "0.1.0" edition = "2024" [dependencies] +anyhow = "1.0.100" +clap = "4.5.53" crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } +dotenv = "0.15.0" futures-util = "0.3.31" parking_lot = "0.12.5" rav1e = "0.8.1" @@ -12,6 +15,8 @@ scap = "0.0.8" scopeguard = "1.2.0" tokio = { version = "1.48.0", features = ["full"] } tokio-tungstenite = "0.28.0" +tracing = "0.1.44" +tracing-subscriber = "0.3.22" v_frame = "0.3.9" vpx-rs = "0.2.1" yuv = "0.8.9" diff --git a/backend/src/config.rs b/backend/src/config.rs new file mode 100644 index 0000000..8cc723e --- /dev/null +++ b/backend/src/config.rs @@ -0,0 +1,26 @@ +use clap::Parser; +use std::net::SocketAddr; + +#[derive(Parser, Debug, Clone)] +#[command(version, about, long_about = None)] +pub struct Args { + /// Address to listen on + #[arg(long, default_value = "0.0.0.0:8080")] + pub addr: SocketAddr, + + /// Video width + #[arg(long, default_value_t = 1920)] + pub width: u32, + + /// Video height + #[arg(long, default_value_t = 1080)] + pub height: u32, + + /// Target FPS + #[arg(long, default_value_t = 30)] + pub fps: u32, + + /// Bitrate in kbps + #[arg(long, default_value_t = 500)] + pub bitrate: u32, +} diff --git a/backend/src/engine.rs b/backend/src/engine.rs new file mode 100644 index 0000000..20bdd93 --- /dev/null +++ b/backend/src/engine.rs @@ -0,0 +1,176 @@ +use std::num::NonZero; +use std::thread; +use std::time::{Duration, Instant}; + +use anyhow::{Context, Result}; +use tokio::sync::broadcast; +use tracing::{error, info, warn}; +use vpx_rs::enc::{CodecId, EncoderFrameFlags, EncodingDeadline}; +use vpx_rs::{Encoder, EncoderConfig, RateControl, Timebase}; +use scap::capturer::{Area, Capturer, Options, Point, Resolution, Size}; +use scap::frame::FrameType; + +use crate::config::Args; +use crate::pixelutil; + +/// Represents a video packet to be sent to clients. +#[derive(Clone, Debug)] +pub struct VideoPacket { + pub data: Vec, + pub is_key: bool, +} + +/// Manages video capture and encoding. +pub struct VideoEngine { + args: Args, + packet_tx: broadcast::Sender, + shutdown_tx: broadcast::Sender<()>, +} + +impl VideoEngine { + pub fn new(args: Args) -> Self { + let (packet_tx, _) = broadcast::channel(100); + let (shutdown_tx, _) = broadcast::channel(1); + Self { + args, + packet_tx, + shutdown_tx, + } + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.packet_tx.subscribe() + } + + pub fn start(&self) { + let args = self.args.clone(); + let packet_tx = self.packet_tx.clone(); + let mut shutdown_rx = self.shutdown_tx.subscribe(); + + // Spawn the capture/encode loop in a separate dedicated thread + // because it involves blocking heavy computation. + thread::spawn(move || { + if let Err(e) = run_capture_loop(args, packet_tx, &mut shutdown_rx) { + error!("Capture loop failed: {:?}", e); + } + }); + } + + #[allow(dead_code)] + pub fn stop(&self) { + let _ = self.shutdown_tx.send(()); + } +} + +fn run_capture_loop( + args: Args, + packet_tx: broadcast::Sender, + shutdown_rx: &mut broadcast::Receiver<()>, +) -> Result<()> { + // 1. Setup Capturer + let options = Options { + fps: args.fps, + show_cursor: true, + show_highlight: true, + output_type: FrameType::YUVFrame, + output_resolution: Resolution::_1080p, + crop_area: Some(Area { + origin: Point { x: 0.0, y: 0.0 }, + size: Size { + width: args.width as f64, + height: args.height as f64, + }, + }), + ..Default::default() + }; + + let mut capturer = Capturer::build(options).context("Failed to build capturer")?; + capturer.start_capture(); + + // 2. Setup Encoder + let mut config = EncoderConfig::::new( + CodecId::VP9, + args.width, + args.height, + Timebase { + num: NonZero::new(1).unwrap(), + den: NonZero::new(args.fps).unwrap(), + }, + RateControl::ConstantBitRate(args.bitrate), + ) + .context("Failed to create encoder config")?; + + config.threads = 4; // Use reasonable number of threads + config.lag_in_frames = 0; // Low latency + + let mut encoder = Encoder::new(config).context("Failed to create encoder")?; + + let frame_duration = Duration::from_secs_f64(1.0 / args.fps as f64); + let mut next_frame_time = Instant::now(); + let mut pts: i64 = 0; + + info!("Starting capture loop: {}x{} @ {}fps", args.width, args.height, args.fps); + + loop { + // Check for shutdown signal + if shutdown_rx.try_recv().is_ok() { + info!("Shutting down capture loop"); + break; + } + + let now = Instant::now(); + if now < next_frame_time { + thread::sleep(next_frame_time - now); + } + next_frame_time += frame_duration; + + match capturer.get_next_frame() { + Ok(captured_frame) => { + // Convert frame + let yuv_buffer = pixelutil::frame_to_yuv(captured_frame); + let yuv_image = pixelutil::apply_frame(&yuv_buffer, args.width as usize, args.height as usize); + + // Encode + // For real-time streaming, we force keyframes periodically could be an improvement, + // but for now relying on default behavior or client requests (not impl here). + let flags = EncoderFrameFlags::empty(); + + match encoder.encode( + pts, + 1, // Duration (in timebase units) + yuv_image, + EncodingDeadline::Realtime, + flags, + ) { + Ok(packets) => { + for packet in packets { + if let vpx_rs::Packet::CompressedFrame(frame) = packet { + let video_packet = VideoPacket { + data: frame.data.to_vec(), + is_key: frame.flags.is_key, + }; + + // Broadcast to all connected clients + // It's okay if no one is listening + let _ = packet_tx.send(video_packet); + } + } + pts += 1; + }, + Err(e) => { + error!("Encoding error: {:?}", e); + // Continue loop, maybe transient + } + } + } + Err(e) => { + // Scap might return error if no frame is available yet or other issues + // We just log debug/warn and continue + warn!("Capture error (might be transient): {:?}", e); + thread::sleep(Duration::from_millis(10)); + } + } + } + + Ok(()) +} diff --git a/backend/src/main.rs b/backend/src/main.rs index 1aed546..9cbd9d3 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,189 +1,110 @@ -use std::borrow::Borrow; -use std::num::NonZero; -use std::thread; -use std::time::Instant; +use std::net::SocketAddr; +use std::sync::Arc; +use anyhow::Result; +use clap::Parser; use futures_util::{SinkExt, StreamExt}; -use rav1e::config::SpeedSettings; -use rav1e::data::Rational; -use scap::capturer::{Area, Capturer, Options, Point}; -use scap::frame::FrameType; use tokio::net::{TcpListener, TcpStream}; -use tokio_tungstenite::accept_async; use tokio_tungstenite::tungstenite::Message; -use vpx_rs::enc::{CodecId, CompressedFrameFlags, EncoderProfile}; -use vpx_rs::{EncoderFlags, RateControl, Timebase, YUVImageData}; +use tracing::{error, info, warn}; +use tracing_subscriber::FmtSubscriber; +mod config; +mod engine; mod pixelutil; -#[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<(), Box> { - let addr = "0.0.0.0:8080"; - let listener = TcpListener::bind(&addr).await?; - println!("Listening on: {}", addr); +use config::Args; +use engine::VideoEngine; - while let Ok((stream, _)) = listener.accept().await { - tokio::spawn(accept_connection(stream)); +#[tokio::main] +async fn main() -> Result<()> { + // 1. Initialize logging + let subscriber = FmtSubscriber::builder() + .with_max_level(tracing::Level::INFO) + .finish(); + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + + // 2. Parse arguments + let args = Args::parse(); + let addr = args.addr; + + // 3. Start Video Engine + let engine = Arc::new(VideoEngine::new(args.clone())); + engine.start(); + + // 4. Start TCP Listener + let listener = TcpListener::bind(&addr).await?; + info!("Listening on: {}", addr); + + // 5. Connection handling loop + while let Ok((stream, peer_addr)) = listener.accept().await { + let engine = engine.clone(); + tokio::spawn(async move { + if let Err(e) = accept_connection(stream, peer_addr, engine).await { + error!("Connection error from {}: {:?}", peer_addr, e); + } + }); } Ok(()) } -async fn accept_connection(stream: TcpStream) { - let addr = stream - .peer_addr() - .expect("connected streams should have a peer addr"); - println!("Peer address: {}", addr); +async fn accept_connection(stream: TcpStream, peer_addr: SocketAddr, engine: Arc) -> Result<()> { + info!("New connection from: {}", peer_addr); - let ws_stream = match accept_async(stream).await { - Ok(ws) => ws, - Err(e) => { - eprintln!("Error during the websocket handshake occurred: {}", e); - return; - } - }; - - println!("New WebSocket connection: {}", addr); + let ws_stream = tokio_tungstenite::accept_async(stream).await?; + info!("WebSocket handshake success: {}", peer_addr); let (mut write, mut read) = ws_stream.split(); + + // Subscribe to video feed + let mut rx = engine.subscribe(); - // Channel to communicate between the video source thread and the websocket task - // We send (data, is_key) - let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec, bool)>(100); + // Task to send video frames to client + let mut send_task = tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(packet) => { + let mut payload = Vec::with_capacity(packet.data.len() + 1); + // 1 = keyframe, 0 = delta + payload.push(if packet.is_key { 1 } else { 0 }); + payload.extend_from_slice(&packet.data); - // Spawn a blocking thread for video processing (currently just a placeholder loop) - std::thread::spawn(move || { - if let Err(e) = process_video(tx) { - eprintln!("Video processing error: {}", e); - } - }); - - // Handle incoming messages (mostly to keep connection alive or handle close) - let mut read_task = tokio::spawn(async move { - while let Some(msg) = read.next().await { - match msg { - Ok(m) => { - if m.is_close() { + if let Err(e) = write.send(Message::Binary(payload.into())).await { + // Client likely disconnected + warn!("Failed to send to {}: {}", peer_addr, e); break; } } + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + warn!("Client {} lagged, skipped {} frames", peer_addr, skipped); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + break; + } + } + } + }); + + // Task to read messages (keep-alive / close) + let mut recv_task = tokio::spawn(async move { + while let Some(msg) = read.next().await { + match msg { + Ok(Message::Close(_)) => break, + Ok(_) => {}, // Ignore other messages for now Err(_) => break, } } }); - // Write loop - let mut write_task = tokio::spawn(async move { - while let Some((data, is_key)) = rx.recv().await { - // Prefix: 0x1 for key frame, 0x0 for delta frame - let mut payload = Vec::with_capacity(data.len() + 1); - payload.push(if is_key { 1 } else { 0 }); - payload.extend_from_slice(&data); - - let msg = Message::Binary(payload.into()); - if let Err(e) = write.send(msg).await { - eprintln!("Error sending message: {}", e); - break; - } - } - // Connection closed or processing done - let _ = write.close().await; - }); - tokio::select! { - _ = (&mut read_task) => { - write_task.abort(); - }, - _ = (&mut write_task) => { - read_task.abort(); - } + _ = (&mut send_task) => {}, + _ = (&mut recv_task) => {}, }; - println!("Connection closed: {}", addr); -} - -fn process_video( - tx: tokio::sync::mpsc::Sender<(Vec, bool)>, -) -> Result<(), Box> { - let options = Options { - fps: 30, - target: None, - show_cursor: true, - show_highlight: true, - excluded_targets: None, - output_type: FrameType::YUVFrame, - output_resolution: scap::capturer::Resolution::_1080p, - crop_area: Some(Area { - origin: Point { x: 0.0, y: 0.0 }, - size: scap::capturer::Size { - width: 1920.0, - height: 1080.0, - }, - }), - ..Default::default() - }; - - let mut config: vpx_rs::EncoderConfig = vpx_rs::EncoderConfig::new( - CodecId::VP9, - 1920, - 1080, - Timebase { - num: NonZero::new(1).unwrap(), - den: NonZero::new(30).unwrap(), - }, - RateControl::ConstantBitRate(500), - )?; - config.threads = 32; - config.lag_in_frames = 0; - - let mut encoder = vpx_rs::Encoder::new(config)?; - - let mut capturer = Capturer::build(options)?; - capturer.start_capture(); - - let (frame_tx, frame_rx) = crossbeam::channel::bounded::>(1); - - thread::spawn(move || { - loop { - let instant = Instant::now(); - if let Ok(captured_frame) = capturer.get_next_frame() { - let output_frame = pixelutil::frame_to_yuv(captured_frame); - - let _ = frame_tx.try_send(output_frame); - - let elapsed = instant.elapsed(); - let frame_duration = std::time::Duration::from_millis(33); // ~30 FPS - if elapsed < frame_duration { - std::thread::sleep(frame_duration - elapsed); - } - } - } - }); - - loop { - let yuv_frame_raw = frame_rx.recv()?; - let yuv_frame = pixelutil::apply_frame(&yuv_frame_raw, 1920, 1080); - - let pts = 0; - let duration = 1; - let deadline = vpx_rs::EncodingDeadline::Realtime; - let flags = vpx_rs::EncoderFrameFlags::empty(); - - let packets = encoder.encode(pts, duration, yuv_frame, deadline, flags)?; - - for packet in packets { - match packet { - vpx_rs::Packet::CompressedFrame(frame) => { - let is_key = frame.flags.is_key; - let data = frame.data.to_vec(); - println!("encoded frame: size={}, is_key={}", data.len(), is_key); - if let Err(e) = tx.blocking_send((data, is_key)) { - eprintln!("Error sending encoded frame: {}", e); - return Err(Box::new(e)); - } - } - _ => {} - } - } - } + info!("Connection closed: {}", peer_addr); + send_task.abort(); + recv_task.abort(); + + Ok(()) } diff --git a/backend/src/pixelutil.rs b/backend/src/pixelutil.rs index 67025f6..53aa920 100644 --- a/backend/src/pixelutil.rs +++ b/backend/src/pixelutil.rs @@ -1,13 +1,7 @@ use std::time::Instant; use scap::frame::Frame as ScapFrame; -use vpx_rs::{ - ImageFormat, YUVImageData, YUVImageDataOwned, - image::{ - UVImagePlanes, UVImagePlanesInterleaved, UVImagePlanesInterleavedMut, UVImagePlanesMut, - }, -}; -use yuv::{YuvBiPlanarImageMut, YuvPlanarImageMut}; +use vpx_rs::{ImageFormat, YUVImageData}; pub fn frame_to_yuv<'a>(captured_frame: ScapFrame) -> Vec { match captured_frame { @@ -20,9 +14,9 @@ pub fn frame_to_yuv<'a>(captured_frame: ScapFrame) -> Vec { let mut buf = Vec::new(); //println!("Converting BGRx frame: width={}, height={}", width, height); - let start = Instant::now(); + let _start = Instant::now(); - let r = yuv::bgra_to_yuv420( + let _r = yuv::bgra_to_yuv420( &mut wrap_yuv420_buf(width as usize, height as usize, &mut buf), &bgrx_image.data, width * 4, diff --git a/backend/src/probe.rs b/backend/src/probe.rs deleted file mode 100644 index 1aa331e..0000000 --- a/backend/src/probe.rs +++ /dev/null @@ -1,7 +0,0 @@ -use yuvutils_rs::{YuvBiPlanarImageMut, rgb_to_yuv_nv12, YuvRange, YuvStandardMatrix, YuvConversionMode}; - -fn main() { - let _ = YuvRange::Limited; - let _ = YuvStandardMatrix::Bt709; - // Let's guess simple names first, compiler will correct me -}