Skip to content

Commit e143a6f

Browse files
committed
feat: add session name template support
This patch adds support for session name templates so that you can switch multiple shpool sessions all at once. In some sense this is a super-set of the 'shpool switch' FR. I did lay some groundwork for implementing support for that in this change, though I'm starting to wonder if templates are good enough on their own. The only extra thing that switch would bring is the ability to switch sessions you don't pre-declare as switchable with a dedicated variable up front. One thing worth bikeshedding: At first I was thinking '${var}' for the substitution syntax, then realized that would be weird about nesting when it comes to shells, so I switched to '#{var}' syntax, but then I realized that's the comment char in shells, and wound up on '@{var}'. I'm open to other symbol/syntax ideas. BREAKING: this breaks shpool-protocol since we have a new chunk kind.
1 parent 830b4cc commit e143a6f

12 files changed

Lines changed: 1222 additions & 318 deletions

File tree

libshpool/src/attach.rs

Lines changed: 297 additions & 198 deletions
Large diffs are not rendered by default.

libshpool/src/daemon/server.rs

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ use anyhow::{anyhow, Context};
3535
use nix::unistd;
3636
use shpool_protocol::{
3737
AttachHeader, AttachReplyHeader, AttachStatus, ConnectHeader, DetachReply, DetachRequest,
38-
KillReply, KillRequest, ListReply, LogLevel, ResizeReply, Session, SessionMessageDetachReply,
39-
SessionMessageReply, SessionMessageRequest, SessionMessageRequestPayload, SessionStatus,
40-
SetLogLevelReply, SetLogLevelRequest, VersionHeader,
38+
KillReply, KillRequest, ListReply, LogLevel, MaybeSwitch, ModifyVarReply, ModifyVarRequest,
39+
ResizeReply, Session, SessionMessageDetachReply, SessionMessageReply, SessionMessageRequest,
40+
SessionMessageRequestPayload, SessionStatus, SetLogLevelReply, SetLogLevelRequest,
41+
VersionHeader,
4142
};
4243
use tracing::{debug, error, info, instrument, span, warn, Level};
4344

@@ -79,6 +80,7 @@ pub struct Server {
7980
tracing_subscriber::filter::LevelFilter,
8081
tracing_subscriber::registry::Registry,
8182
>,
83+
vars: Mutex<HashMap<String, String>>,
8284
}
8385

8486
impl Server {
@@ -112,6 +114,7 @@ impl Server {
112114
hooks,
113115
daily_messenger,
114116
log_level_handle,
117+
vars: HashMap::new().into(),
115118
}))
116119
}
117120

@@ -209,6 +212,8 @@ impl Server {
209212
ConnectHeader::List => self.handle_list(stream),
210213
ConnectHeader::SessionMessage(header) => self.handle_session_message(stream, header),
211214
ConnectHeader::SetLogLevel(r) => self.handle_set_log_level(stream, r),
215+
ConnectHeader::GetVars => self.handle_get_vars(stream),
216+
ConnectHeader::ModifyVar(r) => self.handle_modify_var(stream, r),
212217
}
213218
}
214219

@@ -609,6 +614,57 @@ impl Server {
609614
Ok(())
610615
}
611616

