From 4e515ba964321dcf44949e44c0831b748f2ddb26 Mon Sep 17 00:00:00 2001 From: minco Date: Wed, 13 May 2026 00:41:26 +0900 Subject: [PATCH] add watch --- Cargo.lock | 4 + crates/rsh-backend/src/auth.rs | 33 ++++++++ crates/rsh-backend/src/dist.rs | 15 +--- crates/rsh-backend/src/main.rs | 1 + crates/rsh/Cargo.toml | 4 + crates/rsh/src/config.rs | 36 +++++++++ crates/rsh/src/main.rs | 133 ++++++++++++++++++++++++++++---- crates/rsh/src/ws.rs | 11 ++- crates/rshc/src/auth.rs | 5 +- crates/rshc/src/cmd/connect.rs | 13 ++-- crates/rshc/src/cmd/watch.rs | 135 +++++++++++++++++++++++++++------ crates/rshc/src/config.rs | 5 +- crates/rshc/src/main.rs | 5 +- 13 files changed, 336 insertions(+), 64 deletions(-) create mode 100644 crates/rsh/src/config.rs diff --git a/Cargo.lock b/Cargo.lock index b0ad5eb..f664643 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2155,13 +2155,17 @@ dependencies = [ "anyhow", "bytes", "clap", + "dirs 5.0.1", "futures-util", "hostname", "portable-pty", + "reqwest", + "rpassword", "rsh-types", "rustls", "serde", "serde_json", + "serde_yaml", "tokio", "tokio-tungstenite", "tracing", diff --git a/crates/rsh-backend/src/auth.rs b/crates/rsh-backend/src/auth.rs index 5c83d88..24cff75 100644 --- a/crates/rsh-backend/src/auth.rs +++ b/crates/rsh-backend/src/auth.rs @@ -1,8 +1,13 @@ +use crate::state::AppState; use anyhow::{anyhow, Result}; use argon2::password_hash::{PasswordHash, PasswordVerifier}; use argon2::Argon2; +use axum::extract::{Query, State}; +use axum::http::StatusCode; +use serde::Deserialize; use ssh_key::public::PublicKey; use ssh_key::SshSig; +use std::sync::Arc; pub fn verify_password(password: &str, hash: &str) -> bool { let parsed = match PasswordHash::new(hash) { @@ -27,3 +32,31 @@ pub fn find_key<'a>(keys: &'a [PublicKey], offered: &PublicKey) -> Option<&'a Pu let fp = offered.fingerprint(Default::default()); keys.iter().find(|k| k.fingerprint(Default::default()) == fp) } + +#[derive(Deserialize)] +pub struct CheckAuthQuery { + s: String, + pw: Option, +} + +pub async fn check_auth_handler( + State(state): State>, + Query(q): Query, +) -> StatusCode { + let sessions = state.sessions.read().await; + let session = match sessions.get(&q.s) { + Some(s) => s.clone(), + None => return StatusCode::NOT_FOUND, + }; + drop(sessions); + match &session.password_hash { + Some(hash) => { + if verify_password(&q.pw.unwrap_or_default(), hash) { + StatusCode::OK + } else { + StatusCode::UNAUTHORIZED + } + } + None => StatusCode::OK, + } +} diff --git a/crates/rsh-backend/src/dist.rs b/crates/rsh-backend/src/dist.rs index dd0adaf..cf8be1a 100644 --- a/crates/rsh-backend/src/dist.rs +++ b/crates/rsh-backend/src/dist.rs @@ -28,22 +28,11 @@ else fi chmod +x "$TMP" -printf 'Password (empty for none): ' >&2 -stty -echo /dev/null || true - -# Force read to use the actual terminal device -IFS= read -r PW < /dev/tty || PW='' - -stty echo /dev/null || true -echo >&2 +"$TMP" auth --url "$WS" "$SESSION" trap - EXIT LOG="/tmp/rsh-${SESSION}.log" -if [ -n "$PW" ]; then - nohup "$TMP" --url "$WS/ws/stub" --session "$SESSION" --password "$PW" >"$LOG" 2>&1 & -else - nohup "$TMP" --url "$WS/ws/stub" --session "$SESSION" >"$LOG" 2>&1 & -fi +nohup "$TMP" stub --url "$WS/ws/stub" --session "$SESSION" >"$LOG" 2>&1 & disown 2>/dev/null || true printf 'rsh: stub running in background (pid %s, log: %s)\n' "$!" "$LOG" >&2 "#; diff --git a/crates/rsh-backend/src/main.rs b/crates/rsh-backend/src/main.rs index e3ac1e8..cc92478 100644 --- a/crates/rsh-backend/src/main.rs +++ b/crates/rsh-backend/src/main.rs @@ -53,6 +53,7 @@ async fn main() -> anyhow::Result<()> { .route("/run.sh", get(dist::run_sh)) .route("/rsh/:arch", get(dist::stub)) .route("/healthz", get(|| async { "ok" })) + .route("/check-auth", get(auth::check_auth_handler)) .route("/ws/stub", get(ws_stub::handler)) .route("/ws/op", get(ws_op::handler)) .with_state(state.clone()) diff --git a/crates/rsh/Cargo.toml b/crates/rsh/Cargo.toml index 67f8808..91c79b5 100644 --- a/crates/rsh/Cargo.toml +++ b/crates/rsh/Cargo.toml @@ -19,4 +19,8 @@ tracing-subscriber = { workspace = true } hostname = { workspace = true } whoami = { workspace = true } rustls.workspace = true +serde_yaml.workspace = true +rpassword.workspace = true +dirs.workspace = true +reqwest.workspace = true diff --git a/crates/rsh/src/config.rs b/crates/rsh/src/config.rs new file mode 100644 index 0000000..1e4a0b8 --- /dev/null +++ b/crates/rsh/src/config.rs @@ -0,0 +1,36 @@ +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::PathBuf; + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct RshConfig { + #[serde(default)] + pub sessions: HashMap, +} + +impl RshConfig { + pub fn path() -> PathBuf { + dirs::config_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join("rsh.yaml") + } + + pub fn load() -> Self { + let p = Self::path(); + if !p.exists() { + return Self::default(); + } + let raw = std::fs::read_to_string(&p).unwrap_or_default(); + serde_yaml::from_str(&raw).unwrap_or_default() + } + + pub fn save(&self) -> Result<()> { + let p = Self::path(); + if let Some(parent) = p.parent() { + std::fs::create_dir_all(parent)?; + } + std::fs::write(&p, serde_yaml::to_string(self)?)?; + Ok(()) + } +} diff --git a/crates/rsh/src/main.rs b/crates/rsh/src/main.rs index a3fd6ff..bc62daf 100644 --- a/crates/rsh/src/main.rs +++ b/crates/rsh/src/main.rs @@ -1,24 +1,41 @@ +mod config; mod pty; mod ws; -use anyhow::Result; -use clap::Parser; +use anyhow::{bail, Result}; +use clap::{Parser, Subcommand}; +use config::RshConfig; use std::time::Duration; use tracing_subscriber::EnvFilter; -#[derive(Parser, Debug, Clone)] -#[command(version, about = "rsh reverse-shell stub")] -struct Args { - #[arg(long)] - url: String, - #[arg(long)] - session: String, - #[arg(long)] - password: Option, - #[arg(long)] - shell: Option, - #[arg(long, default_value_t = false)] - no_pty: bool, +#[derive(Parser, Debug)] +#[command(name = "rsh", version, about = "rsh reverse-shell stub")] +struct Cli { + #[command(subcommand)] + cmd: Cmd, +} + +#[derive(Subcommand, Debug)] +enum Cmd { + /// Verify session password against backend and store it in ~/.config/rsh.yaml + Auth { + #[arg(long)] + url: String, + session: String, + }, + /// Run as a reverse-shell stub (background daemon) + Stub { + #[arg(long)] + url: String, + #[arg(long)] + session: String, + #[arg(long)] + password: Option, + #[arg(long)] + shell: Option, + #[arg(long, default_value_t = false)] + no_pty: bool, + }, } #[tokio::main] @@ -34,7 +51,91 @@ async fn main() -> Result<()> { .install_default() .expect("failed to install ring provider"); - let args = Args::parse(); + let cli = Cli::parse(); + + match cli.cmd { + Cmd::Auth { url, session } => run_auth(url, session).await, + Cmd::Stub { url, session, password, shell, no_pty } => { + run_stub(url, session, password, shell, no_pty).await + } + } +} + +fn ws_to_http(url: &str) -> String { + if let Some(rest) = url.strip_prefix("wss://") { + format!("https://{rest}") + } else if let Some(rest) = url.strip_prefix("ws://") { + format!("http://{rest}") + } else { + url.to_string() + } +} + +async fn check_password(http_base: &str, session: &str, pw: &str) -> Result { + let url = format!("{http_base}/check-auth"); + let status = reqwest::Client::new() + .get(&url) + .query(&[("s", session), ("pw", pw)]) + .send() + .await? + .status(); + Ok(status) +} + +async fn run_auth(url: String, session: String) -> Result<()> { + let http_base = ws_to_http(&url); + + // If we have a cached password, verify it silently + let mut cfg = RshConfig::load(); + if let Some(cached_pw) = cfg.sessions.get(&session).cloned() { + match check_password(&http_base, &session, &cached_pw).await? { + s if s == reqwest::StatusCode::OK => { + eprintln!("rsh: already authenticated for session '{session}'"); + return Ok(()); + } + s if s == reqwest::StatusCode::NOT_FOUND => { + bail!("rsh: session '{session}' not found"); + } + _ => { + // Stale cached password — fall through to prompt + } + } + } + + eprint!("Password for session '{session}' (empty for none): "); + let pw = rpassword::read_password()?; + + if pw.is_empty() { + cfg.sessions.remove(&session); + cfg.save()?; + return Ok(()); + } + + match check_password(&http_base, &session, &pw).await? { + s if s == reqwest::StatusCode::OK => { + cfg.sessions.insert(session, pw); + cfg.save()?; + } + s if s == reqwest::StatusCode::NOT_FOUND => { + bail!("rsh: session '{session}' not found"); + } + _ => { + bail!("rsh: wrong password for session '{session}'"); + } + } + Ok(()) +} + +async fn run_stub( + url: String, + session: String, + password: Option, + shell: Option, + no_pty: bool, +) -> Result<()> { + let password = password.or_else(|| RshConfig::load().sessions.remove(&session)); + + let args = ws::StubArgs { url, session, password, shell, no_pty }; let mut backoff = Duration::from_secs(1); loop { match ws::run(&args).await { diff --git a/crates/rsh/src/ws.rs b/crates/rsh/src/ws.rs index 3d18872..61c9b10 100644 --- a/crates/rsh/src/ws.rs +++ b/crates/rsh/src/ws.rs @@ -1,5 +1,4 @@ use crate::pty; -use crate::Args; use anyhow::{anyhow, Result}; use futures_util::{SinkExt, StreamExt}; use rsh_types::{BackendStubMsg, StubInfo, StubMsg}; @@ -23,7 +22,15 @@ struct ExtraShell { kill_tx: tokio::sync::oneshot::Sender<()>, } -pub async fn run(args: &Args) -> Result { +pub struct StubArgs { + pub url: String, + pub session: String, + pub password: Option, + pub shell: Option, + pub no_pty: bool, +} + +pub async fn run(args: &StubArgs) -> Result { let (mut ws, _) = tokio_tungstenite::connect_async(&args.url).await?; let info = StubInfo { hostname: hostname::get().ok().and_then(|h| h.into_string().ok()).unwrap_or_default(), diff --git a/crates/rshc/src/auth.rs b/crates/rshc/src/auth.rs index b3d3a14..7f6dbcc 100644 --- a/crates/rshc/src/auth.rs +++ b/crates/rshc/src/auth.rs @@ -127,6 +127,9 @@ impl AuthedClient { _ => {} } } + // Signal all pending callers that the connection is gone. + pending_r.lock().await.clear(); + *events_tx_r.lock().await = None; }); Ok(Self { @@ -147,7 +150,7 @@ impl AuthedClient { .send(OpMsg::Req { id, body }) .await .map_err(|_| anyhow!("send failed (connection closed)"))?; - rx.await.map_err(|_| anyhow!("response dropped")) + rx.await.map_err(|_| anyhow!("backend disconnected")) } pub async fn req_stream(&self, body: OpReq) -> Result<(u64, mpsc::Receiver)> { diff --git a/crates/rshc/src/cmd/connect.rs b/crates/rshc/src/cmd/connect.rs index 34aec0d..104cbbe 100644 --- a/crates/rshc/src/cmd/connect.rs +++ b/crates/rshc/src/cmd/connect.rs @@ -114,24 +114,25 @@ pub async fn pump( } let _ = client.send_attach_io(AttachIOFrame::Stdin(buf[..n].to_vec())).await; } - Some(resp) = resps.recv() => { + resp = resps.recv() => { match resp { - OpResp::Stdout(b) => { + None => return Err(anyhow!("backend disconnected")), + Some(OpResp::Stdout(b)) => { let mut out = std::io::stdout(); out.write_all(&b).ok(); out.flush().ok(); } - OpResp::Stderr(b) => { + Some(OpResp::Stderr(b)) => { let mut err = std::io::stderr(); err.write_all(&b).ok(); err.flush().ok(); } - OpResp::Exited { code } => { + Some(OpResp::Exited { code }) => { ui::print_info(&format!("remote exited (code {:?})", code)); return Ok(()); } - OpResp::Err(e) => return Err(anyhow!(e)), - _ => {} + Some(OpResp::Err(e)) => return Err(anyhow!(e)), + Some(_) => {} } } Some((cols, rows)) = async { diff --git a/crates/rshc/src/cmd/watch.rs b/crates/rshc/src/cmd/watch.rs index 8ed3dd7..87d5d28 100644 --- a/crates/rshc/src/cmd/watch.rs +++ b/crates/rshc/src/cmd/watch.rs @@ -1,39 +1,128 @@ use crate::auth::AuthedClient; +use crate::config::Config; use crate::ui; use anyhow::{anyhow, Result}; +use std::time::Duration; +use crossterm::{cursor::MoveTo, execute, terminal::{Clear, ClearType}}; use owo_colors::OwoColorize; -use rsh_types::{OpEvent, OpReq, OpResp}; +use rsh_types::{ConnectionView, OpEvent, OpReq, OpResp, SessionView}; +use std::io::stdout; pub async fn run(client: &AuthedClient, session: Option) -> Result<()> { let mut events = client.take_events().await; + match client.req(OpReq::Watch { session: session.clone() }).await? { OpResp::WatchStarted => {} OpResp::Err(e) => return Err(anyhow!(e)), other => return Err(anyhow!("unexpected: {other:?}")), } - ui::print_info(&format!( - "watching {}", - session.as_deref().unwrap_or("all sessions") - )); - while let Some(ev) = events.recv().await { - match ev { - OpEvent::NewConnection(c) => println!( - "{} {} #{} {}@{}", - "+conn".green().bold(), - c.session_id, - c.connection_id, - c.info.user, - c.info.hostname - ), - OpEvent::ConnectionClosed { session, connection_id } => println!( - "{} {} #{}", - "-conn".red().bold(), - session, - connection_id - ), - OpEvent::NewSession(s) => println!("{} {}", "+sess".cyan().bold(), s.id), - OpEvent::SessionDeleted { session } => println!("{} {}", "-sess".magenta().bold(), session), + + let mut sessions: Vec = match client.req(OpReq::SessionList).await? { + OpResp::Sessions(s) => s, + OpResp::Err(e) => return Err(anyhow!(e)), + other => return Err(anyhow!("unexpected: {other:?}")), + }; + let mut connections: Vec = + match client.req(OpReq::ConnectionList { session: session.clone() }).await? { + OpResp::Connections(c) => c, + OpResp::Err(e) => return Err(anyhow!(e)), + other => return Err(anyhow!("unexpected: {other:?}")), + }; + + if let Some(ref filter) = session { + sessions.retain(|s| &s.id == filter); + } + sessions.sort_by(|a, b| a.id.cmp(&b.id)); + connections.sort_by(|a, b| { + a.session_id.cmp(&b.session_id).then(a.connection_id.cmp(&b.connection_id)) + }); + + render(&sessions, &connections)?; + + loop { + match events.recv().await { + None => return Err(anyhow!("backend disconnected")), + Some(ev) => { + match ev { + OpEvent::NewConnection(c) => { + if let Some(s) = sessions.iter_mut().find(|s| s.id == c.session_id) { + s.connection_count += 1; + } + connections.push(c); + connections.sort_by(|a, b| { + a.session_id.cmp(&b.session_id).then(a.connection_id.cmp(&b.connection_id)) + }); + } + OpEvent::ConnectionClosed { session: sess, connection_id } => { + connections.retain(|c| !(c.session_id == sess && c.connection_id == connection_id)); + if let Some(s) = sessions.iter_mut().find(|s| s.id == sess) { + s.connection_count = s.connection_count.saturating_sub(1); + } + } + OpEvent::NewSession(s) => { + if session.is_none() || session.as_deref() == Some(&s.id) { + sessions.push(s); + sessions.sort_by(|a, b| a.id.cmp(&b.id)); + } + } + OpEvent::SessionDeleted { session: sess } => { + sessions.retain(|s| s.id != sess); + connections.retain(|c| c.session_id != sess); + } + } + render(&sessions, &connections)?; + } } } +} + +pub async fn run_with_reconnect(cfg: &Config, session: Option) -> Result<()> { + let mut backoff = Duration::from_secs(1); + loop { + match AuthedClient::connect(cfg).await { + Err(e) => { + eprintln!("\nconnect failed: {e}"); + } + Ok(client) => { + backoff = Duration::from_secs(1); + match run(&client, session.clone()).await { + Ok(()) => return Ok(()), + Err(e) => { + eprintln!("\n{e}"); + } + } + } + } + eprintln!("reconnecting in {}s...", backoff.as_secs()); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(Duration::from_secs(30)); + } +} + +fn render(sessions: &[SessionView], connections: &[ConnectionView]) -> Result<()> { + execute!(stdout(), Clear(ClearType::All), MoveTo(0, 0))?; + + println!( + "{} {} sessions {} connections {}", + "●".green().bold(), + sessions.len().bold(), + connections.len().bold(), + "Ctrl-C to exit".dimmed(), + ); + println!(); + + if sessions.is_empty() { + println!("{}", " no sessions".dimmed()); + } else { + println!("{}", ui::sessions_table(sessions)); + } + println!(); + + if connections.is_empty() { + println!("{}", " no connections".dimmed()); + } else { + println!("{}", ui::connections_table(connections)); + } + Ok(()) } diff --git a/crates/rshc/src/config.rs b/crates/rshc/src/config.rs index 978b8bf..096ccc6 100644 --- a/crates/rshc/src/config.rs +++ b/crates/rshc/src/config.rs @@ -14,7 +14,7 @@ impl Config { return PathBuf::from(p); } let base = dirs::config_dir().unwrap_or_else(|| PathBuf::from(".")); - base.join("rsh.yaml") + base.join("rshc.yaml") } pub fn load() -> Result { @@ -35,7 +35,8 @@ impl Config { )); } let raw = std::fs::read_to_string(&p).with_context(|| format!("read {}", p.display()))?; - let cfg: Config = serde_yaml::from_str(&raw).with_context(|| format!("parse {}", p.display()))?; + let cfg: Config = + serde_yaml::from_str(&raw).with_context(|| format!("parse {}", p.display()))?; Ok(cfg) } diff --git a/crates/rshc/src/main.rs b/crates/rshc/src/main.rs index 8b82749..6a4b74f 100644 --- a/crates/rshc/src/main.rs +++ b/crates/rshc/src/main.rs @@ -139,9 +139,12 @@ async fn main() { async fn run() -> Result<()> { let cli = Cli::parse(); let cfg = Config::load()?; + if let Cmd::Watch(a) = cli.cmd { + return cmd::watch::run_with_reconnect(&cfg, a.session).await; + } let client = AuthedClient::connect(&cfg).await?; match cli.cmd { - Cmd::Watch(a) => cmd::watch::run(&client, a.session).await, + Cmd::Watch(_) => unreachable!(), Cmd::Session(s) => match s.sub { SessionSub::Create { name } => cmd::session::create(&client, name).await, SessionSub::Delete { name, yes, disconnect } => cmd::session::delete(&client, name, yes, disconnect).await,