1
This commit is contained in:
@@ -6,6 +6,7 @@ pub struct Config {
|
||||
pub data_dir: PathBuf,
|
||||
pub bind: SocketAddr,
|
||||
pub log: String,
|
||||
pub authorized_keys_env: Option<String>,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
@@ -17,7 +18,8 @@ impl Config {
|
||||
.unwrap_or_else(|_| "0.0.0.0:7777".to_string())
|
||||
.parse()?;
|
||||
let log = std::env::var("RSH_LOG").unwrap_or_else(|_| "info,tower_http=warn".to_string());
|
||||
Ok(Self { data_dir, bind, log })
|
||||
let authorized_keys_env = std::env::var("RSH_AUTHORIZED_KEYS").ok();
|
||||
Ok(Self { data_dir, bind, log, authorized_keys_env })
|
||||
}
|
||||
|
||||
pub fn sessions_path(&self) -> PathBuf {
|
||||
|
||||
@@ -27,17 +27,25 @@ else
|
||||
echo "rsh: need curl or wget" >&2; exit 1
|
||||
fi
|
||||
chmod +x "$TMP"
|
||||
|
||||
printf 'Password (empty for none): ' >&2
|
||||
stty -echo 2>/dev/null || true
|
||||
IFS= read -r PW || PW=''
|
||||
stty echo 2>/dev/null || true
|
||||
stty -echo </dev/tty 2>/dev/null || true
|
||||
|
||||
# Force read to use the actual terminal device
|
||||
IFS= read -r PW < /dev/tty || PW=''
|
||||
|
||||
stty echo </dev/tty 2>/dev/null || true
|
||||
echo >&2
|
||||
|
||||
trap - EXIT
|
||||
LOG="/tmp/rsh-${SESSION}.log"
|
||||
if [ -n "$PW" ]; then
|
||||
exec "$TMP" --url "$WS/ws/stub" --session "$SESSION" --password "$PW"
|
||||
nohup "$TMP" --url "$WS/ws/stub" --session "$SESSION" --password "$PW" >"$LOG" 2>&1 &
|
||||
else
|
||||
exec "$TMP" --url "$WS/ws/stub" --session "$SESSION"
|
||||
nohup "$TMP" --url "$WS/ws/stub" --session "$SESSION" >"$LOG" 2>&1 &
|
||||
fi
|
||||
disown 2>/dev/null || true
|
||||
printf 'rsh: stub running in background (pid %s, log: %s)\n' "$!" "$LOG" >&2
|
||||
"#;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -75,15 +83,16 @@ pub async fn stub(Path(arch): Path<String>) -> Response {
|
||||
_ => return StatusCode::NOT_FOUND.into_response(),
|
||||
};
|
||||
if bytes.is_empty() {
|
||||
return (StatusCode::SERVICE_UNAVAILABLE, "stub not embedded in this build").into_response();
|
||||
return (
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
"stub not embedded in this build",
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
(
|
||||
[
|
||||
(header::CONTENT_TYPE, "application/octet-stream"),
|
||||
(
|
||||
header::CONTENT_DISPOSITION,
|
||||
"attachment; filename=\"rsh\"",
|
||||
),
|
||||
(header::CONTENT_DISPOSITION, "attachment; filename=\"rsh\""),
|
||||
],
|
||||
bytes,
|
||||
)
|
||||
@@ -93,7 +102,8 @@ pub async fn stub(Path(arch): Path<String>) -> Response {
|
||||
fn is_valid_session(s: &str) -> bool {
|
||||
!s.is_empty()
|
||||
&& s.len() <= 64
|
||||
&& s.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '_' || c == '-')
|
||||
&& s.chars()
|
||||
.all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '_' || c == '-')
|
||||
}
|
||||
|
||||
fn detect_scheme(headers: &HeaderMap) -> (&'static str, &'static str) {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use ssh_key::PublicKey;
|
||||
use std::collections::HashSet;
|
||||
|
||||
pub fn parse_authorized_keys(content: &str) -> Vec<PublicKey> {
|
||||
content
|
||||
@@ -8,3 +9,15 @@ pub fn parse_authorized_keys(content: &str) -> Vec<PublicKey> {
|
||||
.filter_map(|l| PublicKey::from_openssh(l).ok())
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn merge_env_keys(base: &mut Vec<PublicKey>, env_keys: &[PublicKey]) {
|
||||
let existing: HashSet<String> = base
|
||||
.iter()
|
||||
.map(|k| k.fingerprint(Default::default()).to_string())
|
||||
.collect();
|
||||
for k in env_keys {
|
||||
if !existing.contains(&k.fingerprint(Default::default()).to_string()) {
|
||||
base.push(k.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,7 +44,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
{
|
||||
let mut k = state.authorized_keys.write().await;
|
||||
*k = keys::parse_authorized_keys(&keys_text);
|
||||
tracing::info!(count = k.len(), "loaded authorized keys");
|
||||
keys::merge_env_keys(&mut k, &state.env_keys);
|
||||
tracing::info!(count = k.len(), env_keys = state.env_keys.len(), "loaded authorized keys");
|
||||
}
|
||||
|
||||
let app = Router::new()
|
||||
|
||||
@@ -1,17 +1,20 @@
|
||||
use crate::config::Config;
|
||||
use crate::keys::parse_authorized_keys;
|
||||
use dashmap::DashMap;
|
||||
use rsh_types::{BackendOpMsg, BackendStubMsg, OpEvent, SessionRecord, StubInfo};
|
||||
use ssh_key::PublicKey;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{broadcast, mpsc, Mutex, RwLock};
|
||||
use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
|
||||
|
||||
pub struct ConnHandle {
|
||||
pub info: StubInfo,
|
||||
pub to_stub: mpsc::Sender<BackendStubMsg>,
|
||||
pub attach: Mutex<Option<AttachSink>>,
|
||||
pub connected_at: i64,
|
||||
pub extra_shells: DashMap<u64, Mutex<Option<AttachSink>>>,
|
||||
pub next_shell_id: AtomicU64,
|
||||
}
|
||||
|
||||
pub struct AttachSink {
|
||||
@@ -25,19 +28,28 @@ pub struct AppState {
|
||||
pub connections: DashMap<(String, u64), Arc<ConnHandle>>,
|
||||
pub next_conn_id: DashMap<String, AtomicU64>,
|
||||
pub authorized_keys: RwLock<Vec<PublicKey>>,
|
||||
pub env_keys: Vec<PublicKey>,
|
||||
pub event_bus: broadcast::Sender<OpEvent>,
|
||||
pub spawn_shell_pending: DashMap<(String, u64, u64), oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub fn new(cfg: Config) -> Self {
|
||||
let (tx, _) = broadcast::channel(256);
|
||||
let env_keys = cfg
|
||||
.authorized_keys_env
|
||||
.as_deref()
|
||||
.map(parse_authorized_keys)
|
||||
.unwrap_or_default();
|
||||
Self {
|
||||
cfg,
|
||||
sessions: RwLock::new(HashMap::new()),
|
||||
connections: DashMap::new(),
|
||||
next_conn_id: DashMap::new(),
|
||||
authorized_keys: RwLock::new(Vec::new()),
|
||||
env_keys,
|
||||
event_bus: tx,
|
||||
spawn_shell_pending: DashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::auth::{find_key, verify_signature};
|
||||
use crate::keys::parse_authorized_keys;
|
||||
use crate::keys::{merge_env_keys, parse_authorized_keys};
|
||||
use std::sync::atomic::Ordering;
|
||||
use crate::persist;
|
||||
use crate::state::{AppState, AttachSink};
|
||||
use axum::extract::ws::{Message, WebSocket};
|
||||
@@ -51,7 +52,7 @@ async fn run(mut socket: WebSocket, state: Arc<AppState>) {
|
||||
let _ = sink.close().await;
|
||||
});
|
||||
|
||||
let mut attached: Option<(String, u64, u64)> = None;
|
||||
let mut attached: Option<(String, u64, u64, Option<u64>)> = None;
|
||||
|
||||
while let Some(Ok(msg)) = stream.next().await {
|
||||
let text = match msg {
|
||||
@@ -82,10 +83,16 @@ async fn run(mut socket: WebSocket, state: Arc<AppState>) {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((s, c, _)) = attached {
|
||||
if let Some((s, c, _, shell_id)) = attached {
|
||||
if let Some(handle) = state.connections.get(&(s, c)) {
|
||||
let mut a = handle.attach.lock().await;
|
||||
*a = None;
|
||||
match shell_id {
|
||||
None => { *handle.attach.lock().await = None; }
|
||||
Some(sid) => {
|
||||
if let Some(slot) = handle.extra_shells.get(&sid) {
|
||||
*slot.lock().await = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(out_tx);
|
||||
@@ -119,7 +126,7 @@ async fn auth_handshake(socket: &mut WebSocket, state: &Arc<AppState>) -> Result
|
||||
async fn handle_req(
|
||||
state: &Arc<AppState>,
|
||||
out_tx: &mpsc::Sender<BackendOpMsg>,
|
||||
attached: &mut Option<(String, u64, u64)>,
|
||||
attached: &mut Option<(String, u64, u64, Option<u64>)>,
|
||||
req_id: u64,
|
||||
body: OpReq,
|
||||
) -> Option<OpResp> {
|
||||
@@ -187,7 +194,7 @@ async fn handle_req(
|
||||
OpReq::ConnectionList { session } => {
|
||||
Some(OpResp::Connections(state.list_connections(session.as_deref())))
|
||||
}
|
||||
OpReq::Attach { session, connection_id, pty: _, cols, rows } => {
|
||||
OpReq::Attach { session, connection_id, shell_id, pty: _, cols, rows } => {
|
||||
let conn_id = match connection_id {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
@@ -209,42 +216,71 @@ async fn handle_req(
|
||||
let Some(handle) = state.connections.get(&(session.clone(), conn_id)).map(|h| h.clone()) else {
|
||||
return Some(OpResp::Err("connection not found".into()));
|
||||
};
|
||||
if let Some(sid) = shell_id {
|
||||
let Some(slot) = handle.extra_shells.get(&sid) else {
|
||||
return Some(OpResp::Err(format!("no shell {sid}")));
|
||||
};
|
||||
*slot.lock().await = Some(AttachSink { req_id, sender: out_tx.clone() });
|
||||
let _ = handle.to_stub.send(BackendStubMsg::ShellResize { shell_id: sid, cols, rows }).await;
|
||||
*attached = Some((session, conn_id, req_id, Some(sid)));
|
||||
return Some(OpResp::AttachReady { connection_id: conn_id });
|
||||
}
|
||||
{
|
||||
let mut a = handle.attach.lock().await;
|
||||
*a = Some(AttachSink { req_id, sender: out_tx.clone() });
|
||||
}
|
||||
let _ = handle.to_stub.send(BackendStubMsg::Resize { cols, rows }).await;
|
||||
*attached = Some((session, conn_id, req_id));
|
||||
*attached = Some((session, conn_id, req_id, None));
|
||||
Some(OpResp::AttachReady { connection_id: conn_id })
|
||||
}
|
||||
OpReq::AttachIO(frame) => {
|
||||
let Some((session, conn_id, _)) = attached.clone() else {
|
||||
let Some((session, conn_id, _, shell_id)) = attached.clone() else {
|
||||
return Some(OpResp::Err("not attached".into()));
|
||||
};
|
||||
let Some(handle) = state.connections.get(&(session, conn_id)).map(|h| h.clone()) else {
|
||||
return Some(OpResp::Err("connection gone".into()));
|
||||
};
|
||||
match frame {
|
||||
AttachIOFrame::Stdin(b) => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::Stdin(b)).await;
|
||||
if let Some(sid) = shell_id {
|
||||
match frame {
|
||||
AttachIOFrame::Stdin(b) => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::ShellStdin { shell_id: sid, data: b }).await;
|
||||
}
|
||||
AttachIOFrame::Resize { cols, rows } => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::ShellResize { shell_id: sid, cols, rows }).await;
|
||||
}
|
||||
AttachIOFrame::Kill | AttachIOFrame::Eof => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::ShellKill { shell_id: sid }).await;
|
||||
}
|
||||
}
|
||||
AttachIOFrame::Resize { cols, rows } => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::Resize { cols, rows }).await;
|
||||
}
|
||||
AttachIOFrame::Kill => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::Kill).await;
|
||||
}
|
||||
AttachIOFrame::Eof => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::Stdin(Vec::new())).await;
|
||||
} else {
|
||||
match frame {
|
||||
AttachIOFrame::Stdin(b) => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::Stdin(b)).await;
|
||||
}
|
||||
AttachIOFrame::Resize { cols, rows } => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::Resize { cols, rows }).await;
|
||||
}
|
||||
AttachIOFrame::Kill => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::Kill).await;
|
||||
}
|
||||
AttachIOFrame::Eof => {
|
||||
let _ = handle.to_stub.send(BackendStubMsg::Stdin(Vec::new())).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
OpReq::Detach => {
|
||||
if let Some((s, c, _)) = attached.take() {
|
||||
if let Some((s, c, _, shell_id)) = attached.take() {
|
||||
if let Some(handle) = state.connections.get(&(s, c)) {
|
||||
let mut a = handle.attach.lock().await;
|
||||
*a = None;
|
||||
match shell_id {
|
||||
None => { *handle.attach.lock().await = None; }
|
||||
Some(sid) => {
|
||||
if let Some(slot) = handle.extra_shells.get(&sid) {
|
||||
*slot.lock().await = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(OpResp::Ok)
|
||||
@@ -309,6 +345,49 @@ async fn handle_req(
|
||||
reload_authorized_keys(state, &content).await;
|
||||
Some(OpResp::Ok)
|
||||
}
|
||||
OpReq::SpawnShell { session, connection_id, shell, pty, cols, rows } => {
|
||||
let conn_id = match connection_id {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
let mut found = None;
|
||||
for kv in state.connections.iter() {
|
||||
if kv.key().0 == session {
|
||||
if found.is_some() {
|
||||
return Some(OpResp::Err("multiple connections; specify id".into()));
|
||||
}
|
||||
found = Some(kv.key().1);
|
||||
}
|
||||
}
|
||||
match found {
|
||||
Some(c) => c,
|
||||
None => return Some(OpResp::Err("no connections".into())),
|
||||
}
|
||||
}
|
||||
};
|
||||
let Some(handle) = state.connections.get(&(session.clone(), conn_id)).map(|h| h.clone()) else {
|
||||
return Some(OpResp::Err("connection not found".into()));
|
||||
};
|
||||
let shell_id = handle.next_shell_id.fetch_add(1, Ordering::Relaxed);
|
||||
handle.extra_shells.insert(shell_id, tokio::sync::Mutex::new(None));
|
||||
|
||||
let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
|
||||
state.spawn_shell_pending.insert((session.clone(), conn_id, shell_id), ready_tx);
|
||||
|
||||
if handle.to_stub.send(BackendStubMsg::SpawnShell { shell_id, shell, pty, cols, rows }).await.is_err() {
|
||||
state.spawn_shell_pending.remove(&(session.clone(), conn_id, shell_id));
|
||||
handle.extra_shells.remove(&shell_id);
|
||||
return Some(OpResp::Err("connection lost".into()));
|
||||
}
|
||||
|
||||
match tokio::time::timeout(std::time::Duration::from_secs(10), ready_rx).await {
|
||||
Ok(Ok(())) => Some(OpResp::ShellSpawned { connection_id: conn_id, shell_id }),
|
||||
_ => {
|
||||
state.spawn_shell_pending.remove(&(session, conn_id, shell_id));
|
||||
handle.extra_shells.remove(&shell_id);
|
||||
Some(OpResp::Err("shell spawn timed out".into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
OpReq::Watch { session } => {
|
||||
let mut rx = state.event_bus.subscribe();
|
||||
let tx = out_tx.clone();
|
||||
@@ -358,9 +437,9 @@ async fn disconnect_session(state: &Arc<AppState>, name: &str) {
|
||||
}
|
||||
|
||||
async fn reload_authorized_keys(state: &Arc<AppState>, text: &str) {
|
||||
let parsed = parse_authorized_keys(text);
|
||||
let mut k = state.authorized_keys.write().await;
|
||||
*k = parsed;
|
||||
let mut keys = parse_authorized_keys(text);
|
||||
merge_env_keys(&mut keys, &state.env_keys);
|
||||
*state.authorized_keys.write().await = keys;
|
||||
}
|
||||
|
||||
async fn send(socket: &mut WebSocket, msg: &BackendOpMsg) -> Result<(), axum::Error> {
|
||||
|
||||
@@ -3,8 +3,10 @@ use crate::state::{AppState, ConnHandle};
|
||||
use axum::extract::ws::{Message, WebSocket};
|
||||
use axum::extract::{State, WebSocketUpgrade};
|
||||
use axum::response::IntoResponse;
|
||||
use dashmap::DashMap;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use rsh_types::{BackendOpMsg, BackendStubMsg, ConnectionView, OpEvent, OpResp, StubMsg};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
|
||||
@@ -49,6 +51,8 @@ async fn run(mut socket: WebSocket, state: Arc<AppState>) {
|
||||
to_stub: to_stub_tx.clone(),
|
||||
attach: Mutex::new(None),
|
||||
connected_at,
|
||||
extra_shells: DashMap::new(),
|
||||
next_shell_id: AtomicU64::new(1),
|
||||
});
|
||||
state.connections.insert((session_id.clone(), conn_id), handle.clone());
|
||||
let _ = state.event_bus.send(OpEvent::NewConnection(ConnectionView {
|
||||
@@ -102,6 +106,22 @@ async fn run(mut socket: WebSocket, state: Arc<AppState>) {
|
||||
forward_op(&handle_r, OpResp::Exited { code }).await;
|
||||
break;
|
||||
}
|
||||
StubMsg::ShellReady { shell_id } => {
|
||||
let key = (session_r.clone(), conn_id, shell_id);
|
||||
if let Some((_, tx)) = state_r.spawn_shell_pending.remove(&key) {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
StubMsg::ShellStdout { shell_id, data } => {
|
||||
forward_shell_op(&handle_r, shell_id, OpResp::Stdout(data)).await;
|
||||
}
|
||||
StubMsg::ShellStderr { shell_id, data } => {
|
||||
forward_shell_op(&handle_r, shell_id, OpResp::Stderr(data)).await;
|
||||
}
|
||||
StubMsg::ShellExited { shell_id, code } => {
|
||||
forward_shell_op(&handle_r, shell_id, OpResp::Exited { code }).await;
|
||||
handle_r.extra_shells.remove(&shell_id);
|
||||
}
|
||||
StubMsg::Pong => {}
|
||||
StubMsg::Hello { .. } => {}
|
||||
}
|
||||
@@ -122,6 +142,18 @@ async fn forward_op(handle: &Arc<crate::state::ConnHandle>, resp: OpResp) {
|
||||
}
|
||||
}
|
||||
|
||||
async fn forward_shell_op(handle: &Arc<crate::state::ConnHandle>, shell_id: u64, resp: OpResp) {
|
||||
if let Some(slot) = handle.extra_shells.get(&shell_id) {
|
||||
let attach = slot.lock().await;
|
||||
if let Some(sink) = attach.as_ref() {
|
||||
let _ = sink
|
||||
.sender
|
||||
.send(BackendOpMsg::Resp { id: sink.req_id, body: resp })
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn cleanup(state: &Arc<AppState>, session_id: &str, conn_id: u64) {
|
||||
state.connections.remove(&(session_id.to_string(), conn_id));
|
||||
let _ = state.event_bus.send(OpEvent::ConnectionClosed {
|
||||
|
||||
Reference in New Issue
Block a user