Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
495 changes: 297 additions & 198 deletions libshpool/src/attach.rs

Large diffs are not rendered by default.

70 changes: 66 additions & 4 deletions libshpool/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ use anyhow::{anyhow, Context};
use nix::unistd;
use shpool_protocol::{
AttachHeader, AttachReplyHeader, AttachStatus, ConnectHeader, DetachReply, DetachRequest,
KillReply, KillRequest, ListReply, LogLevel, ResizeReply, Session, SessionMessageDetachReply,
SessionMessageReply, SessionMessageRequest, SessionMessageRequestPayload, SessionStatus,
SetLogLevelReply, SetLogLevelRequest, VersionHeader,
KillReply, KillRequest, ListReply, LogLevel, MaybeSwitch, ModifyVarReply, ModifyVarRequest,
ResizeReply, Session, SessionMessageDetachReply, SessionMessageReply, SessionMessageRequest,
SessionMessageRequestPayload, SessionStatus, SetLogLevelReply, SetLogLevelRequest,
VersionHeader,
};
use tracing::{debug, error, info, instrument, span, warn, Level};

Expand Down Expand Up @@ -79,6 +80,7 @@ pub struct Server {
tracing_subscriber::filter::LevelFilter,
tracing_subscriber::registry::Registry,
>,
vars: Mutex<HashMap<String, String>>,
}

impl Server {
Expand Down Expand Up @@ -112,6 +114,7 @@ impl Server {
hooks,
daily_messenger,
log_level_handle,
vars: HashMap::new().into(),
}))
}

Expand Down Expand Up @@ -209,6 +212,8 @@ impl Server {
ConnectHeader::List => self.handle_list(stream),
ConnectHeader::SessionMessage(header) => self.handle_session_message(stream, header),
ConnectHeader::SetLogLevel(r) => self.handle_set_log_level(stream, r),
ConnectHeader::GetVars => self.handle_get_vars(stream),
ConnectHeader::ModifyVar(r) => self.handle_modify_var(stream, r),
}
}

Expand Down Expand Up @@ -609,6 +614,57 @@ impl Server {
Ok(())
}

#[instrument(skip_all)]
fn handle_get_vars(&self, mut stream: UnixStream) -> anyhow::Result<()> {
let maybe_switch = {
let var_map = self.vars.lock().unwrap();
let vars: Vec<(String, String)> =
var_map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
shpool_protocol::MaybeSwitch { switch_to: None, vars }
};

write_reply(&mut stream, maybe_switch).context("writing maybe_switch reply")?;
Ok(())
}

#[instrument(skip_all)]
fn handle_modify_var(
&self,
mut stream: UnixStream,
request: ModifyVarRequest,
) -> anyhow::Result<()> {
let maybe_switch = {
let mut vars = self.vars.lock().unwrap();
if let Some(val) = request.val {
vars.insert(request.var, val);
} else {
vars.remove(&request.var);
}

MaybeSwitch {
switch_to: None,
vars: vars.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
}
};

let mut ctls = Vec::new();
{
let shells = self.shells.lock().unwrap();
for (_, session) in shells.iter() {
ctls.push(Arc::clone(&session.shell_to_client_ctl));
}
}
for ctl in ctls.into_iter() {
let ctl = ctl.lock().unwrap();
ctl.maybe_switch
.send_timeout(maybe_switch.clone(), SESSION_MSG_TIMEOUT)
.context("broadcasting maybe_switch")?;
}

write_reply(&mut stream, ModifyVarReply {}).context("writing modify var reply")?;
Ok(())
}

