
Feat/events socket #352

Open

GeoffChurch wants to merge 12 commits into shell-pool:master from GeoffChurch:feat/events-socket

Conversation

@GeoffChurch

Issue Link

#346 (comment)

AI Policy Ack

ack -- pair-programmed with Claude; every line was human-reviewed.

Description

Adds an events socket where the shpool daemon can push events to subscribers. This lets external programs like TUIs stay up to date with the shpool state without needing to poll. The new events socket is located at <runtime_dir>/shpool/events.socket, alongside the main socket.

The format is JSONL (each event is a one-line JSON object). Events look like:

{"type":"snapshot","sessions":[ {...Session...} ]}
{"type":"session.created","name":"main","started_at_unix_ms":...}
{"type":"session.attached","name":"main","last_connected_at_unix_ms":...}
{"type":"session.detached","name":"main","last_disconnected_at_unix_ms":...}
{"type":"session.removed","name":"main","reason":"exited"|"killed"}

This PR also (tentatively) includes shpool events (see fn subscribe_to_stdout), a QoL subcommand that connects to the events socket, prints each event as it's received, and flushes after each print. This removes some boilerplate for downstream users and makes it easy to pipe the output to other programs.

Each subscription spawns a writer thread with its own internal buffer and timeout. This is done to avoid blocking on slow event consumers.
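
For illustration, a downstream consumer could be as simple as this (a sketch, not part of this PR; it assumes the runtime dir comes from XDG_RUNTIME_DIR and does no reconnect handling):

```rust
// Sketch of a minimal subscriber; assumes the events socket lives under
// $XDG_RUNTIME_DIR/shpool/ as described above.
use std::io::{BufRead, BufReader};
use std::os::unix::net::UnixStream;

fn main() -> std::io::Result<()> {
    let runtime_dir = std::env::var("XDG_RUNTIME_DIR").expect("XDG_RUNTIME_DIR not set");
    let stream = UnixStream::connect(format!("{runtime_dir}/shpool/events.socket"))?;
    // JSONL: one JSON object per line, so a plain line reader is enough.
    for line in BufReader::new(stream).lines() {
        println!("event: {}", line?);
    }
    Ok(())
}
```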

@google-cla

google-cla Bot commented Apr 27, 2026

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

@ethanpailes
Contributor

ethanpailes commented Apr 27, 2026

I really appreciate your enthusiasm, but in the future, let's try to do more up-front design before jumping into coding. #346 (comment) explains some more of my thoughts on the matter.

Rather than jumping straight to code, can you first post the design you intend to code against, with a particular focus on the contract and API that we'll offer? Basically, I think we should work out what the developer-facing documentation will look like and agree on it (it doesn't have to be polished, but we should have a precise spec that a TUI developer could code against). Normally an issue would be the place to do that, but we can just do it in this thread, and then you can update the PR contents once we've settled on something.

Some feedback on what I see in the PR description:

  1. As mentioned in feat: shpool tui #346 (comment), I think we should have less rich events to keep the implementation smaller and to ensure that the sync APIs are just as rich.
  2. I like the "session." naming scheme
  3. I like the shpool events subcommand, seems pretty useful, especially for people hacking together scripts rather than making a full application

@GeoffChurch
Author

GeoffChurch commented Apr 27, 2026

Re less rich events, how about just the type and name keys? It looks like name is always available, but that could of course be dropped too, or made optional.

One goal of the current design was to make it so that an external program can, solely by watching the events socket, maintain a consistent view of the shpool state. This required the snapshot-upon-subscription event (for synchronization) and some metadata (like started_at_unix_ms in session creation events).

So if we simplify the events, the event stream on its own is no longer sufficient, in which case it might as well be simplified further to no longer fire any -upon-subscription event (which also means events.rs doesn't need to use shpool_protocol::Session).


Re your events::emit("session-added")? example in #346 (comment), would you still want to have a (simplified) Event enum for type safety?


Another benefit of shpool events I just realized: I think it suffices as the sole external interface. This would hide the implementation details of the events socket and its location, e.g. sidestepping the need to clarify that the events socket is read-only/unidirectional.

@ethanpailes
Contributor

Yeah, keeping an Event enum seems like a good idea. I think dropping the snapshot and the extra fields besides type and name seems pretty good.

In terms of hiding the socket location, I still think we should document the protocol and socket location. I do think that heavier-duty applications should probably dial the socket directly rather than fork a sub-proc, so we should be intentional about making that part of the public API (people will just do it anyway and we'll be on the hook for compatibility, so we might as well be intentional). I think the way you have it, as a sidecar events.socket file, seems right.

@ethanpailes
Contributor

Maybe we could even get away with dropping name as well. Why do you need to be able to track which specific session changed?

@GeoffChurch
Author

I don't need the name field so we can drop it.

I think the external documentation would just cover the event types and serde-derived JSONL format of the stream, accessible via shpool events or directly via events.socket.

Would the documentation live in a top-level EVENTS.md that README.md points to?

@ethanpailes
Contributor

Yeah, I think that would be a good place to put the documentation. We probably don't want a section inline in the README, so a separate .md file is a good way to go. That all sounds like a good plan. Do you want to update this PR with that plan in mind? Once you are done, just let me know and I'll review.

@GeoffChurch
Author

I've drafted EVENTS.md and stripped out the extra fields. It looks like the CLA check is unhappy about the Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> lines - I can amend the commits to omit them if that's the resolution.

Contributor

@ethanpailes left a comment


EVENTS.md looks pretty reasonable, just a few bits of feedback. Let me know when the rest of the change is ready for review and I'll give it a full read through.

Comment thread EVENTS.md Outdated
Comment thread EVENTS.md Outdated
Comment thread EVENTS.md
Each subscriber has a bounded outbound queue. A subscriber that falls too far
behind is dropped by the daemon (in which case the subscriber can always reconnect).
There is no replay, so events that fired while a subscriber was disconnected are
lost.
Contributor


Perhaps we should document a knob in the config.toml that tunes how large this queue is.

Author


Do you want the config knob added in this PR?

The queues currently use std::sync::mpsc::sync_channel here, pre-allocating all SUBSCRIBER_QUEUE_DEPTH slots.

Would you want to use something slightly slower that allocates on demand and has something like shrink_to_fit to return memory after a spike? I'm just imagining someone setting their queue depth to 1 billion, not expecting it to pre-allocate for each subscriber. If it instead allocates on demand, you could maybe set the queue depth to something large like 100K by default anyway, at which point it might not be worth offering a config knob.

Contributor


Hmm, that's a good point. Honestly I should probably suppress the urge to add a knob for everything. Who is the target demographic that would actually tune this knob? Probably a tool developer who would tell their users to configure shpool specially, but that seems pretty unlikely. If it becomes a real problem, they can always open an issue and we can expose the knob then.

Author


Orthogonal to the knob, would you want a dynamically allocated data structure to replace sync_channel anyway? If a user goes wild with sessions, e.g. a bunch of programmatically generated sessions with different @{var} combinations, then Mutex<VecDeque>/shrink_to_fit might scale better.

Actually, since the events are uninformative by design, subscribers probably won't care which events are in the queue so much as whether an event is in the queue. So, what about sync_channel(1) (a single slot)? An Option would work too, but sync_channel makes it easy to increase to 8 or 16 if events become more informative later on.
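
To make the coalescing idea concrete, something roughly like this (a sketch, not code from this branch):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // One slot per subscriber: the only information conveyed is
    // "something changed since you last looked".
    let (tx, rx) = sync_channel::<()>(1);

    // Publisher side: if the slot is already occupied, the subscriber will
    // refresh anyway, so silently dropping the extra notification is fine.
    match tx.try_send(()) {
        Ok(()) | Err(TrySendError::Full(())) => {}
        Err(TrySendError::Disconnected(())) => { /* subscriber went away */ }
    }

    // Subscriber side: on wake, re-fetch the full state (e.g. via `shpool list`).
    if rx.recv().is_ok() {
        // refresh state here
    }
}
```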

@ethanpailes
Contributor

@GeoffChurch are you ready for me to review the main PR body or are you still chugging?

@GeoffChurch
Author

I think the only thing left is a question I had about the config knob:
#352 (comment)

@ethanpailes
Contributor

Sounds good, I'll give this a review soon.

Comment thread libshpool/src/daemon/mod.rs Outdated
Comment thread libshpool/src/daemon/mod.rs Outdated
Comment thread libshpool/src/daemon/server.rs Outdated
Comment thread libshpool/src/daemon/server.rs Outdated
Comment thread libshpool/src/daemon/server.rs Outdated
Comment thread libshpool/src/events.rs
Comment thread libshpool/src/events.rs Outdated
Comment thread libshpool/src/events.rs Outdated
Comment thread libshpool/src/events.rs Outdated
for stream in listener.incoming() {
    match stream {
        Ok(stream) => {
            if let Err(e) = on_accept(stream) {
Contributor


We should probably fork a thread for each connection so that multiple people can dial in in parallel.

Comment thread libshpool/src/events.rs Outdated
@ethanpailes
Contributor

Sorry for all the typos. I'm on a slow connection and it seems like github round-trips edits so it is really painful to type in the review boxes.

GeoffChurch added a commit to GeoffChurch/shpool that referenced this pull request May 5, 2026
@GeoffChurch force-pushed the feat/events-socket branch 2 times, most recently from 17cff9f to f2a7338 (May 5, 2026 17:59)
GeoffChurch added a commit to GeoffChurch/shpool that referenced this pull request May 5, 2026
@GeoffChurch force-pushed the feat/events-socket branch 2 times, most recently from e33e751 to 5250954 (May 5, 2026 21:04)
@ethanpailes
Contributor

@GeoffChurch is this ready for re-review?

@GeoffChurch
Author

@ethanpailes I'm still working on shifting to a single sink thread (#352 (comment)). Per ca34b35, it does look like the current implementation remains quite fast under load, but the proposed change will certainly scale better wrt latency and memory. Just mentioning in case you'd want to split the refactor into a new PR. I'll keep at it in any case.

@ethanpailes
Contributor

Let's keep it all on one PR. Just let me know when you're done.

GeoffChurch added a commit to GeoffChurch/shpool that referenced this pull request May 7, 2026
@GeoffChurch force-pushed the feat/events-socket branch from 5250954 to 589df30 (May 7, 2026 19:25)
@GeoffChurch
Author

@ethanpailes ready for review. Non-blocking writes made this a bit tubbier than I anticipated. The earlier per-sub threads were able to block on write_all, but when they're all on one thread we have to use non-blocking writes and remember how many bytes still need to be sent. I tried to encapsulate the low-level stuff via SubscriberWriter. Maybe there is some opportunity for simplification.

@ethanpailes
Contributor

Do you mind rebasing? I just merged some stuff so now the diff is out of sync. Sorry about that. One change in particular is probably gonna be annoying: I had to switch from stdlib mutexes to parking_lot mutexes to fix a race condition.

Adds the Event enum and RemovedReason for the push-event socket. Wire
format is JSONL: snapshot reuses shpool_protocol::Session so the schema
matches `shpool list --json`; deltas are flat objects tagged with `type`.
The daemon binds a sibling Unix socket (events.socket) next to the main
shpool socket and accepts long-lived subscribers. Each new subscriber
receives a snapshot of the session table as its first message, built
under the shells lock so subsequent deltas (published in a follow-up
change) cannot race the registration.

Per-subscriber writer threads with bounded channels and a write timeout
isolate slow or stuck consumers from the daemon's hot path; subscribers
that fall behind are dropped and re-sync by reconnecting.

Extracts collect_sessions from handle_list so the snapshot and the
existing `shpool list --json` output share one schema-producing path.
Emits session.created, session.attached, session.detached, and
session.removed (with reason exited|killed) at the seven mutation
sites that change the session table:

  * select_shell_desc create path: created + attached
  * select_shell_desc reattach path: attached
  * handle_attach client-disconnect path: detached
  * handle_attach shell-exit path: removed{exited}
  * handle_detach: detached
  * handle_kill: removed{killed}
  * ttl_reaper expiry: removed{killed}

Each publish runs inside the same shells-lock scope as its mutation,
so wire-order matches causal-order for any subscriber. Reaping is
surfaced as `killed` for now; a dedicated reason can be added later
if a use case appears.
Connects to the daemon's events socket and prints each JSON line to
stdout, flushing per line so the stream is pipeline-friendly:

  shpool events | jq 'select(.type == "session.removed")'

The first line is a snapshot of the current session table; subsequent
lines are deltas. Reconnect to force a fresh snapshot.
Three end-to-end tests, exercising the full daemon → events socket →
JSON wire path:

  * snapshot_then_lifecycle: snapshot, then session.created /
    .attached / .detached / .removed{killed} as a session is created,
    detached (via background mode), and killed.
  * snapshot_includes_existing_sessions: a subscriber that connects
    after a session already exists receives that session in the
    snapshot.
  * multiple_subscribers_each_get_independent_streams: two concurrent
    subscribers both receive the snapshot and full delta sequence.
- Gate the SessionRemoved{Exited} publish in handle_attach's shell-exit
  branch on shells.remove() actually returning Some, so a concurrent
  kill or reaper that already removed the entry doesn't produce a
  duplicate removal event.

- Drop the eager SessionDetached publish in handle_detach. The
  bidi-loop unwind path in handle_attach already publishes the matching
  event with its own timestamp; emitting it twice was observable to
  subscribers. Keep the eager last_disconnected_at write so concurrent
  list() callers still see fresh state immediately.

- In select_shell_desc, defer the reattach SessionAttached publish past
  the is_finished() check so it isn't emitted for a session about to be
  implicitly clobbered by the create path. Have the create path publish
  SessionRemoved{Exited} when it overwrites an existing entry, so the
  replacement is explicit on the wire.

- Thread the events socket path through signals::Handler so signal
  exits clean it up alongside the main socket. Switch to a Vec<PathBuf>
  to handle both. Tolerate NotFound on cleanup since the events socket
  may not have been bound yet when a signal arrives.

- Drop the unnecessary `pub` on Server.events_bus.

- Apply nightly rustfmt.

Three new integration tests:

- explicit_detach_publishes_one_event: pins the no-duplicate-detached
  invariant by using a kill as a known-next-event fence; a duplicate
  detached would surface as the next read instead of session.removed.
- signal_exit_unlinks_sockets: SIGTERM the daemon and assert both
  socket files are gone.
- reattach_emits_attached_only: regression guard for the reattach path.
Each event now serializes as `{"type":"session.<x>"}` with no other
fields. To learn what the event refers to, subscribers follow up with
`shpool list` (or send `ConnectHeader::List` over the main socket).

- Drop the welcome `snapshot` event; subscribers do their own bootstrap
  list call after connecting.
- Drop `name`, timestamps, and `reason` from the lifecycle events.
- Remove `RemovedReason`; reaped/killed/exited share `session.removed`.
- Inline `collect_sessions` back into `handle_list` (the snapshot was
  its only other consumer) and drop the now-unused `unix_ms` helper.
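
For reference, the `{"type":"session.<x>"}` wire shape can be produced by an internally tagged serde enum along these lines (a sketch; not necessarily the exact definitions in the tree):

```rust
use serde::Serialize;

// Sketch only: variant names mirror the documented event types.
#[derive(Serialize)]
#[serde(tag = "type")]
enum Event {
    #[serde(rename = "session.created")]
    SessionCreated,
    #[serde(rename = "session.attached")]
    SessionAttached,
    #[serde(rename = "session.detached")]
    SessionDetached,
    #[serde(rename = "session.removed")]
    SessionRemoved,
}

fn main() {
    // Prints: {"type":"session.created"}
    println!("{}", serde_json::to_string(&Event::SessionCreated).unwrap());
}
```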
Cover the sibling-socket transport, the four event types, the JSONL
wire format, the `shpool events` CLI helper plus direct-socket use for
heavier-duty consumers, the ordering guarantee (publish under the
session-table lock so wire-order matches causal-order), and the
slow-subscriber drop policy.
@GeoffChurch force-pushed the feat/events-socket branch from e227182 to 59554cd (May 8, 2026 02:17)
- Add .context() on the signal-handler spawn and events-listener
  startup paths in daemon::run.
- Move the ttl-reaper thread::spawn's capture-clones (shells,
  events_bus) inside a block expression. The cloned Arcs shadow
  the outer names rather than introducing parallel shells_tab /
  reaper_bus bindings in the function scope.
- Reword the comment on the shells-remove + SessionRemoved publish to
  explain why the publish is gated on is_some(): a concurrent kill or
  reaper may have removed the entry (and published) while we were
  waiting for the lock.
- Have start_listener take Arc<EventBus> directly instead of a closure
  that registers + spawns a writer. EventBus::register and spawn_writer
  drop their pub modifiers now that the only call site is within
  events.rs.
- Restore the let-binding type annotation on the collected sessions
  Result in handle_list.
- Document JSONL newline handling in EVENTS.md.
- bus_publish_with_many_subscribers_is_not_quadratic: bound a single
  publish to 10K subscribers under 50 ms (~4 ms on a 2.1 GHz CPU;
  50 ms absorbs CI tail latency while still catching quadratic
  regressions, which would be on the order of seconds at N=10K).
- bus_drops_slow_subscriber_on_overflow_without_affecting_fast: a
  fast subscriber keeps receiving every event while a slow one's
  queue fills and is eventually dropped on overflow. Asserts
  SUBSCRIBER_QUEUE_DEPTH stays small since the test scales with it.
- accept_loop_registers_concurrent_subscribers: dial 20 connections
  in parallel and confirm all are registered and each receives a
  published event.
- bus_concurrent_publish_under_outer_lock_delivers_all_events:
  publish from multiple threads while each holds an outer mutex; all
  events arrive in order.
… loop

Move all subscriber I/O onto one events-sink thread driven by poll(2).
publish() becomes O(1) on the daemon's hot path: a try_send on a bounded
mpsc channel + a 1-byte write to a self-pipe to wake the sink. The sink
owns the listener, accepts connections, and drives non-blocking writes
per subscriber via a VecDeque pending-queue plus a front-line offset for
partial writes. A subscriber that falls SUBSCRIBER_QUEUE_DEPTH events
behind is marked dropped and removed at end-of-iteration without
affecting the others. publish takes no internal lock, so it is safe
under arbitrary outer locks; publishing inside the lock that protects
the state being announced preserves wire-order = causal-order.
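
To illustrate the publish-to-wake handoff, a stripped-down sketch (placeholder names; not the actual shpool code):

```rust
// Illustrative only: a bounded channel carries the event text and a
// self-pipe wakes the sink's poll(2) loop. Assumes the pipe was created
// with pipe2(O_NONBLOCK | O_CLOEXEC) so the drain loop below terminates.
use std::os::fd::RawFd;
use std::sync::mpsc::{Receiver, SyncSender};

struct EventBus {
    tx: SyncSender<String>,
    wake_write: RawFd, // write end of the self-pipe
}

impl EventBus {
    /// O(1) on the publisher's hot path: enqueue, then poke the sink awake.
    fn publish(&self, line: String) {
        if self.tx.try_send(line).is_ok() {
            // A single byte is enough; the sink drains the pipe on wake.
            let _ = unsafe { libc::write(self.wake_write, b"x".as_ptr().cast(), 1) };
        }
    }
}

fn sink_loop(rx: Receiver<String>, wake_read: RawFd) {
    loop {
        let mut fds =
            [libc::pollfd { fd: wake_read, events: libc::POLLIN, revents: 0 }];
        // Block until a publisher pokes the pipe (the real loop also polls the
        // listener and each subscriber fd for POLLOUT).
        if unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, -1) } < 0 {
            continue; // EINTR: retry
        }
        // Drain the wake pipe so the next poll() blocks again.
        let mut buf = [0u8; 64];
        while unsafe { libc::read(wake_read, buf.as_mut_ptr().cast(), buf.len()) } > 0 {}
        // Drain pending events; the real sink appends each line to every
        // subscriber's pending queue and drives non-blocking writes.
        while let Ok(_line) = rx.try_recv() {}
    }
}
```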

Sink loop:
- Drain pending accepts unconditionally each iteration; the listener
  stays in the poll set so its POLLIN wakes us, but we don't trust
  the revent as a gate (listener POLLIN can lag behind connect(2)
  returning under load, leaving a queued connection unaccepted while
  the same iteration's broadcast lands on the pre-connect sub set).
- Treat any wake-fd revent as "drain it"; POLLHUP-on-EOF no longer
  busy-loops poll(2).
- Skip already-dropped subs in the POLLOUT-driven drive pass.
- panic! on non-EINTR poll(2) errors and non-EAGAIN/EINTR wake-fd
  read errors (programmer-error categories on fds the sink owns).

Publish:
- Pre-cache each Event variant's wire form as Arc<str> via LazyLock;
  the hot path is one try_send + 1-byte wake + one Arc::clone, no
  per-publish allocation.
- Log "events sink died" once via AtomicBool when try_send returns
  Disconnected.
- EventBus::new returns io::Result so pipe2(2) failure propagates
  through Server::new instead of panicking.

Pipe creation:
- pipe2(O_NONBLOCK | O_CLOEXEC) instead of pipe + 4 fcntl calls.

Tests:
- harness(), connect_registered, connect_n_registered, read_n_lines:
  per-test scaffolding around tempdir + bus + listener + round-trip
  registration probe. The probe replaces the subscriber_count hook
  the bus previously exposed for tests; the field, method, and
  per-iteration atomic store are gone.
- Slow-subscriber drop test uses SO_RCVBUF on the slow client
  (SO_SNDBUF on the listener does not reliably inherit to AF_UNIX
  accepted sockets).
- New: events_arrive_in_publish_order, drive_pending property fuzz,
  subscriber_writer_resumes_after_peer_drains, sink-level
  slow_subscriber_drop_does_not_affect_fast_through_sink.
- Removed sink_thread_is_named_events_sink (didn't observe the
  actual sink) and bus_publish_reaches_every_subscriber (subsumed
  by accept_loop_registers_concurrent_subscribers).

Test support:
- Wait for sibling events.socket to exist alongside the main socket
  before declaring the daemon ready.
@GeoffChurch force-pushed the feat/events-socket branch from 59554cd to 8f5e08c (May 8, 2026 03:06)
@GeoffChurch
Author

@ethanpailes should be ready now

@ethanpailes
Contributor

Thanks! I'll try to give this a look soon.
