add watch
This commit is contained in:
@@ -127,6 +127,9 @@ impl AuthedClient {
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
// Signal all pending callers that the connection is gone.
|
||||
pending_r.lock().await.clear();
|
||||
*events_tx_r.lock().await = None;
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
@@ -147,7 +150,7 @@ impl AuthedClient {
|
||||
.send(OpMsg::Req { id, body })
|
||||
.await
|
||||
.map_err(|_| anyhow!("send failed (connection closed)"))?;
|
||||
rx.await.map_err(|_| anyhow!("response dropped"))
|
||||
rx.await.map_err(|_| anyhow!("backend disconnected"))
|
||||
}
|
||||
|
||||
pub async fn req_stream(&self, body: OpReq) -> Result<(u64, mpsc::Receiver<OpResp>)> {
|
||||
|
||||
@@ -114,24 +114,25 @@ pub async fn pump(
|
||||
}
|
||||
let _ = client.send_attach_io(AttachIOFrame::Stdin(buf[..n].to_vec())).await;
|
||||
}
|
||||
Some(resp) = resps.recv() => {
|
||||
resp = resps.recv() => {
|
||||
match resp {
|
||||
OpResp::Stdout(b) => {
|
||||
None => return Err(anyhow!("backend disconnected")),
|
||||
Some(OpResp::Stdout(b)) => {
|
||||
let mut out = std::io::stdout();
|
||||
out.write_all(&b).ok();
|
||||
out.flush().ok();
|
||||
}
|
||||
OpResp::Stderr(b) => {
|
||||
Some(OpResp::Stderr(b)) => {
|
||||
let mut err = std::io::stderr();
|
||||
err.write_all(&b).ok();
|
||||
err.flush().ok();
|
||||
}
|
||||
OpResp::Exited { code } => {
|
||||
Some(OpResp::Exited { code }) => {
|
||||
ui::print_info(&format!("remote exited (code {:?})", code));
|
||||
return Ok(());
|
||||
}
|
||||
OpResp::Err(e) => return Err(anyhow!(e)),
|
||||
_ => {}
|
||||
Some(OpResp::Err(e)) => return Err(anyhow!(e)),
|
||||
Some(_) => {}
|
||||
}
|
||||
}
|
||||
Some((cols, rows)) = async {
|
||||
|
||||
@@ -1,39 +1,128 @@
|
||||
use crate::auth::AuthedClient;
|
||||
use crate::config::Config;
|
||||
use crate::ui;
|
||||
use anyhow::{anyhow, Result};
|
||||
use std::time::Duration;
|
||||
use crossterm::{cursor::MoveTo, execute, terminal::{Clear, ClearType}};
|
||||
use owo_colors::OwoColorize;
|
||||
use rsh_types::{OpEvent, OpReq, OpResp};
|
||||
use rsh_types::{ConnectionView, OpEvent, OpReq, OpResp, SessionView};
|
||||
use std::io::stdout;
|
||||
|
||||
pub async fn run(client: &AuthedClient, session: Option<String>) -> Result<()> {
|
||||
let mut events = client.take_events().await;
|
||||
|
||||
match client.req(OpReq::Watch { session: session.clone() }).await? {
|
||||
OpResp::WatchStarted => {}
|
||||
OpResp::Err(e) => return Err(anyhow!(e)),
|
||||
other => return Err(anyhow!("unexpected: {other:?}")),
|
||||
}
|
||||
ui::print_info(&format!(
|
||||
"watching {}",
|
||||
session.as_deref().unwrap_or("all sessions")
|
||||
));
|
||||
while let Some(ev) = events.recv().await {
|
||||
match ev {
|
||||
OpEvent::NewConnection(c) => println!(
|
||||
"{} {} #{} {}@{}",
|
||||
"+conn".green().bold(),
|
||||
c.session_id,
|
||||
c.connection_id,
|
||||
c.info.user,
|
||||
c.info.hostname
|
||||
),
|
||||
OpEvent::ConnectionClosed { session, connection_id } => println!(
|
||||
"{} {} #{}",
|
||||
"-conn".red().bold(),
|
||||
session,
|
||||
connection_id
|
||||
),
|
||||
OpEvent::NewSession(s) => println!("{} {}", "+sess".cyan().bold(), s.id),
|
||||
OpEvent::SessionDeleted { session } => println!("{} {}", "-sess".magenta().bold(), session),
|
||||
|
||||
let mut sessions: Vec<SessionView> = match client.req(OpReq::SessionList).await? {
|
||||
OpResp::Sessions(s) => s,
|
||||
OpResp::Err(e) => return Err(anyhow!(e)),
|
||||
other => return Err(anyhow!("unexpected: {other:?}")),
|
||||
};
|
||||
let mut connections: Vec<ConnectionView> =
|
||||
match client.req(OpReq::ConnectionList { session: session.clone() }).await? {
|
||||
OpResp::Connections(c) => c,
|
||||
OpResp::Err(e) => return Err(anyhow!(e)),
|
||||
other => return Err(anyhow!("unexpected: {other:?}")),
|
||||
};
|
||||
|
||||
if let Some(ref filter) = session {
|
||||
sessions.retain(|s| &s.id == filter);
|
||||
}
|
||||
sessions.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
connections.sort_by(|a, b| {
|
||||
a.session_id.cmp(&b.session_id).then(a.connection_id.cmp(&b.connection_id))
|
||||
});
|
||||
|
||||
render(&sessions, &connections)?;
|
||||
|
||||
loop {
|
||||
match events.recv().await {
|
||||
None => return Err(anyhow!("backend disconnected")),
|
||||
Some(ev) => {
|
||||
match ev {
|
||||
OpEvent::NewConnection(c) => {
|
||||
if let Some(s) = sessions.iter_mut().find(|s| s.id == c.session_id) {
|
||||
s.connection_count += 1;
|
||||
}
|
||||
connections.push(c);
|
||||
connections.sort_by(|a, b| {
|
||||
a.session_id.cmp(&b.session_id).then(a.connection_id.cmp(&b.connection_id))
|
||||
});
|
||||
}
|
||||
OpEvent::ConnectionClosed { session: sess, connection_id } => {
|
||||
connections.retain(|c| !(c.session_id == sess && c.connection_id == connection_id));
|
||||
if let Some(s) = sessions.iter_mut().find(|s| s.id == sess) {
|
||||
s.connection_count = s.connection_count.saturating_sub(1);
|
||||
}
|
||||
}
|
||||
OpEvent::NewSession(s) => {
|
||||
if session.is_none() || session.as_deref() == Some(&s.id) {
|
||||
sessions.push(s);
|
||||
sessions.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
}
|
||||
}
|
||||
OpEvent::SessionDeleted { session: sess } => {
|
||||
sessions.retain(|s| s.id != sess);
|
||||
connections.retain(|c| c.session_id != sess);
|
||||
}
|
||||
}
|
||||
render(&sessions, &connections)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_with_reconnect(cfg: &Config, session: Option<String>) -> Result<()> {
|
||||
let mut backoff = Duration::from_secs(1);
|
||||
loop {
|
||||
match AuthedClient::connect(cfg).await {
|
||||
Err(e) => {
|
||||
eprintln!("\nconnect failed: {e}");
|
||||
}
|
||||
Ok(client) => {
|
||||
backoff = Duration::from_secs(1);
|
||||
match run(&client, session.clone()).await {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(e) => {
|
||||
eprintln!("\n{e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
eprintln!("reconnecting in {}s...", backoff.as_secs());
|
||||
tokio::time::sleep(backoff).await;
|
||||
backoff = (backoff * 2).min(Duration::from_secs(30));
|
||||
}
|
||||
}
|
||||
|
||||
fn render(sessions: &[SessionView], connections: &[ConnectionView]) -> Result<()> {
|
||||
execute!(stdout(), Clear(ClearType::All), MoveTo(0, 0))?;
|
||||
|
||||
println!(
|
||||
"{} {} sessions {} connections {}",
|
||||
"●".green().bold(),
|
||||
sessions.len().bold(),
|
||||
connections.len().bold(),
|
||||
"Ctrl-C to exit".dimmed(),
|
||||
);
|
||||
println!();
|
||||
|
||||
if sessions.is_empty() {
|
||||
println!("{}", " no sessions".dimmed());
|
||||
} else {
|
||||
println!("{}", ui::sessions_table(sessions));
|
||||
}
|
||||
println!();
|
||||
|
||||
if connections.is_empty() {
|
||||
println!("{}", " no connections".dimmed());
|
||||
} else {
|
||||
println!("{}", ui::connections_table(connections));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ impl Config {
|
||||
return PathBuf::from(p);
|
||||
}
|
||||
let base = dirs::config_dir().unwrap_or_else(|| PathBuf::from("."));
|
||||
base.join("rsh.yaml")
|
||||
base.join("rshc.yaml")
|
||||
}
|
||||
|
||||
pub fn load() -> Result<Self> {
|
||||
@@ -35,7 +35,8 @@ impl Config {
|
||||
));
|
||||
}
|
||||
let raw = std::fs::read_to_string(&p).with_context(|| format!("read {}", p.display()))?;
|
||||
let cfg: Config = serde_yaml::from_str(&raw).with_context(|| format!("parse {}", p.display()))?;
|
||||
let cfg: Config =
|
||||
serde_yaml::from_str(&raw).with_context(|| format!("parse {}", p.display()))?;
|
||||
Ok(cfg)
|
||||
}
|
||||
|
||||
|
||||
@@ -139,9 +139,12 @@ async fn main() {
|
||||
async fn run() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
let cfg = Config::load()?;
|
||||
if let Cmd::Watch(a) = cli.cmd {
|
||||
return cmd::watch::run_with_reconnect(&cfg, a.session).await;
|
||||
}
|
||||
let client = AuthedClient::connect(&cfg).await?;
|
||||
match cli.cmd {
|
||||
Cmd::Watch(a) => cmd::watch::run(&client, a.session).await,
|
||||
Cmd::Watch(_) => unreachable!(),
|
||||
Cmd::Session(s) => match s.sub {
|
||||
SessionSub::Create { name } => cmd::session::create(&client, name).await,
|
||||
SessionSub::Delete { name, yes, disconnect } => cmd::session::delete(&client, name, yes, disconnect).await,
|
||||
|
||||
Reference in New Issue
Block a user