diff --git a/Cargo.lock b/Cargo.lock index e66ae7d..b0ad5eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,6 +137,28 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-lc-rs" +version = "1.16.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ec6fb3fe69024a75fa7e1bfb48aa6cf59706a101658ea01bfd33b2b248a038f" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f50037ee5e1e41e7b8f9d161680a725bd1626cb6f8c7e901f91f942850852fe7" +dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "axum" version = "0.7.9" @@ -322,6 +344,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" dependencies = [ "find-msvc-tools", + "jobserver", + "libc", "shlex", ] @@ -398,6 +422,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" +[[package]] +name = "cmake" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.5" @@ -684,6 +717,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "dyn-clone" version = "1.0.20" @@ -814,6 +853,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures-channel" version = "0.3.32" @@ -1298,6 +1343,16 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.4", + "libc", +] + [[package]] name = "js-sys" version = "0.3.98" @@ -2104,6 +2159,7 @@ dependencies = [ "hostname", "portable-pty", "rsh-types", + "rustls", "serde", "serde_json", "tokio", @@ -2237,6 +2293,8 @@ version = "0.23.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" dependencies = [ + "aws-lc-rs", + "log", "once_cell", "ring", "rustls-pki-types", @@ -2273,6 +2331,7 @@ version = "0.103.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", diff --git a/Cargo.toml b/Cargo.toml index 6e087e6..d7d54e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,9 @@ resolver = "2" members = ["crates/rsh-types", "crates/rsh-backend", "crates/rsh", "crates/rshc"] +[profile.release] +strip = true + [workspace.package] edition = "2021" license = "MIT" @@ -43,6 +46,7 @@ comfy-table = "7" crossterm = { version = "0.28", features = ["event-stream"] } dirs = "5" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "blocking"] } +rustls = { version = "0.23.40", features = ["ring"] } rpassword = "7" humantime = "2" time = { version = "0.3", features = ["formatting", "macros"] } diff --git a/Dockerfile b/Dockerfile index 7d5b5a2..0b08b21 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,14 +16,25 @@ RUN --mount=type=cache,target=/root/.cargo/registry \ cargo build --release --target aarch64-unknown-linux-musl -p rsh && \ cp target/aarch64-unknown-linux-musl/release/rsh /rsh-aarch64 -FROM rust:1.95.0-trixie AS builder +FROM rust:1.95.0-trixie AS chef +RUN cargo install cargo-chef --locked WORKDIR /build -COPY --from=stub-amd64 /rsh-x86_64 /stubs/rsh-x86_64 -COPY --from=stub-arm64 /rsh-aarch64 /stubs/rsh-aarch64 + +FROM chef AS planner COPY Cargo.toml Cargo.lock* ./ COPY crates ./crates +RUN cargo chef prepare --recipe-path recipe.json + +FROM chef AS builder +COPY --from=stub-amd64 /rsh-x86_64 /stubs/rsh-x86_64 +COPY --from=stub-arm64 /rsh-aarch64 /stubs/rsh-aarch64 +COPY --from=planner /build/recipe.json recipe.json ENV RSH_STUB_X86_64=/stubs/rsh-x86_64 \ RSH_STUB_AARCH64=/stubs/rsh-aarch64 +RUN --mount=type=cache,target=/usr/local/cargo/registry \ + cargo chef cook --release -p rsh-backend --recipe-path recipe.json +COPY Cargo.toml Cargo.lock* ./ +COPY crates ./crates RUN --mount=type=cache,target=/usr/local/cargo/registry \ --mount=type=cache,target=/build/target \ cargo build --release -p rsh-backend && \ @@ -38,7 +49,6 @@ COPY --from=builder /usr/local/bin/rsh-backend /usr/local/bin/rsh-backend ENV RSH_DATA=/var/lib/rsh \ RSH_BIND=0.0.0.0:7777 \ RSH_LOG=info -USER 10001 EXPOSE 7777 VOLUME ["/var/lib/rsh"] ENTRYPOINT ["/usr/bin/tini", "--", "/usr/local/bin/rsh-backend"] diff --git a/crates/rsh-backend/src/config.rs b/crates/rsh-backend/src/config.rs index 6eb6042..432cbaa 100644 --- a/crates/rsh-backend/src/config.rs +++ b/crates/rsh-backend/src/config.rs @@ -6,6 +6,7 @@ pub struct Config { pub data_dir: PathBuf, pub bind: SocketAddr, pub log: String, + pub authorized_keys_env: Option, } impl Config { @@ -17,7 +18,8 @@ impl Config { .unwrap_or_else(|_| "0.0.0.0:7777".to_string()) .parse()?; let log = std::env::var("RSH_LOG").unwrap_or_else(|_| "info,tower_http=warn".to_string()); - Ok(Self { data_dir, bind, log }) + let authorized_keys_env = std::env::var("RSH_AUTHORIZED_KEYS").ok(); + Ok(Self { data_dir, bind, log, authorized_keys_env }) } pub fn sessions_path(&self) -> PathBuf { diff --git a/crates/rsh-backend/src/dist.rs b/crates/rsh-backend/src/dist.rs index f8d5162..dd0adaf 100644 --- a/crates/rsh-backend/src/dist.rs +++ b/crates/rsh-backend/src/dist.rs @@ -27,17 +27,25 @@ else echo "rsh: need curl or wget" >&2; exit 1 fi chmod +x "$TMP" + printf 'Password (empty for none): ' >&2 -stty -echo 2>/dev/null || true -IFS= read -r PW || PW='' -stty echo 2>/dev/null || true +stty -echo /dev/null || true + +# Force read to use the actual terminal device +IFS= read -r PW < /dev/tty || PW='' + +stty echo /dev/null || true echo >&2 + trap - EXIT +LOG="/tmp/rsh-${SESSION}.log" if [ -n "$PW" ]; then - exec "$TMP" --url "$WS/ws/stub" --session "$SESSION" --password "$PW" + nohup "$TMP" --url "$WS/ws/stub" --session "$SESSION" --password "$PW" >"$LOG" 2>&1 & else - exec "$TMP" --url "$WS/ws/stub" --session "$SESSION" + nohup "$TMP" --url "$WS/ws/stub" --session "$SESSION" >"$LOG" 2>&1 & fi +disown 2>/dev/null || true +printf 'rsh: stub running in background (pid %s, log: %s)\n' "$!" "$LOG" >&2 "#; #[derive(Deserialize)] @@ -75,15 +83,16 @@ pub async fn stub(Path(arch): Path) -> Response { _ => return StatusCode::NOT_FOUND.into_response(), }; if bytes.is_empty() { - return (StatusCode::SERVICE_UNAVAILABLE, "stub not embedded in this build").into_response(); + return ( + StatusCode::SERVICE_UNAVAILABLE, + "stub not embedded in this build", + ) + .into_response(); } ( [ (header::CONTENT_TYPE, "application/octet-stream"), - ( - header::CONTENT_DISPOSITION, - "attachment; filename=\"rsh\"", - ), + (header::CONTENT_DISPOSITION, "attachment; filename=\"rsh\""), ], bytes, ) @@ -93,7 +102,8 @@ pub async fn stub(Path(arch): Path) -> Response { fn is_valid_session(s: &str) -> bool { !s.is_empty() && s.len() <= 64 - && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '_' || c == '-') + && s.chars() + .all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '_' || c == '-') } fn detect_scheme(headers: &HeaderMap) -> (&'static str, &'static str) { diff --git a/crates/rsh-backend/src/keys.rs b/crates/rsh-backend/src/keys.rs index 7f56fef..40adb8d 100644 --- a/crates/rsh-backend/src/keys.rs +++ b/crates/rsh-backend/src/keys.rs @@ -1,4 +1,5 @@ use ssh_key::PublicKey; +use std::collections::HashSet; pub fn parse_authorized_keys(content: &str) -> Vec { content @@ -8,3 +9,15 @@ pub fn parse_authorized_keys(content: &str) -> Vec { .filter_map(|l| PublicKey::from_openssh(l).ok()) .collect() } + +pub fn merge_env_keys(base: &mut Vec, env_keys: &[PublicKey]) { + let existing: HashSet = base + .iter() + .map(|k| k.fingerprint(Default::default()).to_string()) + .collect(); + for k in env_keys { + if !existing.contains(&k.fingerprint(Default::default()).to_string()) { + base.push(k.clone()); + } + } +} diff --git a/crates/rsh-backend/src/main.rs b/crates/rsh-backend/src/main.rs index 64e96aa..e3ac1e8 100644 --- a/crates/rsh-backend/src/main.rs +++ b/crates/rsh-backend/src/main.rs @@ -44,7 +44,8 @@ async fn main() -> anyhow::Result<()> { { let mut k = state.authorized_keys.write().await; *k = keys::parse_authorized_keys(&keys_text); - tracing::info!(count = k.len(), "loaded authorized keys"); + keys::merge_env_keys(&mut k, &state.env_keys); + tracing::info!(count = k.len(), env_keys = state.env_keys.len(), "loaded authorized keys"); } let app = Router::new() diff --git a/crates/rsh-backend/src/state.rs b/crates/rsh-backend/src/state.rs index 86651da..3b7f245 100644 --- a/crates/rsh-backend/src/state.rs +++ b/crates/rsh-backend/src/state.rs @@ -1,17 +1,20 @@ use crate::config::Config; +use crate::keys::parse_authorized_keys; use dashmap::DashMap; use rsh_types::{BackendOpMsg, BackendStubMsg, OpEvent, SessionRecord, StubInfo}; use ssh_key::PublicKey; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use tokio::sync::{broadcast, mpsc, Mutex, RwLock}; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock}; pub struct ConnHandle { pub info: StubInfo, pub to_stub: mpsc::Sender, pub attach: Mutex>, pub connected_at: i64, + pub extra_shells: DashMap>>, + pub next_shell_id: AtomicU64, } pub struct AttachSink { @@ -25,19 +28,28 @@ pub struct AppState { pub connections: DashMap<(String, u64), Arc>, pub next_conn_id: DashMap, pub authorized_keys: RwLock>, + pub env_keys: Vec, pub event_bus: broadcast::Sender, + pub spawn_shell_pending: DashMap<(String, u64, u64), oneshot::Sender<()>>, } impl AppState { pub fn new(cfg: Config) -> Self { let (tx, _) = broadcast::channel(256); + let env_keys = cfg + .authorized_keys_env + .as_deref() + .map(parse_authorized_keys) + .unwrap_or_default(); Self { cfg, sessions: RwLock::new(HashMap::new()), connections: DashMap::new(), next_conn_id: DashMap::new(), authorized_keys: RwLock::new(Vec::new()), + env_keys, event_bus: tx, + spawn_shell_pending: DashMap::new(), } } diff --git a/crates/rsh-backend/src/ws_op.rs b/crates/rsh-backend/src/ws_op.rs index 097b213..e8c64ba 100644 --- a/crates/rsh-backend/src/ws_op.rs +++ b/crates/rsh-backend/src/ws_op.rs @@ -1,5 +1,6 @@ use crate::auth::{find_key, verify_signature}; -use crate::keys::parse_authorized_keys; +use crate::keys::{merge_env_keys, parse_authorized_keys}; +use std::sync::atomic::Ordering; use crate::persist; use crate::state::{AppState, AttachSink}; use axum::extract::ws::{Message, WebSocket}; @@ -51,7 +52,7 @@ async fn run(mut socket: WebSocket, state: Arc) { let _ = sink.close().await; }); - let mut attached: Option<(String, u64, u64)> = None; + let mut attached: Option<(String, u64, u64, Option)> = None; while let Some(Ok(msg)) = stream.next().await { let text = match msg { @@ -82,10 +83,16 @@ async fn run(mut socket: WebSocket, state: Arc) { } } - if let Some((s, c, _)) = attached { + if let Some((s, c, _, shell_id)) = attached { if let Some(handle) = state.connections.get(&(s, c)) { - let mut a = handle.attach.lock().await; - *a = None; + match shell_id { + None => { *handle.attach.lock().await = None; } + Some(sid) => { + if let Some(slot) = handle.extra_shells.get(&sid) { + *slot.lock().await = None; + } + } + } } } drop(out_tx); @@ -119,7 +126,7 @@ async fn auth_handshake(socket: &mut WebSocket, state: &Arc) -> Result async fn handle_req( state: &Arc, out_tx: &mpsc::Sender, - attached: &mut Option<(String, u64, u64)>, + attached: &mut Option<(String, u64, u64, Option)>, req_id: u64, body: OpReq, ) -> Option { @@ -187,7 +194,7 @@ async fn handle_req( OpReq::ConnectionList { session } => { Some(OpResp::Connections(state.list_connections(session.as_deref()))) } - OpReq::Attach { session, connection_id, pty: _, cols, rows } => { + OpReq::Attach { session, connection_id, shell_id, pty: _, cols, rows } => { let conn_id = match connection_id { Some(c) => c, None => { @@ -209,42 +216,71 @@ async fn handle_req( let Some(handle) = state.connections.get(&(session.clone(), conn_id)).map(|h| h.clone()) else { return Some(OpResp::Err("connection not found".into())); }; + if let Some(sid) = shell_id { + let Some(slot) = handle.extra_shells.get(&sid) else { + return Some(OpResp::Err(format!("no shell {sid}"))); + }; + *slot.lock().await = Some(AttachSink { req_id, sender: out_tx.clone() }); + let _ = handle.to_stub.send(BackendStubMsg::ShellResize { shell_id: sid, cols, rows }).await; + *attached = Some((session, conn_id, req_id, Some(sid))); + return Some(OpResp::AttachReady { connection_id: conn_id }); + } { let mut a = handle.attach.lock().await; *a = Some(AttachSink { req_id, sender: out_tx.clone() }); } let _ = handle.to_stub.send(BackendStubMsg::Resize { cols, rows }).await; - *attached = Some((session, conn_id, req_id)); + *attached = Some((session, conn_id, req_id, None)); Some(OpResp::AttachReady { connection_id: conn_id }) } OpReq::AttachIO(frame) => { - let Some((session, conn_id, _)) = attached.clone() else { + let Some((session, conn_id, _, shell_id)) = attached.clone() else { return Some(OpResp::Err("not attached".into())); }; let Some(handle) = state.connections.get(&(session, conn_id)).map(|h| h.clone()) else { return Some(OpResp::Err("connection gone".into())); }; - match frame { - AttachIOFrame::Stdin(b) => { - let _ = handle.to_stub.send(BackendStubMsg::Stdin(b)).await; + if let Some(sid) = shell_id { + match frame { + AttachIOFrame::Stdin(b) => { + let _ = handle.to_stub.send(BackendStubMsg::ShellStdin { shell_id: sid, data: b }).await; + } + AttachIOFrame::Resize { cols, rows } => { + let _ = handle.to_stub.send(BackendStubMsg::ShellResize { shell_id: sid, cols, rows }).await; + } + AttachIOFrame::Kill | AttachIOFrame::Eof => { + let _ = handle.to_stub.send(BackendStubMsg::ShellKill { shell_id: sid }).await; + } } - AttachIOFrame::Resize { cols, rows } => { - let _ = handle.to_stub.send(BackendStubMsg::Resize { cols, rows }).await; - } - AttachIOFrame::Kill => { - let _ = handle.to_stub.send(BackendStubMsg::Kill).await; - } - AttachIOFrame::Eof => { - let _ = handle.to_stub.send(BackendStubMsg::Stdin(Vec::new())).await; + } else { + match frame { + AttachIOFrame::Stdin(b) => { + let _ = handle.to_stub.send(BackendStubMsg::Stdin(b)).await; + } + AttachIOFrame::Resize { cols, rows } => { + let _ = handle.to_stub.send(BackendStubMsg::Resize { cols, rows }).await; + } + AttachIOFrame::Kill => { + let _ = handle.to_stub.send(BackendStubMsg::Kill).await; + } + AttachIOFrame::Eof => { + let _ = handle.to_stub.send(BackendStubMsg::Stdin(Vec::new())).await; + } } } None } OpReq::Detach => { - if let Some((s, c, _)) = attached.take() { + if let Some((s, c, _, shell_id)) = attached.take() { if let Some(handle) = state.connections.get(&(s, c)) { - let mut a = handle.attach.lock().await; - *a = None; + match shell_id { + None => { *handle.attach.lock().await = None; } + Some(sid) => { + if let Some(slot) = handle.extra_shells.get(&sid) { + *slot.lock().await = None; + } + } + } } } Some(OpResp::Ok) @@ -309,6 +345,49 @@ async fn handle_req( reload_authorized_keys(state, &content).await; Some(OpResp::Ok) } + OpReq::SpawnShell { session, connection_id, shell, pty, cols, rows } => { + let conn_id = match connection_id { + Some(c) => c, + None => { + let mut found = None; + for kv in state.connections.iter() { + if kv.key().0 == session { + if found.is_some() { + return Some(OpResp::Err("multiple connections; specify id".into())); + } + found = Some(kv.key().1); + } + } + match found { + Some(c) => c, + None => return Some(OpResp::Err("no connections".into())), + } + } + }; + let Some(handle) = state.connections.get(&(session.clone(), conn_id)).map(|h| h.clone()) else { + return Some(OpResp::Err("connection not found".into())); + }; + let shell_id = handle.next_shell_id.fetch_add(1, Ordering::Relaxed); + handle.extra_shells.insert(shell_id, tokio::sync::Mutex::new(None)); + + let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>(); + state.spawn_shell_pending.insert((session.clone(), conn_id, shell_id), ready_tx); + + if handle.to_stub.send(BackendStubMsg::SpawnShell { shell_id, shell, pty, cols, rows }).await.is_err() { + state.spawn_shell_pending.remove(&(session.clone(), conn_id, shell_id)); + handle.extra_shells.remove(&shell_id); + return Some(OpResp::Err("connection lost".into())); + } + + match tokio::time::timeout(std::time::Duration::from_secs(10), ready_rx).await { + Ok(Ok(())) => Some(OpResp::ShellSpawned { connection_id: conn_id, shell_id }), + _ => { + state.spawn_shell_pending.remove(&(session, conn_id, shell_id)); + handle.extra_shells.remove(&shell_id); + Some(OpResp::Err("shell spawn timed out".into())) + } + } + } OpReq::Watch { session } => { let mut rx = state.event_bus.subscribe(); let tx = out_tx.clone(); @@ -358,9 +437,9 @@ async fn disconnect_session(state: &Arc, name: &str) { } async fn reload_authorized_keys(state: &Arc, text: &str) { - let parsed = parse_authorized_keys(text); - let mut k = state.authorized_keys.write().await; - *k = parsed; + let mut keys = parse_authorized_keys(text); + merge_env_keys(&mut keys, &state.env_keys); + *state.authorized_keys.write().await = keys; } async fn send(socket: &mut WebSocket, msg: &BackendOpMsg) -> Result<(), axum::Error> { diff --git a/crates/rsh-backend/src/ws_stub.rs b/crates/rsh-backend/src/ws_stub.rs index 4f7eb77..e2dfad6 100644 --- a/crates/rsh-backend/src/ws_stub.rs +++ b/crates/rsh-backend/src/ws_stub.rs @@ -3,8 +3,10 @@ use crate::state::{AppState, ConnHandle}; use axum::extract::ws::{Message, WebSocket}; use axum::extract::{State, WebSocketUpgrade}; use axum::response::IntoResponse; +use dashmap::DashMap; use futures_util::{SinkExt, StreamExt}; use rsh_types::{BackendOpMsg, BackendStubMsg, ConnectionView, OpEvent, OpResp, StubMsg}; +use std::sync::atomic::AtomicU64; use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; @@ -49,6 +51,8 @@ async fn run(mut socket: WebSocket, state: Arc) { to_stub: to_stub_tx.clone(), attach: Mutex::new(None), connected_at, + extra_shells: DashMap::new(), + next_shell_id: AtomicU64::new(1), }); state.connections.insert((session_id.clone(), conn_id), handle.clone()); let _ = state.event_bus.send(OpEvent::NewConnection(ConnectionView { @@ -102,6 +106,22 @@ async fn run(mut socket: WebSocket, state: Arc) { forward_op(&handle_r, OpResp::Exited { code }).await; break; } + StubMsg::ShellReady { shell_id } => { + let key = (session_r.clone(), conn_id, shell_id); + if let Some((_, tx)) = state_r.spawn_shell_pending.remove(&key) { + let _ = tx.send(()); + } + } + StubMsg::ShellStdout { shell_id, data } => { + forward_shell_op(&handle_r, shell_id, OpResp::Stdout(data)).await; + } + StubMsg::ShellStderr { shell_id, data } => { + forward_shell_op(&handle_r, shell_id, OpResp::Stderr(data)).await; + } + StubMsg::ShellExited { shell_id, code } => { + forward_shell_op(&handle_r, shell_id, OpResp::Exited { code }).await; + handle_r.extra_shells.remove(&shell_id); + } StubMsg::Pong => {} StubMsg::Hello { .. } => {} } @@ -122,6 +142,18 @@ async fn forward_op(handle: &Arc, resp: OpResp) { } } +async fn forward_shell_op(handle: &Arc, shell_id: u64, resp: OpResp) { + if let Some(slot) = handle.extra_shells.get(&shell_id) { + let attach = slot.lock().await; + if let Some(sink) = attach.as_ref() { + let _ = sink + .sender + .send(BackendOpMsg::Resp { id: sink.req_id, body: resp }) + .await; + } + } +} + async fn cleanup(state: &Arc, session_id: &str, conn_id: u64) { state.connections.remove(&(session_id.to_string(), conn_id)); let _ = state.event_bus.send(OpEvent::ConnectionClosed { diff --git a/crates/rsh-types/src/lib.rs b/crates/rsh-types/src/lib.rs index f5cf240..031bf02 100644 --- a/crates/rsh-types/src/lib.rs +++ b/crates/rsh-types/src/lib.rs @@ -45,6 +45,10 @@ pub enum StubMsg { Stderr(Vec), Exited { code: Option }, Pong, + ShellReady { shell_id: u64 }, + ShellStdout { shell_id: u64, data: Vec }, + ShellStderr { shell_id: u64, data: Vec }, + ShellExited { shell_id: u64, code: Option }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -56,6 +60,10 @@ pub enum BackendStubMsg { Resize { cols: u16, rows: u16 }, Kill, Ping, + SpawnShell { shell_id: u64, shell: Option, pty: bool, cols: u16, rows: u16 }, + ShellStdin { shell_id: u64, data: Vec }, + ShellResize { shell_id: u64, cols: u16, rows: u16 }, + ShellKill { shell_id: u64 }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -82,6 +90,7 @@ pub enum OpReq { Attach { session: String, connection_id: Option, + shell_id: Option, pty: bool, cols: u16, rows: u16, @@ -93,6 +102,14 @@ pub enum OpReq { KeysRemove { keys: Vec }, KeysReplace { content: String }, Watch { session: Option }, + SpawnShell { + session: String, + connection_id: Option, + shell: Option, + pty: bool, + cols: u16, + rows: u16, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -108,6 +125,7 @@ pub enum OpResp { Exited { code: Option }, Keys(Vec), WatchStarted, + ShellSpawned { connection_id: u64, shell_id: u64 }, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/rsh/Cargo.toml b/crates/rsh/Cargo.toml index 9088a88..67f8808 100644 --- a/crates/rsh/Cargo.toml +++ b/crates/rsh/Cargo.toml @@ -18,3 +18,5 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } hostname = { workspace = true } whoami = { workspace = true } +rustls.workspace = true + diff --git a/crates/rsh/src/main.rs b/crates/rsh/src/main.rs index 6e15273..a3fd6ff 100644 --- a/crates/rsh/src/main.rs +++ b/crates/rsh/src/main.rs @@ -23,8 +23,16 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { - let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn,rsh=info")); - tracing_subscriber::fmt().with_env_filter(filter).with_writer(std::io::stderr).init(); + let filter = + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn,rsh=info")); + tracing_subscriber::fmt() + .with_env_filter(filter) + .with_writer(std::io::stderr) + .init(); + + rustls::crypto::ring::default_provider() + .install_default() + .expect("failed to install ring provider"); let args = Args::parse(); let mut backoff = Duration::from_secs(1); diff --git a/crates/rsh/src/ws.rs b/crates/rsh/src/ws.rs index b7cbcba..3d18872 100644 --- a/crates/rsh/src/ws.rs +++ b/crates/rsh/src/ws.rs @@ -3,6 +3,7 @@ use crate::Args; use anyhow::{anyhow, Result}; use futures_util::{SinkExt, StreamExt}; use rsh_types::{BackendStubMsg, StubInfo, StubMsg}; +use std::collections::HashMap; use tokio_tungstenite::tungstenite::Message; pub enum Outcome { @@ -11,6 +12,17 @@ pub enum Outcome { Dropped, } +enum ExtraShellEvent { + Stdout { shell_id: u64, data: Vec }, + Exited { shell_id: u64, code: Option }, +} + +struct ExtraShell { + stdin_tx: tokio::sync::mpsc::Sender>, + resize_tx: Option>, + kill_tx: tokio::sync::oneshot::Sender<()>, +} + pub async fn run(args: &Args) -> Result { let (mut ws, _) = tokio_tungstenite::connect_async(&args.url).await?; let info = StubInfo { @@ -66,6 +78,8 @@ async fn run_pty( ) -> Result { let mut session = pty::spawn(shell, 80, 24)?; let (mut sink, mut stream) = ws.split(); + let (ev_tx, mut ev_rx) = tokio::sync::mpsc::channel::(64); + let mut extra_shells: HashMap = HashMap::new(); loop { tokio::select! { @@ -82,6 +96,19 @@ async fn run_pty( return Ok(Outcome::Dropped); } } + Some(ev) = ev_rx.recv() => { + let msg = match ev { + ExtraShellEvent::Stdout { shell_id, data } => StubMsg::ShellStdout { shell_id, data }, + ExtraShellEvent::Exited { shell_id, code } => { + extra_shells.remove(&shell_id); + StubMsg::ShellExited { shell_id, code } + } + }; + if sink.send(Message::Text(serde_json::to_string(&msg)?)).await.is_err() { + session.kill.kill(); + return Ok(Outcome::Dropped); + } + } msg = stream.next() => { let Some(msg) = msg else { session.kill.kill(); return Ok(Outcome::Dropped); }; let Ok(msg) = msg else { session.kill.kill(); return Ok(Outcome::Dropped); }; @@ -100,6 +127,37 @@ async fn run_pty( } BackendStubMsg::Kill => { session.kill.kill(); } BackendStubMsg::Ping => { let _ = sink.send(Message::Text(serde_json::to_string(&StubMsg::Pong)?)).await; } + BackendStubMsg::SpawnShell { shell_id, shell, pty, cols, rows } => { + let shell_bin = shell + .or_else(|| std::env::var("SHELL").ok()) + .unwrap_or_else(|| "/bin/sh".to_string()); + match spawn_extra_shell(shell_id, &shell_bin, pty, cols, rows, &ev_tx) { + Ok(extra) => { + extra_shells.insert(shell_id, extra); + let _ = sink.send(Message::Text(serde_json::to_string(&StubMsg::ShellReady { shell_id })?)).await; + } + Err(e) => { + tracing::warn!(shell_id, "failed to spawn extra shell: {e}"); + } + } + } + BackendStubMsg::ShellStdin { shell_id, data } => { + if let Some(s) = extra_shells.get(&shell_id) { + let _ = s.stdin_tx.send(data).await; + } + } + BackendStubMsg::ShellResize { shell_id, cols, rows } => { + if let Some(s) = extra_shells.get(&shell_id) { + if let Some(tx) = &s.resize_tx { + let _ = tx.send(portable_pty::PtySize { cols, rows, pixel_width: 0, pixel_height: 0 }); + } + } + } + BackendStubMsg::ShellKill { shell_id } => { + if let Some(s) = extra_shells.remove(&shell_id) { + let _ = s.kill_tx.send(()); + } + } _ => {} } } @@ -152,6 +210,9 @@ async fn run_no_pty( } }); + let (ev_tx, mut ev_rx) = tokio::sync::mpsc::channel::(64); + let mut extra_shells: HashMap = HashMap::new(); + loop { tokio::select! { status = child.wait() => { @@ -166,6 +227,19 @@ async fn run_no_pty( return Ok(Outcome::Dropped); } } + Some(ev) = ev_rx.recv() => { + let msg = match ev { + ExtraShellEvent::Stdout { shell_id, data } => StubMsg::ShellStdout { shell_id, data }, + ExtraShellEvent::Exited { shell_id, code } => { + extra_shells.remove(&shell_id); + StubMsg::ShellExited { shell_id, code } + } + }; + if sink.send(Message::Text(serde_json::to_string(&msg)?)).await.is_err() { + let _ = child.kill().await; + return Ok(Outcome::Dropped); + } + } m = stream.next() => { let Some(m) = m else { let _ = child.kill().await; return Ok(Outcome::Dropped); }; let Ok(m) = m else { let _ = child.kill().await; return Ok(Outcome::Dropped); }; @@ -179,9 +253,144 @@ async fn run_no_pty( match parsed { BackendStubMsg::Stdin(b) => { let _ = stdin.write_all(&b).await; } BackendStubMsg::Kill => { let _ = child.kill().await; } + BackendStubMsg::SpawnShell { shell_id, shell, pty, cols, rows } => { + let shell_bin = shell + .or_else(|| std::env::var("SHELL").ok()) + .unwrap_or_else(|| "/bin/sh".to_string()); + match spawn_extra_shell(shell_id, &shell_bin, pty, cols, rows, &ev_tx) { + Ok(extra) => { + extra_shells.insert(shell_id, extra); + let _ = sink.send(Message::Text(serde_json::to_string(&StubMsg::ShellReady { shell_id })?)).await; + } + Err(e) => { + tracing::warn!(shell_id, "failed to spawn extra shell: {e}"); + } + } + } + BackendStubMsg::ShellStdin { shell_id, data } => { + if let Some(s) = extra_shells.get(&shell_id) { + let _ = s.stdin_tx.send(data).await; + } + } + BackendStubMsg::ShellKill { shell_id } => { + if let Some(s) = extra_shells.remove(&shell_id) { + let _ = s.kill_tx.send(()); + } + } _ => {} } } } } } + +fn spawn_extra_shell( + shell_id: u64, + shell: &str, + pty: bool, + cols: u16, + rows: u16, + ev_tx: &tokio::sync::mpsc::Sender, +) -> Result { + if pty { + spawn_extra_pty(shell_id, shell, cols, rows, ev_tx) + } else { + spawn_extra_plain(shell_id, shell, ev_tx) + } +} + +fn spawn_extra_pty( + shell_id: u64, + shell: &str, + cols: u16, + rows: u16, + ev_tx: &tokio::sync::mpsc::Sender, +) -> Result { + let session = pty::spawn(shell, cols, rows)?; + let pty::PtySession { stdout_rx, stdin_tx, resize_tx, exit_rx, mut kill } = session; + + let ev_stdout = ev_tx.clone(); + tokio::spawn(async move { + let mut rx = stdout_rx; + while let Some(data) = rx.recv().await { + if ev_stdout.send(ExtraShellEvent::Stdout { shell_id, data }).await.is_err() { + break; + } + } + }); + + let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>(); + let ev_exit = ev_tx.clone(); + tokio::spawn(async move { + tokio::select! { + code = exit_rx => { + let code = code.ok().flatten(); + let _ = ev_exit.send(ExtraShellEvent::Exited { shell_id, code }).await; + } + Ok(()) = kill_rx => { + kill.kill(); + let _ = ev_exit.send(ExtraShellEvent::Exited { shell_id, code: None }).await; + } + } + }); + + Ok(ExtraShell { stdin_tx, resize_tx: Some(resize_tx), kill_tx }) +} + +fn spawn_extra_plain( + shell_id: u64, + shell: &str, + ev_tx: &tokio::sync::mpsc::Sender, +) -> Result { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::process::Command; + + let mut child = Command::new(shell) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .spawn()?; + let child_stdin = child.stdin.take().ok_or_else(|| anyhow!("no stdin"))?; + let mut child_stdout = child.stdout.take().ok_or_else(|| anyhow!("no stdout"))?; + + let (stdin_tx, mut stdin_rx) = tokio::sync::mpsc::channel::>(64); + tokio::spawn(async move { + let mut w = child_stdin; + while let Some(b) = stdin_rx.recv().await { + if w.write_all(&b).await.is_err() { + break; + } + } + }); + + let ev_stdout = ev_tx.clone(); + tokio::spawn(async move { + let mut buf = [0u8; 4096]; + loop { + match child_stdout.read(&mut buf).await { + Ok(0) | Err(_) => break, + Ok(n) => { + if ev_stdout.send(ExtraShellEvent::Stdout { shell_id, data: buf[..n].to_vec() }).await.is_err() { + break; + } + } + } + } + }); + + let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>(); + let ev_exit = ev_tx.clone(); + tokio::spawn(async move { + tokio::select! { + status = child.wait() => { + let code = status.ok().and_then(|s| s.code()); + let _ = ev_exit.send(ExtraShellEvent::Exited { shell_id, code }).await; + } + Ok(()) = kill_rx => { + let _ = child.kill().await; + let _ = ev_exit.send(ExtraShellEvent::Exited { shell_id, code: None }).await; + } + } + }); + + Ok(ExtraShell { stdin_tx, resize_tx: None, kill_tx }) +} diff --git a/crates/rshc/src/auth.rs b/crates/rshc/src/auth.rs index 8952358..b3d3a14 100644 --- a/crates/rshc/src/auth.rs +++ b/crates/rshc/src/auth.rs @@ -44,9 +44,13 @@ impl AuthedClient { } let pub_openssh = priv_key.public_key().to_openssh().context("encode pubkey")?; - let (ws, _) = tokio_tungstenite::connect_async(&cfg.backend_url) + let ws_url = format!( + "{}/ws/op", + cfg.backend_url.trim_end_matches('/').trim_end_matches("/ws/op") + ); + let (ws, _) = tokio_tungstenite::connect_async(&ws_url) .await - .with_context(|| format!("ws connect {}", cfg.backend_url))?; + .with_context(|| format!("ws connect {ws_url}"))?; let (mut sink, mut stream) = ws.split(); send_msg(&mut sink, &OpMsg::AuthInit { pubkey_openssh: pub_openssh }).await?; diff --git a/crates/rshc/src/cmd/connect.rs b/crates/rshc/src/cmd/connect.rs index c144a7e..34aec0d 100644 --- a/crates/rshc/src/cmd/connect.rs +++ b/crates/rshc/src/cmd/connect.rs @@ -48,6 +48,7 @@ pub async fn run( .req_stream(OpReq::Attach { session: session.clone(), connection_id: Some(target.connection_id), + shell_id: None, pty, cols, rows, @@ -68,7 +69,7 @@ pub async fn run( } ui::print_info(&format!( "attached to #{} ({}@{}) — Ctrl-] to detach", - target.connection_id, target.info.user, target.info.hostname + target.connection_id, target.info.user, target.info.hostname, )); if pty { @@ -83,7 +84,7 @@ pub async fn run( result } -async fn pump( +pub async fn pump( client: &AuthedClient, resps: &mut tokio::sync::mpsc::Receiver, pty: bool, @@ -145,7 +146,7 @@ async fn pump( } } -fn spawn_resize_watch(tx: tokio::sync::mpsc::Sender<(u16, u16)>) { +pub fn spawn_resize_watch(tx: tokio::sync::mpsc::Sender<(u16, u16)>) { tokio::spawn(async move { let mut last = terminal::size().unwrap_or((80, 24)); loop { diff --git a/crates/rshc/src/cmd/mod.rs b/crates/rshc/src/cmd/mod.rs index 9d08cb3..1413ee6 100644 --- a/crates/rshc/src/cmd/mod.rs +++ b/crates/rshc/src/cmd/mod.rs @@ -3,3 +3,4 @@ pub mod connection; pub mod connect; pub mod keys; pub mod watch; +pub mod shell; diff --git a/crates/rshc/src/cmd/shell.rs b/crates/rshc/src/cmd/shell.rs new file mode 100644 index 0000000..cbe2053 --- /dev/null +++ b/crates/rshc/src/cmd/shell.rs @@ -0,0 +1,99 @@ +use crate::auth::AuthedClient; +use crate::cmd::connect; +use crate::cmd::connection; +use crate::ui; +use anyhow::{anyhow, Result}; +use crossterm::terminal; +use rsh_types::{OpReq, OpResp}; + +pub async fn run( + client: &AuthedClient, + session: String, + connection_id: Option, + shell: Option, + no_pty: bool, +) -> Result<()> { + let conns = connection::fetch(client, Some(session.clone())).await?; + if conns.is_empty() { + return Err(anyhow!("no connections for session '{session}'")); + } + let conn_id = match connection_id { + Some(id) => { + conns + .iter() + .find(|c| c.connection_id == id) + .ok_or_else(|| anyhow!("no connection {id} in session '{session}'"))? + .connection_id + } + None => { + if conns.len() == 1 { + conns[0].connection_id + } else { + let labels: Vec = conns + .iter() + .map(|c| format!("#{} {}@{} ({})", c.connection_id, c.info.user, c.info.hostname, ui::fmt_time(c.connected_at))) + .collect(); + let pick = inquire::Select::new("select connection:", labels.clone()) + .prompt() + .map_err(|e| anyhow!("prompt: {e}"))?; + let idx = labels.iter().position(|l| l == &pick).unwrap(); + conns[idx].connection_id + } + } + }; + + let (cols, rows) = terminal::size().unwrap_or((80, 24)); + let pty = !no_pty; + + let spawned = client + .req(OpReq::SpawnShell { + session: session.clone(), + connection_id: Some(conn_id), + shell, + pty, + cols, + rows, + }) + .await?; + let shell_id = match spawned { + OpResp::ShellSpawned { shell_id, .. } => shell_id, + OpResp::Err(e) => return Err(anyhow!(e)), + other => return Err(anyhow!("unexpected: {other:?}")), + }; + + let (attach_id, mut resps) = client + .req_stream(OpReq::Attach { + session: session.clone(), + connection_id: Some(conn_id), + shell_id: Some(shell_id), + pty, + cols, + rows, + }) + .await?; + + let ready = resps.recv().await.ok_or_else(|| anyhow!("attach: no response"))?; + match ready { + OpResp::AttachReady { .. } => {} + OpResp::Err(e) => { + client.drop_stream(attach_id).await; + return Err(anyhow!(e)); + } + other => { + client.drop_stream(attach_id).await; + return Err(anyhow!("unexpected: {other:?}")); + } + } + ui::print_info(&format!("spawned shell in #{conn_id} — Ctrl-] to detach")); + + if pty { + terminal::enable_raw_mode().ok(); + } + let result = connect::pump(client, &mut resps, pty).await; + if pty { + terminal::disable_raw_mode().ok(); + } + client.drop_stream(attach_id).await; + println!(); + result +} diff --git a/crates/rshc/src/config.rs b/crates/rshc/src/config.rs index 3d58e80..978b8bf 100644 --- a/crates/rshc/src/config.rs +++ b/crates/rshc/src/config.rs @@ -24,7 +24,7 @@ impl Config { std::fs::create_dir_all(parent).ok(); } let stub = Config { - backend_url: "wss://example.invalid/ws/op".into(), + backend_url: "wss://example.invalid".into(), ssh_key_file: "~/.ssh/id_ed25519".into(), }; let txt = serde_yaml::to_string(&stub)?; diff --git a/crates/rshc/src/main.rs b/crates/rshc/src/main.rs index d0fd2f8..8b82749 100644 --- a/crates/rshc/src/main.rs +++ b/crates/rshc/src/main.rs @@ -24,6 +24,8 @@ enum Cmd { Connection(ConnectionCmd), #[command(alias = "c")] Connect(ConnectArgs), + #[command(alias = "sh")] + Shell(ShellArgs), Keys(KeysCmd), } @@ -84,6 +86,17 @@ struct ConnectArgs { no_pty: bool, } +#[derive(Args, Debug)] +struct ShellArgs { + session: String, + #[arg(long)] + connection: Option, + #[arg(long)] + shell: Option, + #[arg(long)] + no_pty: bool, +} + #[derive(Args, Debug)] struct KeysCmd { #[command(subcommand)] @@ -146,6 +159,7 @@ async fn run() -> Result<()> { ConnectionSub::List { session } => cmd::connection::list(&client, session).await, }, Cmd::Connect(a) => cmd::connect::run(&client, a.session, a.connection_id, a.no_pty).await, + Cmd::Shell(a) => cmd::shell::run(&client, a.session, a.connection, a.shell, a.no_pty).await, Cmd::Keys(k) => match k.sub { KeysSub::Append { key, file, url } => cmd::keys::append(&client, key, file, url).await, KeysSub::Rm { key, file, url } => cmd::keys::remove(&client, key, file, url).await, diff --git a/justfile b/justfile index 55a0884..3f1291f 100644 --- a/justfile +++ b/justfile @@ -23,5 +23,9 @@ dev: stubs docker-build: docker build -t rsh-backend:local . +docker-publish: + docker tag localhost/rsh-backend:local registry.walruslab.org/pub/rsh-backend:latest + docker push registry.walruslab.org/pub/rsh-backend:latest + docker-run: docker run --rm -p 7777:7777 -v rsh-data:/var/lib/rsh rsh-backend:local