Message Queue (message_queue.rs)
File: src/daemon/message_queue.rs
Per-session message queue with three responsibilities: buffering user messages when the CLI is busy, buffering response events when the SSH connection drops, and tracking client connection state.
Purpose
- User message buffering: When Claude is processing a message, additional user messages are queued and processed in order after the current message completes.
- Response buffering: When the SSH connection (and thus the SSE stream) drops mid-response, events are buffered and can be replayed when the client reconnects via
session.reconnect. - Client connection tracking: Tracks whether the Head Node client is currently connected so the system knows whether to buffer response events.
Struct: MessageQueue
#![allow(unused)] fn main() { pub struct MessageQueue { user_pending: VecDeque<QueuedUserMessage>, response_pending: VecDeque<QueuedResponse>, client_connected: bool, // Starts as true } }
Each session has its own MessageQueue instance, created when the session is created in SessionPool::create().
Data Types
QueuedUserMessage
#![allow(unused)] fn main() { pub struct QueuedUserMessage { pub message: String, pub timestamp: u64, // Unix milliseconds } }
QueuedResponse (private)
#![allow(unused)] fn main() { struct QueuedResponse { event: StreamEvent, timestamp: u64, // Unix milliseconds } }
User Message Buffering
enqueue_user(message: String) -> usize
Adds a user message to the back of the queue. Returns the new queue length (used as the queue position in the Queued event sent back to the user). Called by SessionPool::send() when the session is busy.
dequeue_user() -> Option<QueuedUserMessage>
Removes and returns the next user message from the front of the queue. Returns None if empty. Called by SessionPool after completing a message to auto-process the next one.
has_user_pending() -> bool
Returns true if there are queued user messages. Used by SessionPool to decide whether to call process_queued_message() after a message completes.
Response Buffering
buffer_response(event: StreamEvent, force: bool)
Buffers a response event. When force is false (normal path), events are only buffered when client_connected is false. When force is true, the event is always buffered — used by server.rs when it detects an SSE client disconnect before the session pool has been notified.
on_client_reconnect() -> Vec<StreamEvent>
Marks the client as reconnected and returns all buffered response events (draining the queue). Called by the session.reconnect RPC handler. The caller sends the returned events back to the newly reconnected client.
Client Connection State
is_client_connected() -> bool
Returns the current connection state.
on_client_disconnect()
Sets client_connected = false. After this call, response events will be buffered via buffer_response() rather than discarded.
Cleanup
clear()
Clears both the user message queue and the response buffer. Called when a session is destroyed or interrupted.
stats() -> QueueStats
Returns a QueueStats snapshot for monitoring:
#![allow(unused)] fn main() { pub struct QueueStats { pub user_pending: usize, pub response_pending: usize, pub client_connected: bool, } }
Used by the /status and /monitor commands to report queue depth.
Flow Example
User sends msg1 → SessionPool: start processing msg1
User sends msg2 → queue.enqueue_user("msg2") → returns position 1
SessionPool: yields Queued { position: 1 } to client
(msg2 sits in queue while msg1 processes)
User sends msg3 → queue.enqueue_user("msg3") → returns position 2
SessionPool: yields Queued { position: 2 } to client
SSH drops → server.rs detects SSE close event
queue.on_client_disconnect()
(subsequent response events buffered via buffer_response(event, false))
msg1 completes → SessionPool: dequeue_user() → returns msg2
SessionPool: start processing msg2 (events buffered since disconnected)
msg2 completes → SessionPool: dequeue_user() → returns msg3
SessionPool: start processing msg3
SSH reconnects → client sends session.reconnect RPC
queue.on_client_reconnect() → returns buffered events for msg2
server.rs sends buffered events to client
client is now back in sync; msg3 is still processing
Connection to Other Modules
- session_pool.rs creates one
MessageQueueper session and calls its methods for message queuing and client state management - server.rs calls
queue.on_client_disconnect()when the SSE connection closes, and uses thesession.reconnecthandler to callqueue.on_client_reconnect() - Imports
StreamEventandQueueStatsfrom types.rs