Session Pool (session_pool.rs)
File: src/daemon/session_pool.rs
Manages CLI sessions using a per-message spawn architecture. Each user message spawns a fresh CLI subprocess via the appropriate CliAdapter, maintaining conversation continuity via session resume flags (--resume for Claude, equivalent for other CLIs).
Purpose
- Maintain a registry of session metadata (path, mode, CLI type, status, SDK session ID)
- Spawn CLI subprocesses for individual messages via the
CliAdaptertrait - Convert CLI stdout JSON-lines to
StreamEventvalues - Handle message queuing when the CLI is busy
- Manage process lifecycle (spawn, monitor, interrupt, kill)
- Track client connection state for response buffering
Architecture: Per-Message Spawn
Rather than keeping a long-running CLI process with stdin open, the SessionPool spawns a fresh process for each message:
# First message (no session ID yet):
claude -p "user message" --output-format stream-json --verbose \
[--dangerously-skip-permissions]
# Subsequent messages (using --resume to continue conversation):
claude -p "user message" --output-format stream-json --verbose \
--resume <sdkSessionId> \
[--dangerously-skip-permissions]
The CliAdapter trait abstracts this pattern across all supported CLIs. Each adapter implements build_command() for the first message and build_resume_command() for subsequent messages.
A fresh adapter instance is created for each run_cli_process() call via create_adapter(). This ensures any per-run state (such as cumulative text tracking) is reset cleanly between message turns.
Internal Session State
#![allow(unused)] fn main() { struct InternalSession { session_id: String, path: String, mode: PermissionMode, cli_type: String, status: SessionStatus, sdk_session_id: Option<String>, created_at: DateTime<Utc>, last_activity_at: DateTime<Utc>, process: Option<Child>, // Running CLI process (only during processing) queue: MessageQueue, // Per-session message + response queue processing: bool, // Whether a message is currently being processed model: Option<String>, // Model name reported by CLI init event } }
The sessions map is wrapped in Arc<Mutex<HashMap<String, InternalSession>>> for async-safe access.
Key Methods
create(path, mode, model, cli_type) -> Result<String, String>
Creates a new session entry. Lightweight — no CLI process is spawned.
- Resolves the path: expands
~, and expands bare project names to~/Projects/<name> - Validates that the resolved path exists on the filesystem
- Generates a UUID for the session ID
- Inserts an
InternalSessionwithstatus: Idle, no process, and a freshMessageQueue - Returns the session ID
send(session_id, message) -> mpsc::Receiver<StreamEvent>
Sends a message to a session. Returns a channel receiver that yields stream events.
If the session is busy (another message in flight):
- Enqueues the message via
queue.enqueue_user() - Sends a single
Queued { position }event on the receiver - Returns immediately
If the session is idle:
- Sets
status: Busyandprocessing: true - Spawns a tokio task that calls
run_cli_process() - The task forwards events onto the mpsc channel
run_cli_process(session_id, message) -> impl Stream<Item = StreamEvent>
Internal method that spawns the CLI subprocess and yields events.
Adapter selection and command building:
#![allow(unused)] fn main() { let adapter = create_adapter(&session.cli_type); let command = if let Some(sdk_id) = &session.sdk_session_id { adapter.build_resume_command(message, mode, cwd, sdk_id, model) } else { adapter.build_command(message, mode, cwd, model) }; }
Process spawn:
#![allow(unused)] fn main() { let mut child = command .current_dir(&session.path) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; }
Stdin is set to null because the prompt is passed via CLI arguments in non-interactive mode.
Output processing:
stdout is read line-by-line using tokio::io::BufReader and AsyncBufReadExt::lines(). Each line is passed to adapter.parse_output_line(), which returns zero or more StreamEvent values. Events are forwarded onto the mpsc channel.
stderr is logged at the level specified by adapter.stderr_log_level() (typically debug for Claude, warn for others).
Session ID extraction:
On the first line of output, adapter.extract_session_id() is called. If an ID is found, it is stored as session.sdk_session_id for future build_resume_command() calls.
Model name capture:
System { subtype: Some("init"), model, .. } events are used to update session.model.
Terminal events:
The generator stops forwarding events after receiving a Result, Error, or Interrupted event. The subprocess is awaited and then cleaned up.
Cleanup after process exit:
session.processis set toNonesession.processingis set tofalsesession.statusis set toIdle- If the process exited with a non-zero code, an
Errorevent is emitted - If there are queued user messages,
process_queued_message()is called to auto-process the next one
resume(session_id, sdk_session_id?) -> Result<ResumeResult, String>
In per-message spawn mode, this simply updates sdk_session_id so the next send() uses the resume command. Also calls queue.on_client_reconnect() to mark the client as reconnected.
Returns { ok: true, fallback: false }.
destroy(session_id) -> bool
- Sends SIGTERM to any running CLI process
- Waits up to 5 seconds for the process to exit; sends SIGKILL if it does not
- Sets
status: Destroyed - Clears the message queue
- Removes the session from the pool
set_mode(session_id, mode) -> bool
Updates session.mode. Takes effect on the next send() call since the mode is passed to adapter.build_command().
set_model(session_id, model) -> bool
Updates session.model. Takes effect on the next send() call.
interrupt(session_id) -> Result<bool, String>
- Sends SIGTERM to the running CLI process
- Clears the message queue (cancels any pending messages)
- Returns
trueif there was an active process to interrupt,falseif the session was idle
list_sessions() -> Vec<SessionInfo>
Returns SessionInfo for all non-destroyed sessions. SessionInfo is a serializable snapshot (dates as ISO 8601 strings, no runtime-only fields).
client_disconnect(session_id) / client_reconnect(session_id) -> Vec<StreamEvent>
Proxy methods for MessageQueue client state management. Called by server.rs when the SSE connection drops or is re-established.
get_queue_stats(session_id) -> QueueStats
Returns QueueStats { user_pending, response_pending, client_connected } for a session.
destroy_all()
Destroys all sessions. Called during daemon shutdown.
Connection to Other Modules
- server.rs creates a single
SessionPoolinstance (insideAppState) and calls its methods for all session-related RPC handlers - Uses cli_adapter via
create_adapter()to build and parse CLI subprocesses - Uses MessageQueue for per-session message buffering and client state tracking
- Imports types from types.rs (
SessionStatus,PermissionMode,StreamEvent,SessionInfo,QueueStats)