617+
#[instrument(skip_all)]
618+
fn handle_get_vars(&self, mut stream: UnixStream) -> anyhow::Result<()> {
619+
let maybe_switch = {
620+
let var_map = self.vars.lock().unwrap();
621+
let vars: Vec<(String, String)> =
622+
var_map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
623+
shpool_protocol::MaybeSwitch { switch_to: None, vars }
624+
};
625+
626+
write_reply(&mut stream, maybe_switch).context("writing maybe_switch reply")?;
627+
Ok(())
628+
}
629+
630+
#[instrument(skip_all)]
631+
fn handle_modify_var(
632+
&self,
633+
mut stream: UnixStream,
634+
request: ModifyVarRequest,
635+
) -> anyhow::Result<()> {
636+
let maybe_switch = {
637+
let mut vars = self.vars.lock().unwrap();
638+
if let Some(val) = request.val {
639+
vars.insert(request.var, val);
640+
} else {
641+
vars.remove(&request.var);
642+
}
643+
644+
MaybeSwitch {
645+
switch_to: None,
646+
vars: vars.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
647+
}
648+
};
649+
650+
let mut ctls = Vec::new();
651+
{
652+
let shells = self.shells.lock().unwrap();
653+
for (_, session) in shells.iter() {
654+
ctls.push(Arc::clone(&session.shell_to_client_ctl));
655+
}
656+
}
657+
for ctl in ctls.into_iter() {
658+
let ctl = ctl.lock().unwrap();
659+
ctl.maybe_switch
660+
.send_timeout(maybe_switch.clone(), SESSION_MSG_TIMEOUT)
661+
.context("broadcasting maybe_switch")?;
662+
}
663+
664+
write_reply(&mut stream, ModifyVarReply {}).context("writing modify var reply")?;
665+
Ok(())
666+
}
667+
612668
#[instrument(skip_all)]
613669
fn handle_kill(&self, mut stream: UnixStream, request: KillRequest) -> anyhow::Result<()> {
614670
let mut not_found_sessions = vec![];
@@ -983,13 +1039,18 @@ impl Server {
9831039
let (heartbeat_tx, heartbeat_rx) = crossbeam_channel::bounded(0);
9841040
let (heartbeat_ack_tx, heartbeat_ack_rx) = crossbeam_channel::bounded(0);
9851041

986-
let shell_to_client_ctl = Arc::new(Mutex::new(shell::ReaderCtl {
1042+
// We make this buffered to avoid blocking during a broadcast. There is
1043+
// no ack chan so we can afford to buffer a bit.
1044+
let (maybe_switch_tx, maybe_switch_rx) = crossbeam_channel::bounded(10);
1045+
1046+
let shell_to_client_ctl = Arc::new(Mutex::new(shell::ShellToClientCtl {
9871047
client_connection: client_connection_tx,
9881048
client_connection_ack: client_connection_ack_rx,
9891049
tty_size_change: tty_size_change_tx,
9901050
tty_size_change_ack: tty_size_change_ack_rx,
9911051
heartbeat: heartbeat_tx,
9921052
heartbeat_ack: heartbeat_ack_rx,
1053+
maybe_switch: maybe_switch_tx,
9931054
}));
9941055

9951056
let mut session_inner = shell::SessionInner {
@@ -1023,6 +1084,7 @@ impl Server {
10231084
tty_size_change_ack: tty_size_change_ack_tx,
10241085
heartbeat: heartbeat_rx,
10251086
heartbeat_ack: heartbeat_ack_tx,
1087+
maybe_switch: maybe_switch_rx,
10261088
child_exit_notifier: shell_to_client_child_exit_notifier,
10271089
})?);
10281090

libshpool/src/daemon/shell.rs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ use std::{
2828

2929
use anyhow::{anyhow, Context};
3030
use nix::{poll, poll::PollFlags, sys::signal, unistd::Pid};
31-
use shpool_protocol::{Chunk, ChunkKind, TtySize};
31+
use shpool_protocol::{Chunk, ChunkKind, MaybeSwitch, TtySize};
3232
use tracing::{debug, error, info, instrument, span, trace, warn, Level};
3333

3434
use crate::{
3535
common, consts,
3636
daemon::{config, exit_notify::ExitNotifier, keybindings, pager::PagerCtl, prompt, show_motd},
37+
protocol,
3738
protocol::ChunkExt as _,
3839
session_restore, test_hooks,
3940
tty::TtySizeExt as _,
@@ -74,7 +75,7 @@ pub struct Session {
7475
pub lifecycle_timestamps: Mutex<SessionLifecycleTimestamps>,
7576
pub child_pid: libc::pid_t,
7677
pub child_exit_notifier: Arc<ExitNotifier>,
77-
pub shell_to_client_ctl: Arc<Mutex<ReaderCtl>>,
78+
pub shell_to_client_ctl: Arc<Mutex<ShellToClientCtl>>,
7879
pub pager_ctl: Arc<Mutex<Option<PagerCtl>>>,
7980
/// Mutable state with the lock held by the servicing handle_attach thread
8081
/// while a tty is attached to the session. Probing the mutex can be used
@@ -109,7 +110,7 @@ impl Session {
109110
#[derive(Debug)]
110111
pub struct SessionInner {
111112
pub name: String, // to improve logging
112-
pub shell_to_client_ctl: Arc<Mutex<ReaderCtl>>,
113+
pub shell_to_client_ctl: Arc<Mutex<ShellToClientCtl>>,
113114
pub pty_master: shpool_pty::fork::Fork,
114115
pub client_stream: Option<UnixStream>,
115116
pub config: config::Manager,
@@ -207,6 +208,7 @@ pub struct ShellToClientArgs {
207208
pub tty_size_change: crossbeam_channel::Receiver<TtySize>,
208209
pub tty_size_change_ack: crossbeam_channel::Sender<()>,
209210
pub heartbeat: crossbeam_channel::Receiver<()>,
211+
pub maybe_switch: crossbeam_channel::Receiver<MaybeSwitch>,
210212
// true if the client is still live, false if it has hung up on us
211213
pub heartbeat_ack: crossbeam_channel::Sender<bool>,
212214
pub child_exit_notifier: Arc<ExitNotifier>,
@@ -393,6 +395,42 @@ impl SessionInner {
393395
args.heartbeat_ack.send(client_present)
394396
.context("sending heartbeat ack")?;
395397
}
398+
recv(args.maybe_switch) -> maybe_switch => {
399+
let maybe_switch = match maybe_switch {
400+
Ok(ms) => ms,
401+
Err(e) => {
402+
error!("error recving MaybeSwitch: {:?}", e);
403+
continue;
404+
},
405+
};
406+
407+
let conn = if let ClientConnectionMsg::New(c) = &mut client_conn {
408+
c
409+
} else {
410+
info!("got MaybeSwitch, but no attached client, dropping");
411+
continue;
412+
};
413+
414+
let mut encoded = Vec::new();
415+
if let Err(e) = protocol::encode_to(&maybe_switch, &mut encoded) {
416+
error!("error encoding MaybeSwitch: {:?}", e);
417+
continue;
418+
}
419+
420+
let chunk = Chunk { kind: ChunkKind::MaybeSwitch, buf: &encoded[..] };
421+
match chunk.write_to(&mut conn.sink).and_then(|_| conn.sink.flush()) {
422+
Ok(_) => {
423+
trace!("wrote MaybeSwitch");
424+
}
425+
Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
426+
trace!("writing MaybeSwitch: client hangup: {:?}", e);
427+
}
428+
Err(e) => {
429+
error!("unexpected IO error while writing heartbeat: {}", e);
430+
return Err(e).context("writing MaybeSwitch")?;
431+
}
432+
}
433+
}
396434

397435
// make this select non-blocking so we spend most of our time parked
398436
// in poll
@@ -1003,7 +1041,7 @@ impl SessionInner {
10031041
/// Shared between the session struct (for calls originating with the cli)
10041042
/// and the session inner struct (for calls resulting from keybindings).
10051043
#[derive(Debug)]
1006-
pub struct ReaderCtl {
1044+
pub struct ShellToClientCtl {
10071045
/// A control channel for the shell->client thread. Whenever a new client
10081046
/// dials in, the output stream for that client must be attached to the
10091047
/// shell->client thread by sending it down this channel. A disconnect
@@ -1028,6 +1066,12 @@ pub struct ReaderCtl {
10281066
// True if the client is still listening, false if it has hung up
10291067
// on us.
10301068
pub heartbeat_ack: crossbeam_channel::Receiver<bool>,
1069+
1070+
/// A control channel telling the shell->client thread to
1071+
/// broadcast the given MaybeSwitch. There is no ack channel
1072+
/// because we just blast this out and the caller doesn't need
1073+
/// to know about completion.
1074+
pub maybe_switch: crossbeam_channel::Sender<MaybeSwitch>,
10311075
}
10321076

10331077
/// Given a buffer, a length after which the data is not valid, a list of

libshpool/src/lib.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ mod list;
4343
mod protocol;
4444
mod session_restore;
4545
mod set_log_level;
46+
mod template;
4647
mod test_hooks;
4748
mod tty;
4849
mod user;
50+
mod var;
4951

5052
/// The command line arguments that shpool expects.
5153
/// These can be directly parsed with clap or manually
@@ -210,6 +212,54 @@ needs debugging, but would be clobbered by a restart.")]
210212
#[clap(help = "new log level")]
211213
level: shpool_protocol::LogLevel,
212214
},
215+
216+
#[clap(about = "Manipulate template variables
217+
218+
shpool session names can include #{variables} which are resolved via
219+
an environment stored globally in the shpool daemon. This command
220+
manipulates that environment.
221+
222+
The main usecase for templated session names is the ability to switch
223+
multiple shpool sessions to new targets at the same time. For example,
224+
you might have a `shpool attach -f '#{workspace}-edit'` session and
225+
a `shpool attach -f '#{workspace}-term'` session. To switch both
226+
sessions from the fun-feature workspace to the key-bugfix workspace,
227+
you could just do `shpool var set workspace key-bugfix`.
228+
")]
229+
#[non_exhaustive]
230+
Var {
231+
#[clap(subcommand)]
232+
command: VarCommands,
233+
},
234+
}
235+
236+
/// The subcommds of the var command.
237+
#[derive(Subcommand, Debug)]
238+
#[non_exhaustive]
239+
pub enum VarCommands {
240+
#[clap(about = "List the variables
241+
242+
This command dumps out the whole variable list with
243+
both vars and values in a JSON object using vars as keys.")]
244+
List {
245+
#[clap(short, long, help = "Output as JSON")]
246+
json: bool,
247+
},
248+
#[clap(about = "Get a variable
249+
250+
This returns the raw value of the given variable.")]
251+
#[non_exhaustive]
252+
Get { var: String },
253+
#[clap(about = "Set a variable
254+
255+
This updates the value of the given variable.")]
256+
#[non_exhaustive]
257+
Set { var: String, val: String },
258+
#[clap(about = "Unset a variable
259+
260+
This removes the given variable from the environment.")]
261+
#[non_exhaustive]
262+
Unset { var: String },
213263
}
214264

215265
impl Args {
@@ -382,6 +432,7 @@ pub fn run(args: Args, hooks: Option<Box<dyn hooks::Hooks + Send + Sync>>) -> an
382432
Commands::Kill { sessions } => kill::run(sessions, socket),
383433
Commands::List { json } => list::run(socket, json),
384434
Commands::SetLogLevel { level } => set_log_level::run(level, socket),
435+
Commands::Var { command } => var::run(socket, command),
385436
};
386437

387438
if let Err(err) = res {

0 commit comments

Comments
 (0)