//! Owns all active streams. //! //! Populated at boot from `[[streams]]` config and at runtime via //! [`StreamRegistry::attach`]. Each attached stream gets: //! //! - a bus subscription with an [`EventFilter`] derived from its scope, //! - a dedicated worker task that drives `SessionStream::on_event`, //! - a per-stream [`HealthStatus`] that drifts on consecutive failures //! (see [`super::health`]). //! //! The worker is cancellable via a one-shot `mpsc::Sender<()>` on the //! registration so [`detach`](StreamRegistry::detach) can stop delivery //! deterministically before invoking `SessionStream::shutdown`. use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; use tokio::sync::{RwLock, mpsc}; use tokio::task::JoinHandle; use tracing::warn; use uuid::Uuid; use dirigent_protocol::streaming::{ BusReceiver, EventFilter, SessionStream, StreamOutcome, StreamScope, StreamSummary, }; use super::bus::SharingBus; use super::health::{HealthStatus, record_failure, record_success}; /// Per-subscriber queue capacity for a stream's bus subscription. Matches /// the default used by `SharingBus::subscribe_all`. const STREAM_QUEUE_CAPACITY: usize = 256; /// Identifier of a registered stream. Opaque wrapper around a `Uuid` so /// that callers can't confuse stream ids with scroll/connector ids. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct StreamId(pub Uuid); /// Full registration record for a live stream. /// /// Held inside the registry behind an `Arc`; `detach` returns the Arc so /// callers can inspect the final health state or await the worker handle /// if they wish. Most fields are `Arc`s so the worker task and the /// registry share state without serialising on a single lock. pub struct StreamRegistration { pub id: StreamId, pub name: String, pub stream: Arc, pub scope: StreamScope, pub enabled: bool, pub health: Arc>, /// Number of consecutive delivery failures; drives the K=5 drift to /// `Unavailable`. Stored atomic so the worker can update without /// taking the health lock on every success. pub consecutive_failures: Arc, pub worker: JoinHandle<()>, pub stop_tx: mpsc::Sender<()>, } /// Snapshot view of a registered stream. Returned by /// [`StreamRegistry::list`] for telemetry / UI. #[derive(Debug, Clone)] pub struct StreamInfo { pub id: StreamId, pub name: String, pub summary: StreamSummary, pub scope: StreamScope, pub enabled: bool, pub health: HealthStatus, /// Current consecutive-failure count (mirrors /// `StreamRegistration::consecutive_failures` at read time). pub lagged_count: u64, } /// The live registry of all streams wired to a [`SharingBus`]. pub struct StreamRegistry { bus: Arc, regs: RwLock>>, } impl StreamRegistry { /// Build an empty registry bound to `bus`. pub fn new(bus: Arc) -> Self { Self { bus, regs: RwLock::new(Vec::new()), } } /// Attach a running stream. /// /// Subscribes to the bus with a filter derived from `stream.scope()`, /// spawns a worker task that ferries events into `on_event`, and /// stores a registration with fresh health state. pub async fn attach(&self, name: String, stream: Arc) -> StreamId { let id = StreamId(Uuid::now_v7()); let scope = stream.scope(); let filter = scope_to_filter(&scope); let bus_rx = self .bus .subscribe_filtered(filter, STREAM_QUEUE_CAPACITY) .await; let (stop_tx, stop_rx) = mpsc::channel(1); let health = Arc::new(RwLock::new(HealthStatus::Healthy)); let failures = Arc::new(AtomicU32::new(0)); let stream_for_worker = Arc::clone(&stream); let health_for_worker = Arc::clone(&health); let failures_for_worker = Arc::clone(&failures); let name_for_worker = name.clone(); let worker = tokio::spawn(run_stream_worker( name_for_worker, bus_rx, stream_for_worker, health_for_worker, failures_for_worker, stop_rx, )); let reg = Arc::new(StreamRegistration { id, name, stream, scope, enabled: true, health, consecutive_failures: failures, worker, stop_tx, }); self.regs.write().await.push(reg); id } /// Detach a stream. Signals the worker to exit, then invokes /// `SessionStream::shutdown`. Returns the registration if the stream /// was found, or `None` if the id was already detached. pub async fn detach(&self, id: StreamId) -> Option> { let mut regs = self.regs.write().await; let idx = regs.iter().position(|r| r.id == id)?; let reg = regs.remove(idx); drop(regs); // Best-effort stop: if the channel is already closed (worker panicked) // we still want to run shutdown. let _ = reg.stop_tx.send(()).await; reg.stream.shutdown().await; Some(reg) } /// Look up a live stream by id. pub async fn get_stream(&self, id: StreamId) -> Option> { self.regs .read() .await .iter() .find(|r| r.id == id) .map(|r| Arc::clone(&r.stream)) } /// Snapshot every registered stream. Clones the underlying health /// value so the returned `Vec` is safe to hand across async tasks /// without holding any locks. pub async fn list(&self) -> Vec { let regs = self.regs.read().await; let mut out = Vec::with_capacity(regs.len()); for r in regs.iter() { let health = r.health.read().await.clone(); out.push(StreamInfo { id: r.id, name: r.name.clone(), summary: r.stream.summary(), scope: r.scope.clone(), enabled: r.enabled, health, lagged_count: r.consecutive_failures.load(Ordering::Relaxed) as u64, }); } out } } /// Translate a declarative [`StreamScope`] into the subscriber-side /// [`EventFilter`] applied on the bus. fn scope_to_filter(scope: &StreamScope) -> EventFilter { match scope { StreamScope::Session { scroll_id } => EventFilter::ScrollId(*scroll_id), StreamScope::Connector { connector_uid } => EventFilter::ConnectorUid(*connector_uid), StreamScope::ArchiveWide { .. } => EventFilter::All, } } /// Worker loop: pulls events from the bus subscription, forwards them to /// the stream, and updates health state on every outcome. async fn run_stream_worker( name: String, mut rx: BusReceiver, stream: Arc, health: Arc>, failures: Arc, mut stop_rx: mpsc::Receiver<()>, ) { loop { tokio::select! { biased; _ = stop_rx.recv() => { return; } maybe_evt = rx.rx.recv() => { let Some(evt) = maybe_evt else { // Bus hung up — registry should detach but we can exit // here regardless. return; }; match stream.on_event(&evt).await { StreamOutcome::Ok | StreamOutcome::Skipped => { let mut h = health.write().await; let mut counter = failures.load(Ordering::Relaxed); record_success(&mut h, &mut counter); failures.store(counter, Ordering::Relaxed); } StreamOutcome::Failed(err) => { let reason = err.to_string(); warn!(stream = %name, error = %reason, "stream rejected event"); let mut h = health.write().await; let mut counter = failures.load(Ordering::Relaxed); record_failure(&mut h, &mut counter, reason); failures.store(counter, Ordering::Relaxed); } } } } } }