#[instrument(skip_all)]
fn handle_kill(&self, mut stream: UnixStream, request: KillRequest) -> anyhow::Result<()> {
let mut not_found_sessions = vec![];
Expand Down Expand Up @@ -983,13 +1039,18 @@ impl Server {
let (heartbeat_tx, heartbeat_rx) = crossbeam_channel::bounded(0);
let (heartbeat_ack_tx, heartbeat_ack_rx) = crossbeam_channel::bounded(0);

let shell_to_client_ctl = Arc::new(Mutex::new(shell::ReaderCtl {
// We make this buffered to avoid blocking during a broadcast. There is
// no ack chan so we can afford to buffer a bit.
let (maybe_switch_tx, maybe_switch_rx) = crossbeam_channel::bounded(10);

let shell_to_client_ctl = Arc::new(Mutex::new(shell::ShellToClientCtl {
client_connection: client_connection_tx,
client_connection_ack: client_connection_ack_rx,
tty_size_change: tty_size_change_tx,
tty_size_change_ack: tty_size_change_ack_rx,
heartbeat: heartbeat_tx,
heartbeat_ack: heartbeat_ack_rx,
maybe_switch: maybe_switch_tx,
}));

let mut session_inner = shell::SessionInner {
Expand Down Expand Up @@ -1023,6 +1084,7 @@ impl Server {
tty_size_change_ack: tty_size_change_ack_tx,
heartbeat: heartbeat_rx,
heartbeat_ack: heartbeat_ack_tx,
maybe_switch: maybe_switch_rx,
child_exit_notifier: shell_to_client_child_exit_notifier,
})?);

Expand Down
52 changes: 48 additions & 4 deletions libshpool/src/daemon/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ use std::{

use anyhow::{anyhow, Context};
use nix::{poll, poll::PollFlags, sys::signal, unistd::Pid};
use shpool_protocol::{Chunk, ChunkKind, TtySize};
use shpool_protocol::{Chunk, ChunkKind, MaybeSwitch, TtySize};
use tracing::{debug, error, info, instrument, span, trace, warn, Level};

use crate::{
common, consts,
daemon::{config, exit_notify::ExitNotifier, keybindings, pager::PagerCtl, prompt, show_motd},
protocol,
protocol::ChunkExt as _,
session_restore, test_hooks,
tty::TtySizeExt as _,
Expand Down Expand Up @@ -74,7 +75,7 @@ pub struct Session {
pub lifecycle_timestamps: Mutex<SessionLifecycleTimestamps>,
pub child_pid: libc::pid_t,
pub child_exit_notifier: Arc<ExitNotifier>,
pub shell_to_client_ctl: Arc<Mutex<ReaderCtl>>,
pub shell_to_client_ctl: Arc<Mutex<ShellToClientCtl>>,
pub pager_ctl: Arc<Mutex<Option<PagerCtl>>>,
/// Mutable state with the lock held by the servicing handle_attach thread
/// while a tty is attached to the session. Probing the mutex can be used
Expand Down Expand Up @@ -109,7 +110,7 @@ impl Session {
#[derive(Debug)]
pub struct SessionInner {
pub name: String, // to improve logging
pub shell_to_client_ctl: Arc<Mutex<ReaderCtl>>,
pub shell_to_client_ctl: Arc<Mutex<ShellToClientCtl>>,
pub pty_master: shpool_pty::fork::Fork,
pub client_stream: Option<UnixStream>,
pub config: config::Manager,
Expand Down Expand Up @@ -207,6 +208,7 @@ pub struct ShellToClientArgs {
pub tty_size_change: crossbeam_channel::Receiver<TtySize>,
pub tty_size_change_ack: crossbeam_channel::Sender<()>,
pub heartbeat: crossbeam_channel::Receiver<()>,
pub maybe_switch: crossbeam_channel::Receiver<MaybeSwitch>,
// true if the client is still live, false if it has hung up on us
pub heartbeat_ack: crossbeam_channel::Sender<bool>,
pub child_exit_notifier: Arc<ExitNotifier>,
Expand Down Expand Up @@ -393,6 +395,42 @@ impl SessionInner {
args.heartbeat_ack.send(client_present)
.context("sending heartbeat ack")?;
}
recv(args.maybe_switch) -> maybe_switch => {
let maybe_switch = match maybe_switch {
Ok(ms) => ms,
Err(e) => {
error!("error recving MaybeSwitch: {:?}", e);
continue;
},
};

let conn = if let ClientConnectionMsg::New(c) = &mut client_conn {
c
} else {
info!("got MaybeSwitch, but no attached client, dropping");
continue;
};

let mut encoded = Vec::new();
if let Err(e) = protocol::encode_to(&maybe_switch, &mut encoded) {
error!("error encoding MaybeSwitch: {:?}", e);
continue;
}

let chunk = Chunk { kind: ChunkKind::MaybeSwitch, buf: &encoded[..] };
match chunk.write_to(&mut conn.sink).and_then(|_| conn.sink.flush()) {
Ok(_) => {
trace!("wrote MaybeSwitch");
}
Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
trace!("writing MaybeSwitch: client hangup: {:?}", e);
}
Err(e) => {
error!("unexpected IO error while writing heartbeat: {}", e);
return Err(e).context("writing MaybeSwitch")?;
}
}
}

// make this select non-blocking so we spend most of our time parked
// in poll
Expand Down Expand Up @@ -1003,7 +1041,7 @@ impl SessionInner {
/// Shared between the session struct (for calls originating with the cli)
/// and the session inner struct (for calls resulting from keybindings).
#[derive(Debug)]
pub struct ReaderCtl {
pub struct ShellToClientCtl {
/// A control channel for the shell->client thread. Whenever a new client
/// dials in, the output stream for that client must be attached to the
/// shell->client thread by sending it down this channel. A disconnect
Expand All @@ -1028,6 +1066,12 @@ pub struct ReaderCtl {
// True if the client is still listening, false if it has hung up
// on us.
pub heartbeat_ack: crossbeam_channel::Receiver<bool>,

/// A control channel telling the shell->client thread to
/// broadcast the given MaybeSwitch. There is no ack channel
/// because we just blast this out and the caller doesn't need
/// to know about completion.
pub maybe_switch: crossbeam_channel::Sender<MaybeSwitch>,
}

/// Given a buffer, a length after which the data is not valid, a list of
Expand Down
51 changes: 51 additions & 0 deletions libshpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ mod list;
mod protocol;
mod session_restore;
mod set_log_level;
mod template;
mod test_hooks;
mod tty;
mod user;
mod var;

/// The command line arguments that shpool expects.
/// These can be directly parsed with clap or manually
Expand Down Expand Up @@ -210,6 +212,54 @@ needs debugging, but would be clobbered by a restart.")]
#[clap(help = "new log level")]
level: shpool_protocol::LogLevel,
},

#[clap(about = "Manipulate template variables

shpool session names can include {variables} which are resolved via
an environment stored globally in the shpool daemon. This command
manipulates that environment.

The main usecase for templated session names is the ability to switch
multiple shpool sessions to new targets at the same time. For example,
you might have a `shpool attach -f '{workspace}-edit'` session and
a `shpool attach -f '{workspace}-term'` session. To switch both
sessions from the fun-feature workspace to the key-bugfix workspace,
you could just do `shpool var set workspace key-bugfix`.
")]
#[non_exhaustive]
Var {
#[clap(subcommand)]
command: VarCommands,
},
}

/// The subcommds of the var command.
#[derive(Subcommand, Debug)]
#[non_exhaustive]
pub enum VarCommands {
#[clap(about = "List the variables

This command dumps out the whole variable list with
both vars and values in a JSON object using vars as keys.")]
List {
#[clap(short, long, help = "Output as JSON")]
json: bool,
},
#[clap(about = "Get a variable

This returns the raw value of the given variable.")]
#[non_exhaustive]
Get { var: String },
#[clap(about = "Set a variable

This updates the value of the given variable.")]
#[non_exhaustive]
Set { var: String, val: String },
#[clap(about = "Unset a variable

This removes the given variable from the environment.")]
#[non_exhaustive]
Unset { var: String },
}

impl Args {
Expand Down Expand Up @@ -382,6 +432,7 @@ pub fn run(args: Args, hooks: Option<Box<dyn hooks::Hooks + Send + Sync>>) -> an
Commands::Kill { sessions } => kill::run(sessions, socket),
Commands::List { json } => list::run(socket, json),
Commands::SetLogLevel { level } => set_log_level::run(level, socket),
Commands::Var { command } => var::run(socket, command),
};

if let Err(err) = res {
Expand Down
Loading
Loading