use ffmpeg_next as ffmpeg; use futures_util::{SinkExt, StreamExt}; use tokio::net::{TcpListener, TcpStream}; use tokio_tungstenite::accept_async; use tokio_tungstenite::tungstenite::Message; #[tokio::main] async fn main() -> Result<(), Box> { ffmpeg::init()?; let addr = "0.0.0.0:8080"; let listener = TcpListener::bind(&addr).await?; println!("Listening on: {}", addr); while let Ok((stream, _)) = listener.accept().await { tokio::spawn(accept_connection(stream)); } Ok(()) } async fn accept_connection(stream: TcpStream) { let addr = stream .peer_addr() .expect("connected streams should have a peer addr"); println!("Peer address: {}", addr); let ws_stream = match accept_async(stream).await { Ok(ws) => ws, Err(e) => { eprintln!("Error during the websocket handshake occurred: {}", e); return; } }; println!("New WebSocket connection: {}", addr); let (mut write, mut read) = ws_stream.split(); // Channel to communicate between the ffmpeg thread and the websocket task // We send (data, is_key) let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec, bool)>(100); // Spawn a blocking thread for FFmpeg processing std::thread::spawn(move || { if let Err(e) = process_video(tx) { eprintln!("FFmpeg processing error: {}", e); } }); // Handle incoming messages (mostly to keep connection alive or handle close) let mut read_task = tokio::spawn(async move { while let Some(msg) = read.next().await { match msg { Ok(m) => { if m.is_close() { break; } } Err(_) => break, } } }); // Write loop let mut write_task = tokio::spawn(async move { while let Some((data, is_key)) = rx.recv().await { // Prefix: 0x1 for key frame, 0x0 for delta frame let mut payload = Vec::with_capacity(data.len() + 1); payload.push(if is_key { 1 } else { 0 }); payload.extend_from_slice(&data); let msg = Message::Binary(payload.into()); if let Err(e) = write.send(msg).await { eprintln!("Error sending message: {}", e); break; } } // Connection closed or processing done let _ = write.close().await; }); tokio::select! { _ = (&mut read_task) => { write_task.abort(); }, _ = (&mut write_task) => { read_task.abort(); } }; println!("Connection closed: {}", addr); } fn process_video( tx: tokio::sync::mpsc::Sender<(Vec, bool)>, ) -> Result<(), Box> { let input_file = "video.mp4"; let mut ictx = ffmpeg::format::input(&input_file)?; let input_stream = ictx .streams() .best(ffmpeg::media::Type::Video) .ok_or(ffmpeg::Error::StreamNotFound)?; let input_stream_index = input_stream.index(); let decoder_ctx = ffmpeg::codec::context::Context::from_parameters(input_stream.parameters())?; let mut decoder = decoder_ctx.decoder().video()?; // Setup Encoder // Try to find libsvtav1 or fallback to AV1 generic let codec = ffmpeg::codec::encoder::find_by_name("libsvtav1") .or_else(|| ffmpeg::codec::encoder::find(ffmpeg::codec::Id::AV1)) .ok_or(ffmpeg::Error::EncoderNotFound)?; let output_ctx = ffmpeg::codec::context::Context::new(); let mut encoder_builder = output_ctx.encoder().video()?; // We will scale to YUV420P because it's widely supported and good for streaming encoder_builder.set_format(ffmpeg::format::Pixel::YUV420P); encoder_builder.set_width(decoder.width()); encoder_builder.set_height(decoder.height()); encoder_builder.set_time_base(input_stream.time_base()); encoder_builder.set_frame_rate(Some(input_stream.rate())); let mut encoder = encoder_builder.open_as(codec)?; // Scaler to convert whatever input to YUV420P let mut scaler = ffmpeg::software::scaling::context::Context::get( decoder.format(), decoder.width(), decoder.height(), ffmpeg::format::Pixel::YUV420P, decoder.width(), decoder.height(), ffmpeg::software::scaling::flag::Flags::BILINEAR, )?; // Send packet function closure not easy due to ownership, doing inline for (stream, packet) in ictx.packets() { if stream.index() == input_stream_index { decoder.send_packet(&packet)?; let mut decoded = ffmpeg::util::frame::Video::empty(); while decoder.receive_frame(&mut decoded).is_ok() { // Scale frame let mut scaled = ffmpeg::util::frame::Video::empty(); scaler.run(&decoded, &mut scaled)?; // Set pts for the scaled frame to match decoded scaled.set_pts(decoded.pts()); // Send to encoder encoder.send_frame(&scaled)?; // Receive encoded packets let mut encoded = ffmpeg::Packet::empty(); while encoder.receive_packet(&mut encoded).is_ok() { let is_key = encoded.is_key(); let data = encoded.data().ok_or("Empty packet data")?.to_vec(); // Blocking send to the tokio channel if tx.blocking_send((data, is_key)).is_err() { return Ok(()); // Receiver dropped } } } } } // Flush encoder encoder.send_eof()?; let mut encoded = ffmpeg::Packet::empty(); while encoder.receive_packet(&mut encoded).is_ok() { let is_key = encoded.is_key(); let data = encoded.data().ok_or("Empty packet data")?.to_vec(); if tx.blocking_send((data, is_key)).is_err() { return Ok(()); } } Ok(()) }