//! Replay: reads a session from the archive and dispatches synthetic //! `BusEvent`s with `EventOrigin::Replay` directly to a target stream, //! bypassing the `SharingBus`. //! //! Consumed by `CoreRuntime::replay_session_to_stream` (task 16). This //! module intentionally exposes a free function that takes //! `&Archivist`, `scroll_id`, `Arc`, and `ReplayOptions` //! so it can be unit-tested without a full runtime. use std::sync::Arc; use std::time::Duration; use uuid::Uuid; use dirigent_archivist::coordinator::Archivist; use dirigent_archivist::error::ArchivistError; use dirigent_archivist::types::MessageRecord; use dirigent_protocol::{ Event, Message, MessagePart, MessageRole, MessageStatus, streaming::{BusEvent, EventOrigin, EventRouting, SessionStream, StreamOutcome}, }; /// Options controlling a replay pass. #[derive(Debug, Clone)] pub struct ReplayOptions { /// When true and the session is an AcpConnection, meta-events are read from /// the archive (currently only counted — rendering meta events as /// `BusEvent`s is out of scope for Phase 4). pub include_meta_events: bool, /// Pace events in real time (sleep between consecutive timestamps) or emit /// as fast as the target stream can consume. pub speed: ReplaySpeed, } impl Default for ReplayOptions { fn default() -> Self { Self { include_meta_events: false, speed: ReplaySpeed::AsFastAsPossible, } } } /// Controls inter-event pacing during replay. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ReplaySpeed { /// Sleep the wall-clock delta between consecutive message timestamps. Realtime, /// Emit events as fast as the stream can consume. AsFastAsPossible, } /// Outcome of a replay pass. #[derive(Debug, Default, Clone)] pub struct ReplayReport { /// Total events dispatched to the stream (includes failed attempts). pub events_sent: usize, /// Events the stream rejected (`StreamOutcome::Failed`). pub failures: usize, /// Wall-clock duration of the replay in milliseconds. pub duration_ms: u64, } /// Errors raised by `replay_session_to_stream` itself. Stream-side failures are /// counted in `ReplayReport::failures` rather than propagated, so one bad event /// doesn't abort the replay. #[derive(Debug, thiserror::Error)] pub enum ReplayError { /// The archive has no session with the given scroll id. #[error("session not found: {0}")] SessionNotFound(Uuid), /// Archivist returned a non-SessionUnknown error (I/O, decoding, etc). #[error("archivist: {0}")] Archivist(String), } /// Replay a session's archived messages to a single `SessionStream`. /// /// Reads metadata + messages from `archivist`, synthesises a `BusEvent` per /// message with `EventOrigin::Replay { replay_id }`, and dispatches directly /// to the target stream. The `SharingBus` is not involved; live events remain /// unaffected. /// /// The function continues on stream failures and records the count in the /// returned `ReplayReport`; only unrecoverable archive errors propagate. pub async fn replay_session_to_stream( archivist: &Archivist, scroll_id: Uuid, stream: Arc, opts: ReplayOptions, ) -> Result { let start = std::time::Instant::now(); let replay_id = Uuid::new_v4(); // Load metadata. Translate the archivist's typed `SessionUnknown` into // the replay-level `SessionNotFound` variant; everything else becomes // `Archivist(_)` so callers can distinguish "missing" from "broken". let metadata = archivist .get_session_metadata(scroll_id, None) .await .map_err(|e| match e { ArchivistError::SessionUnknown(id) => ReplayError::SessionNotFound(id), other => ReplayError::Archivist(other.to_string()), })?; let messages = archivist .get_messages(scroll_id, None) .await .map_err(|e| ReplayError::Archivist(e.to_string()))?; let connector_uid = Some(metadata.connector_uid); let native_session_id = metadata.native_session_id.clone(); // We do not persist the orchestrator-side `connector_id` string in session // metadata; the native session id is the best reversible handle we have. let connector_id = native_session_id.clone().unwrap_or_default(); let mut events_sent = 0usize; let mut failures = 0usize; let mut prev_ts: Option> = None; for record in messages { if matches!(opts.speed, ReplaySpeed::Realtime) { if let Some(prev) = prev_ts { let delta = record.ts.signed_duration_since(prev); if let Ok(d) = delta.to_std() { // Cap per-step sleep at 1h to avoid pathological archives // where a session sat idle for days. if d > Duration::from_millis(0) && d < Duration::from_secs(3600) { tokio::time::sleep(d).await; } } } prev_ts = Some(record.ts); } let message = message_from_record(&record, native_session_id.as_deref()); let event = Event::MessageCompleted { connector_id: connector_id.clone(), message, }; let mut routing = EventRouting::derive(&event, connector_uid, &connector_id); // `derive()` leaves scroll_id=None (the bus cache normally fills it in). // During replay we have the authoritative scroll_id up front. routing.scroll_id = Some(scroll_id); let bus_event = BusEvent { routing, origin: EventOrigin::Replay { replay_id }, event: Arc::new(event), }; match stream.on_event(&bus_event).await { StreamOutcome::Ok | StreamOutcome::Skipped => { events_sent += 1; } StreamOutcome::Failed(_err) => { failures += 1; events_sent += 1; // count attempted regardless } } } if opts.include_meta_events { // Meta-events exist only on AcpConnection sessions; the read is // cheap and idempotent, so we don't gate on `metadata.kind`. Render- // as-BusEvent is out of scope for Phase 4 — we just probe the // archive so missing meta-event storage surfaces as a log line // here rather than later in the call chain. let _ = archivist.get_meta_events(scroll_id, None).await; } Ok(ReplayReport { events_sent, failures, duration_ms: start.elapsed().as_millis() as u64, }) } /// Synthesize a protocol `Message` from an archived `MessageRecord`. /// /// The session_id we emit is the connector's native session id when known, /// falling back to the stringified scroll_id so downstream routing at least /// has a stable handle. fn message_from_record(record: &MessageRecord, native_session_id: Option<&str>) -> Message { Message { id: record.message_id.to_string(), session_id: native_session_id .map(str::to_string) .unwrap_or_else(|| record.session.to_string()), role: parse_role(&record.role), created_at: record.ts, content: content_parts_from_record(record), status: MessageStatus::Completed, metadata: None, } } /// Parse the archivist's stringly-typed role into the protocol enum. /// /// `MessageRole` only has `User` and `Assistant` today; archived "system" / /// "tool" rows (which the protocol layer does not support) fall back to /// `User` rather than drop the message entirely. Lossy but preserves content. fn parse_role(role: &str) -> MessageRole { match role { "assistant" => MessageRole::Assistant, "user" => MessageRole::User, // Protocol has no System/Tool variant; surface these as user messages // so their content still reaches the stream. _ => MessageRole::User, } } /// Prefer the archived structured `content_parts` (round-trips tool calls, /// code blocks, etc). Fall back to a single `Text` part built from the /// markdown rendering when parts are missing or fail to parse. fn content_parts_from_record(record: &MessageRecord) -> Vec { if let Some(parts) = &record.content_parts { if let Ok(parsed) = serde_json::from_value::>(parts.clone()) { return parsed; } } vec![MessagePart::Text { text: record.content_md.clone(), }] }