From 582954667108ac850934420ef82a6372c92b75c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabor=20K=C3=B6rber?= Date: Sat, 9 May 2026 21:59:28 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=A5=87=20export=20from=20upstream=20(a1fa?= =?UTF-8?q?8e3a)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 4 - README.md | 23 +- architecture.svg | 60 +- core-architecture.svg | 118 ++++ crates/dirigent_core/Cargo.toml | 4 - .../src/connectors/acp/connector.rs | 64 +-- .../src/connectors/acp/transport/stdio.rs | 6 +- .../src/connectors/gateway/mod.rs | 28 +- crates/dirigent_core/src/hooks.rs | 4 +- crates/dirigent_core/src/lib.rs | 4 + crates/dirigent_core/src/traits/inspector.rs | 38 ++ crates/dirigent_core/src/traits/mod.rs | 5 + crates/dirigent_core/src/traits/process.rs | 62 ++ crates/dirigent_inspector/Cargo.toml | 36 -- crates/dirigent_inspector/src/channel.rs | 349 ----------- crates/dirigent_inspector/src/error.rs | 34 -- crates/dirigent_inspector/src/handle.rs | 262 --------- crates/dirigent_inspector/src/lib.rs | 23 - crates/dirigent_inspector/src/node.rs | 109 ---- crates/dirigent_inspector/src/process.rs | 381 ------------ crates/dirigent_inspector/src/registry.rs | 459 --------------- crates/dirigent_inspector/src/snapshot.rs | 154 ----- crates/dirigent_inspector/src/system.rs | 231 -------- crates/dirigent_inspector/src/tree.rs | 544 ------------------ .../dirigent_inspector/tests/integration.rs | 355 ------------ crates/dirigent_process/Cargo.toml | 32 -- crates/dirigent_process/src/lib.rs | 30 - crates/dirigent_process/src/linux.rs | 91 --- crates/dirigent_process/src/shutdown.rs | 66 --- crates/dirigent_process/src/traits.rs | 40 -- crates/dirigent_process/src/unix.rs | 78 --- crates/dirigent_process/src/windows.rs | 199 ------- crates/dirigent_process/tests/lifecycle.rs | 149 ----- 33 files changed, 323 insertions(+), 3719 deletions(-) create mode 100644 core-architecture.svg create mode 100644 crates/dirigent_core/src/traits/inspector.rs create mode 100644 crates/dirigent_core/src/traits/mod.rs create mode 100644 crates/dirigent_core/src/traits/process.rs delete mode 100644 crates/dirigent_inspector/Cargo.toml delete mode 100644 crates/dirigent_inspector/src/channel.rs delete mode 100644 crates/dirigent_inspector/src/error.rs delete mode 100644 crates/dirigent_inspector/src/handle.rs delete mode 100644 crates/dirigent_inspector/src/lib.rs delete mode 100644 crates/dirigent_inspector/src/node.rs delete mode 100644 crates/dirigent_inspector/src/process.rs delete mode 100644 crates/dirigent_inspector/src/registry.rs delete mode 100644 crates/dirigent_inspector/src/snapshot.rs delete mode 100644 crates/dirigent_inspector/src/system.rs delete mode 100644 crates/dirigent_inspector/src/tree.rs delete mode 100644 crates/dirigent_inspector/tests/integration.rs delete mode 100644 crates/dirigent_process/Cargo.toml delete mode 100644 crates/dirigent_process/src/lib.rs delete mode 100644 crates/dirigent_process/src/linux.rs delete mode 100644 crates/dirigent_process/src/shutdown.rs delete mode 100644 crates/dirigent_process/src/traits.rs delete mode 100644 crates/dirigent_process/src/unix.rs delete mode 100644 crates/dirigent_process/src/windows.rs delete mode 100644 crates/dirigent_process/tests/lifecycle.rs diff --git a/Cargo.toml b/Cargo.toml index 2e48c65..455c8fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,6 @@ members = [ "crates/dirigent_auth", "crates/dirigent_config", "crates/dirigent_acp_api", - "crates/dirigent_inspector", - "crates/dirigent_process", "crates/opencode_client", ] @@ -26,6 +24,4 @@ dirigent_tools = { path = "crates/dirigent_tools" } dirigent_auth = { path = "crates/dirigent_auth" } dirigent_config = { path = "crates/dirigent_config" } dirigent_acp_api = { path = "crates/dirigent_acp_api" } -dirigent_inspector = { path = "crates/dirigent_inspector" } -dirigent_process = { path = "crates/dirigent_process" } opencode_client = { path = "crates/opencode_client" } diff --git a/README.md b/README.md index 2c97445..3804abf 100644 --- a/README.md +++ b/README.md @@ -31,8 +31,25 @@ These tools are developed in this monorepo but distributed as independent reposi **Layers top-to-bottom:** - **Consumers** *(shadow)* — server assembly, web app, integrations — not in this repo - **Standalone Tools** — installable from their own repositories; depend on these crates -- **Orchestration** — connector runtime, ACP server, introspection -- **Foundation** — protocol types, tool sandbox, configuration, auth, process management +- **Orchestration** — connector runtime, ACP server +- **Foundation** — protocol types, tool sandbox, configuration, auth + +--- + +## Core Runtime + +`dirigent_core` is the central crate. It manages long-lived connections to external agent systems through a **Connector** abstraction — each connector wraps a bidirectional communication channel to an agent (Claude Code over stdio, OpenCode.ai over HTTP+SSE, or an incoming ACP connection). + +

+ dirigent_core internal architecture +

+ +**Key concepts:** + +- **CoreRuntime** is a stateless orchestrator. It owns the connector registry, the event bus, and lifecycle hooks — but never caches session state. The external agent is always authoritative. +- **Connectors** implement a common trait (`command_tx()` to send commands, `subscribe()` to receive events) and run as independent background tasks. Four implementations ship today: ACP (stdio/HTTP JSON-RPC), Gateway (local echo + session transfer), OpenCode (REST+SSE), and AcpAcceptor (incoming connections). +- **SharingBus** is the event backbone. Every connector event is published once and fan-out to filtered subscribers — archivist, web UI, stream integrations — without the bus knowing about any of them. +- **Lifecycle Hooks** let the server assembly inject services (inspector, process manager, archivist) at connector creation time. Core defines abstract traits (`ConnectorInspector`, `ProcessGroupManager`) and never depends on their implementations directly. --- @@ -43,10 +60,8 @@ These tools are developed in this monorepo but distributed as independent reposi | `dirigent_core` | beta | Multi-connector orchestration runtime | | `dirigent_protocol` | beta | ACP protocol types — messages, events, and RPC definitions | | `dirigent_acp_api` | beta | ACP server for incoming agent connections | -| `dirigent_inspector` | concept | Runtime introspection tree | | `dirigent_config` | beta | Configuration management | | `dirigent_auth` | concept | User authorization model | -| `dirigent_process` | beta | Child process management | | `dirigent_tools` | concept | Tool sandbox and execution abstractions | | `opencode_client` | beta | OpenCode.ai HTTP client | diff --git a/architecture.svg b/architecture.svg index 8c48646..7cc6f13 100644 --- a/architecture.svg +++ b/architecture.svg @@ -37,46 +37,40 @@ ORCHESTRATION - - dirigent_core - connector runtime - - dirigent_acp_api - ACP server - - dirigent_inspector - introspection tree + + dirigent_core + connector runtime + + dirigent_acp_api + ACP server FOUNDATION - - protocol - ACP types - - tools - sandbox - - config - paths + toml - - auth - accounts - - process - lifecycle - - opencode - HTTP client + + protocol + ACP types + + tools + sandbox + + config + paths + toml + + auth + accounts + + opencode + HTTP client - - - - - + + + + + Shadow boxes = downstream consumers not included in this repository - 9 crates — minimal set for dirigate and standalone tool dependencies + 7 crates — minimal set for dirigate and standalone tool dependencies diff --git a/core-architecture.svg b/core-architecture.svg new file mode 100644 index 0000000..37e8ba9 --- /dev/null +++ b/core-architecture.svg @@ -0,0 +1,118 @@ + + + + + + + + + + + + + + dirigent_core — internal architecture + + + + CoreRuntime + stateless orchestrator — owns connectors, bus, hooks + + + + CONNECTORS + + + AcpConnector + JSON-RPC over + stdio | HTTP+SSE + + + GatewayConnector + local echo + session + transfer routing + + + OpenCodeConnector + REST + SSE + opencode.ai client + + + AcpAcceptor + incoming ACP + connections + + + all implement Connector trait: command_tx() + subscribe() + state() + + + + + + + + + + SharingBus + event multiplexer — filtered fan-out to subscribers via EventFilter + + + + + + + Events + + + + SUBSCRIBERS (downstream, not in this crate) + + + Archivist + + Web UI (SSE) + + ACP Server + + Langfuse + + Matrix + + + + + + + INJECTION TRAITS (core::traits) + + + ConnectorInspector + + + ProcessGroupManager + + + ProcessLifecycle + + injected via ConnectorLifecycleHooks + + + + LIFECYCLE HOOKS (core::hooks) + + + ConnectorLifecycleHooks + + on_connector_created(id, kind, title, owner) + on_connector_removed(id) + inspector() / process_manager() providers + + + + + + + + + Solid arrows = ownership • Dashed blue = events • Dashed green = injection • Dashed orange = callbacks + diff --git a/crates/dirigent_core/Cargo.toml b/crates/dirigent_core/Cargo.toml index b17d812..41e970f 100644 --- a/crates/dirigent_core/Cargo.toml +++ b/crates/dirigent_core/Cargo.toml @@ -31,8 +31,6 @@ dirigent_acp_api = { path = "../dirigent_acp_api", optional = true } # Workspace dependencies dirigent_config = { path = "../dirigent_config", optional = true } dirigent_auth = { path = "../dirigent_auth" } -dirigent_process = { path = "../dirigent_process", features = ["tokio"], optional = true } -dirigent_inspector = { path = "../dirigent_inspector", optional = true } dirigent_protocol = { path = "../dirigent_protocol", features = ["adapters"], optional = true } dirigent_tools = { path = "../dirigent_tools", optional = true } # SSE client for ACP transport @@ -84,7 +82,6 @@ server = [ "dep:blake3", "dep:dirigent_acp_api", "dep:dirigent_config", - "dep:dirigent_inspector", "dep:dirigent_protocol", "dep:dirigent_tools", "dep:eventsource-client", @@ -99,5 +96,4 @@ server = [ "dep:tower-http", "dep:tracing", "dep:tracing-subscriber", - "dep:dirigent_process", ] diff --git a/crates/dirigent_core/src/connectors/acp/connector.rs b/crates/dirigent_core/src/connectors/acp/connector.rs index e2dedd6..cb5648a 100644 --- a/crates/dirigent_core/src/connectors/acp/connector.rs +++ b/crates/dirigent_core/src/connectors/acp/connector.rs @@ -83,29 +83,29 @@ macro_rules! debug_log { /// Called after `upsert_session` and on session metadata changes. #[cfg(feature = "server")] async fn inspector_upsert_session( - inspector: &Arc, + inspector: &Arc, connector_id: &str, session: &SessionInfo, ) { - let sess_node_id = dirigent_inspector::NodeId::new(format!( + let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!( "dirigent/connectors/{}/sessions/{}", connector_id, session.id )); let parent_id = - dirigent_inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id)); + dirigent_protocol::inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id)); let node_state = match session.status { - SessionStatus::Active => dirigent_inspector::NodeState::Running, - SessionStatus::Processing => dirigent_inspector::NodeState::Busy("Generating".to_string()), - SessionStatus::Idle => dirigent_inspector::NodeState::Idle, - SessionStatus::Ended => dirigent_inspector::NodeState::Stopped, + SessionStatus::Active => dirigent_protocol::inspector::NodeState::Running, + SessionStatus::Processing => dirigent_protocol::inspector::NodeState::Busy("Generating".to_string()), + SessionStatus::Idle => dirigent_protocol::inspector::NodeState::Idle, + SessionStatus::Ended => dirigent_protocol::inspector::NodeState::Stopped, }; let label = session.title.as_deref().unwrap_or(&session.id); // Try to register; if already exists, update instead let meta = - dirigent_inspector::NodeMetadata::new(dirigent_inspector::NodeKind::AsyncTask, label) + dirigent_protocol::inspector::NodeMetadata::new(dirigent_protocol::inspector::NodeKind::AsyncTask, label) .with_state(node_state.clone()) .with_property("session_id", serde_json::json!(&session.id)) .with_property("status", serde_json::json!(format!("{:?}", session.status))); @@ -129,11 +129,10 @@ async fn inspector_upsert_session( }; match inspector - .register(sess_node_id.clone(), &parent_id, meta, None) + .register_node(sess_node_id.clone(), &parent_id, meta) .await { - Ok(mut handle) => { - handle.detach(); + Ok(()) => { trace!(connector_id = %connector_id, session_id = %session.id, "Registered session with inspector"); } Err(_) => { @@ -165,12 +164,12 @@ async fn inspector_upsert_session( /// Update only the inspector state for a session (lightweight, no property changes). #[cfg(feature = "server")] async fn inspector_update_session_state( - inspector: &Arc, + inspector: &Arc, connector_id: &str, session_id: &str, - state: dirigent_inspector::NodeState, + state: dirigent_protocol::inspector::NodeState, ) { - let sess_node_id = dirigent_inspector::NodeId::new(format!( + let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!( "dirigent/connectors/{}/sessions/{}", connector_id, session_id )); @@ -180,13 +179,13 @@ async fn inspector_update_session_state( /// Deregister all session nodes for a connector from the inspector. #[cfg(feature = "server")] async fn inspector_deregister_all_sessions( - inspector: &Arc, + inspector: &Arc, connector_id: &str, internal_state: &InternalState, ) { let sessions = internal_state.list_sessions().await; for session in sessions { - let sess_node_id = dirigent_inspector::NodeId::new(format!( + let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!( "dirigent/connectors/{}/sessions/{}", connector_id, session.id )); @@ -277,7 +276,7 @@ pub struct AcpConnector { /// Optional inspector registry for PID tracking of stdio processes #[cfg(feature = "server")] - inspector: Option>, + inspector: Option>, /// Optional process group manager for lifecycle management of stdio processes. /// @@ -286,7 +285,7 @@ pub struct AcpConnector { /// tracked in the platform job object / process group, and are shut down /// gracefully on close. #[cfg(feature = "server")] - process_manager: Option>, + process_manager: Option>, } impl AcpConnector { @@ -388,7 +387,7 @@ impl AcpConnector { #[cfg(feature = "server")] pub fn with_inspector( mut self, - inspector: Option>, + inspector: Option>, ) -> Self { self.inspector = inspector; self @@ -402,7 +401,7 @@ impl AcpConnector { #[cfg(feature = "server")] pub fn with_process_manager( mut self, - process_manager: Option>, + process_manager: Option>, ) -> Self { self.process_manager = process_manager; self @@ -504,8 +503,8 @@ impl AcpConnector { pending_agent_requests: Arc>>, session_states: Arc>>, mut cmd_rx: mpsc::Receiver, - #[cfg(feature = "server")] inspector: Option>, - #[cfg(feature = "server")] process_manager: Option>, + #[cfg(feature = "server")] inspector: Option>, + #[cfg(feature = "server")] process_manager: Option>, ) { debug_log!("🚀 ACP connector {} task started", id); info!(connector_id = %id, "ACP connector task started"); @@ -690,26 +689,25 @@ impl AcpConnector { #[cfg(feature = "server")] if let Some(ref inspector) = inspector { if let Some(pid) = transport.pid().await { - let process_node_id = dirigent_inspector::NodeId::new(format!( + let process_node_id = dirigent_protocol::inspector::NodeId::new(format!( "dirigent/connectors/{}/process", id )); - let parent_node_id = dirigent_inspector::NodeId::new(format!( + let parent_node_id = dirigent_protocol::inspector::NodeId::new(format!( "dirigent/connectors/{}", id )); - let meta = dirigent_inspector::NodeMetadata::new( - dirigent_inspector::NodeKind::Process, + let meta = dirigent_protocol::inspector::NodeMetadata::new( + dirigent_protocol::inspector::NodeKind::Process, "stdio-process", ) - .with_state(dirigent_inspector::NodeState::Running) + .with_state(dirigent_protocol::inspector::NodeState::Running) .with_property("pid", serde_json::json!(pid)) .with_property("transport", serde_json::json!("stdio")); - if let Ok(mut handle) = inspector - .register(process_node_id, &parent_node_id, meta, None) + if let Ok(()) = inspector + .register_node(process_node_id, &parent_node_id, meta) .await { - handle.detach(); info!(connector_id = %id, pid = pid, "Registered stdio process with inspector"); } } @@ -1560,7 +1558,7 @@ impl AcpConnector { inspector, &id, &session_id, - dirigent_inspector::NodeState::Busy("Generating".to_string()), + dirigent_protocol::inspector::NodeState::Busy("Generating".to_string()), ).await; } @@ -2433,7 +2431,7 @@ impl AcpConnector { inspector, &id, session_id, - dirigent_inspector::NodeState::Idle, + dirigent_protocol::inspector::NodeState::Idle, ).await; } } @@ -2453,7 +2451,7 @@ impl AcpConnector { /// Create transport based on configuration async fn create_transport( config: &AcpConfig, - #[cfg(feature = "server")] process_manager: Option<&Arc>, + #[cfg(feature = "server")] process_manager: Option<&Arc>, ) -> AcpResult> { match &config.transport { TransportKind::Stdio { diff --git a/crates/dirigent_core/src/connectors/acp/transport/stdio.rs b/crates/dirigent_core/src/connectors/acp/transport/stdio.rs index 0be83f2..abc2adc 100644 --- a/crates/dirigent_core/src/connectors/acp/transport/stdio.rs +++ b/crates/dirigent_core/src/connectors/acp/transport/stdio.rs @@ -179,7 +179,7 @@ pub struct StdioTransport { /// force-killing the process. Without it, the original hard-kill behavior /// is preserved. #[cfg(feature = "server")] - process_lifecycle: Option>, + process_lifecycle: Option>, } impl StdioTransport { @@ -307,7 +307,7 @@ impl StdioTransport { /// /// Must be called before `connect()`. #[cfg(feature = "server")] - pub fn set_process_lifecycle(&mut self, lifecycle: Box) { + pub fn set_process_lifecycle(&mut self, lifecycle: Box) { self.process_lifecycle = Some(lifecycle); } @@ -837,7 +837,7 @@ impl AcpTransport for StdioTransport { if let Some(ref lifecycle) = self.process_lifecycle { if child.id().is_some() { // Graceful shutdown: SIGTERM/CTRL_BREAK → wait → force kill - dirigent_process::graceful_shutdown_async( + crate::traits::graceful_shutdown_async( lifecycle.as_ref(), &mut child, std::time::Duration::from_secs(5), diff --git a/crates/dirigent_core/src/connectors/gateway/mod.rs b/crates/dirigent_core/src/connectors/gateway/mod.rs index 9cc5019..875b42a 100644 --- a/crates/dirigent_core/src/connectors/gateway/mod.rs +++ b/crates/dirigent_core/src/connectors/gateway/mod.rs @@ -292,32 +292,32 @@ impl Default for GatewayConfig { /// Register a Gateway session node in the inspector registry. #[cfg(feature = "server")] async fn inspector_register_gateway_session( - inspector: &Arc, + inspector: &Arc, connector_id: &str, session: &GatewaySession, ) { - let sess_node_id = dirigent_inspector::NodeId::new(format!( + let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!( "dirigent/connectors/{}/sessions/{}", connector_id, session.id )); let parent_id = - dirigent_inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id)); + dirigent_protocol::inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id)); - let meta = dirigent_inspector::NodeMetadata::new( - dirigent_inspector::NodeKind::AsyncTask, + let meta = dirigent_protocol::inspector::NodeMetadata::new( + dirigent_protocol::inspector::NodeKind::AsyncTask, &session.title, ) - .with_state(dirigent_inspector::NodeState::Running) + .with_state(dirigent_protocol::inspector::NodeState::Running) .with_property("session_id", serde_json::json!(&session.id)) .with_property("status", serde_json::json!("Active")) .with_property("message_count", serde_json::json!(session.messages.len())); match inspector - .register(sess_node_id, &parent_id, meta, None) + .register_node(sess_node_id, &parent_id, meta) .await { - Ok(mut handle) => { - handle.detach(); + Ok(()) => { + // Registered successfully } Err(_) => { // Already registered — that's fine @@ -328,12 +328,12 @@ async fn inspector_register_gateway_session( /// Deregister all Gateway session nodes from the inspector. #[cfg(feature = "server")] async fn inspector_deregister_gateway_sessions( - inspector: &Arc, + inspector: &Arc, connector_id: &str, sessions: &HashMap, ) { for session_id in sessions.keys() { - let sess_node_id = dirigent_inspector::NodeId::new(format!( + let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!( "dirigent/connectors/{}/sessions/{}", connector_id, session_id )); @@ -387,7 +387,7 @@ pub struct GatewayConnector { /// Optional inspector registry for session tracking #[cfg(feature = "server")] - inspector: Option>, + inspector: Option>, } /// Helper that publishes an event to both the per-connector broadcast @@ -471,7 +471,7 @@ impl GatewayConnector { /// Set the inspector registry for session tracking. #[cfg(feature = "server")] - pub fn set_inspector(&mut self, inspector: Option>) { + pub fn set_inspector(&mut self, inspector: Option>) { self.inspector = inspector; } @@ -553,7 +553,7 @@ impl GatewayConnector { mut cmd_rx: mpsc::Receiver, connector_list_callback: Option, session_transfer_callback: Option, - #[cfg(feature = "server")] inspector: Option>, + #[cfg(feature = "server")] inspector: Option>, ) { info!(connector_id = %id, "Gateway connector task started"); diff --git a/crates/dirigent_core/src/hooks.rs b/crates/dirigent_core/src/hooks.rs index 881eb3c..afee771 100644 --- a/crates/dirigent_core/src/hooks.rs +++ b/crates/dirigent_core/src/hooks.rs @@ -26,14 +26,14 @@ pub trait ConnectorLifecycleHooks: Send + Sync { async fn on_connector_removed(&self, _connector_id: &str) {} #[cfg(feature = "server")] - fn inspector(&self) -> Option> { + fn inspector(&self) -> Option> { None } #[cfg(feature = "server")] fn process_manager( &self, - ) -> Option> { + ) -> Option> { None } } diff --git a/crates/dirigent_core/src/lib.rs b/crates/dirigent_core/src/lib.rs index 60935f2..a7276fa 100644 --- a/crates/dirigent_core/src/lib.rs +++ b/crates/dirigent_core/src/lib.rs @@ -74,6 +74,10 @@ pub mod connectors; #[cfg(feature = "server")] pub mod vendors; +// Abstract traits for connector-injected services (server-only) +#[cfg(feature = "server")] +pub mod traits; + // ACP module - Agent-Client Protocol implementation (server-only) #[cfg(feature = "server")] pub mod acp; diff --git a/crates/dirigent_core/src/traits/inspector.rs b/crates/dirigent_core/src/traits/inspector.rs new file mode 100644 index 0000000..092257d --- /dev/null +++ b/crates/dirigent_core/src/traits/inspector.rs @@ -0,0 +1,38 @@ +use dirigent_protocol::inspector::{NodeId, NodeMetadata, NodeState}; +use std::collections::HashMap; + +/// Abstract interface for registering and updating nodes in the inspector tree. +/// +/// Connectors use this trait to expose their internal process hierarchy +/// (child processes, services, async tasks) without depending on the +/// concrete `dirigent_inspector` crate. +#[async_trait::async_trait] +pub trait ConnectorInspector: Send + Sync { + /// Register a new node under `parent` with the given metadata. + async fn register_node( + &self, + id: NodeId, + parent: &NodeId, + metadata: NodeMetadata, + ) -> Result<(), Box>; + + /// Remove `id` and every node below it from the tree. + async fn deregister_subtree( + &self, + id: &NodeId, + ) -> Result<(), Box>; + + /// Update the runtime state of an existing node. + async fn update_state( + &self, + id: &NodeId, + state: NodeState, + ) -> Result<(), Box>; + + /// Merge additional properties into an existing node's metadata. + async fn update_properties( + &self, + id: &NodeId, + props: HashMap, + ) -> Result<(), Box>; +} diff --git a/crates/dirigent_core/src/traits/mod.rs b/crates/dirigent_core/src/traits/mod.rs new file mode 100644 index 0000000..c5923b9 --- /dev/null +++ b/crates/dirigent_core/src/traits/mod.rs @@ -0,0 +1,5 @@ +mod inspector; +mod process; + +pub use inspector::ConnectorInspector; +pub use process::{graceful_shutdown_async, ProcessGroupManager, ProcessLifecycle}; diff --git a/crates/dirigent_core/src/traits/process.rs b/crates/dirigent_core/src/traits/process.rs new file mode 100644 index 0000000..6138f89 --- /dev/null +++ b/crates/dirigent_core/src/traits/process.rs @@ -0,0 +1,62 @@ +use std::io; + +/// Factory for creating platform-specific process lifecycle handlers. +/// +/// Each connector receives a `ProcessGroupManager` at construction time and +/// calls `create_lifecycle()` to obtain a handler scoped to a single child +/// process (or process group). +pub trait ProcessGroupManager: Send + Sync { + /// Create a fresh lifecycle handler for a new child process. + fn create_lifecycle(&self) -> Box; +} + +/// Platform-specific process lifecycle operations. +/// +/// Implementations handle the OS-level details of job objects (Windows), +/// process groups (Unix), and signal delivery so that connector code +/// remains platform-agnostic. +pub trait ProcessLifecycle: Send + Sync { + /// Configure a `tokio::process::Command` before spawning + /// (e.g., assign to a job object or set `setsid`). + fn configure_async_command(&self, cmd: &mut tokio::process::Command); + + /// Register a spawned child by PID for later signal delivery. + fn register_child(&self, pid: u32) -> Result<(), io::Error>; + + /// Send a graceful shutdown signal (SIGTERM on Unix, CTRL_BREAK on Windows). + fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error>; + + /// Force-kill the process (SIGKILL on Unix, TerminateProcess on Windows). + fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error>; +} + +/// Attempt a graceful shutdown of `child`, falling back to a forced kill +/// after `timeout` elapses. +/// +/// Returns `true` if the process exited within the timeout (or was already +/// gone), `false` if a forced kill was required. +pub async fn graceful_shutdown_async( + lifecycle: &dyn ProcessLifecycle, + child: &mut tokio::process::Child, + timeout: std::time::Duration, +) -> bool { + let pid = match child.id() { + Some(0) | None => return true, + Some(pid) => pid, + }; + + if lifecycle.send_shutdown_signal(pid).is_err() { + return true; + } + + match tokio::time::timeout(timeout, child.wait()).await { + Ok(Ok(_)) => true, + Ok(Err(_)) => true, + Err(_) => { + tracing::debug!(pid, "Graceful shutdown timed out, force killing"); + let _ = lifecycle.send_kill_signal(pid); + let _ = child.wait().await; + false + } + } +} diff --git a/crates/dirigent_inspector/Cargo.toml b/crates/dirigent_inspector/Cargo.toml deleted file mode 100644 index a9fbc13..0000000 --- a/crates/dirigent_inspector/Cargo.toml +++ /dev/null @@ -1,36 +0,0 @@ -[package] -name = "dirigent_inspector" -version = "0.1.0" -edition = "2021" - -[lib] -path = "src/lib.rs" - -[dependencies] -# Async traits -async-trait = "0.1" - -# Date/time handling -chrono = { version = "0.4", features = ["serde"] } - -# Protocol types (canonical node types) -dirigent_protocol = { path = "../dirigent_protocol" } - -# Serialization -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" - -# Cross-platform process/system metrics -sysinfo = "0.33" - -# Error handling -thiserror = "2.0" - -# Async runtime -tokio = { version = "1.42", features = ["sync", "time", "rt", "macros"] } - -# Logging -tracing = "0.1" - -[dev-dependencies] -tokio = { version = "1.42", features = ["full"] } diff --git a/crates/dirigent_inspector/src/channel.rs b/crates/dirigent_inspector/src/channel.rs deleted file mode 100644 index 7673122..0000000 --- a/crates/dirigent_inspector/src/channel.rs +++ /dev/null @@ -1,349 +0,0 @@ -use crate::error::{InspectorError, Result}; -use serde::{Deserialize, Serialize}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; - -/// A command that can be sent to a node. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct NodeCommand { - /// Unique command ID for correlation with responses. - pub id: String, - /// What kind of command this is. - pub kind: CommandKind, - /// Arbitrary payload data. - pub payload: serde_json::Value, -} - -/// The type of command being sent to a node. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum CommandKind { - /// Request the node to report its internal state (introspective). - Introspect, - /// Execute a named operation. - Execute(String), - /// Custom extension command. - Custom(String), -} - -/// Response from a node to a command. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct CommandResponse { - /// The command ID this is responding to. - pub command_id: String, - /// Whether the command was handled successfully. - pub success: bool, - /// Response data. - pub data: serde_json::Value, -} - -impl CommandResponse { - /// Create a success response. - pub fn ok(command_id: impl Into, data: serde_json::Value) -> Self { - Self { - command_id: command_id.into(), - success: true, - data, - } - } - - /// Create an error response. - pub fn err(command_id: impl Into, message: impl Into) -> Self { - Self { - command_id: command_id.into(), - success: false, - data: serde_json::json!({ "error": message.into() }), - } - } -} - -type CommandPayload = (NodeCommand, oneshot::Sender); - -/// Create a new inspector channel pair. -/// -/// Returns `(sender, receiver)` where: -/// - The **sender** is used by callers (UI, API) to send commands to the node. -/// - The **receiver** is used by the node's loop to receive and respond to commands. -/// -/// `capacity` controls the bounded channel size. -pub fn channel(capacity: usize) -> (InspectorChannelSender, InspectorChannelReceiver) { - let (tx, rx) = mpsc::channel(capacity); - let pending = Arc::new(AtomicUsize::new(0)); - - ( - InspectorChannelSender { - tx, - pending: Arc::clone(&pending), - }, - InspectorChannelReceiver { rx, pending }, - ) -} - -/// Caller-side of the inspector channel: send commands, check queue depth. -pub struct InspectorChannelSender { - tx: mpsc::Sender, - pending: Arc, -} - -impl InspectorChannelSender { - /// Send a command and wait for the response. - pub async fn send(&self, cmd: NodeCommand) -> Result { - let (resp_tx, resp_rx) = oneshot::channel(); - self.pending.fetch_add(1, Ordering::Relaxed); - - self.tx - .send((cmd, resp_tx)) - .await - .map_err(|_| InspectorError::ChannelClosed)?; - - resp_rx.await.map_err(|_| InspectorError::ChannelClosed) - } - - /// Try to send a command without waiting, returning a receiver for the response. - /// - /// This is useful when you want to fire off a command and collect the - /// response later, or in a `select!` branch. - pub fn try_send(&self, cmd: NodeCommand) -> Result> { - let (resp_tx, resp_rx) = oneshot::channel(); - self.pending.fetch_add(1, Ordering::Relaxed); - - self.tx.try_send((cmd, resp_tx)).map_err(|e| match e { - mpsc::error::TrySendError::Full(_) => { - self.pending.fetch_sub(1, Ordering::Relaxed); - InspectorError::ChannelFull - } - mpsc::error::TrySendError::Closed(_) => { - self.pending.fetch_sub(1, Ordering::Relaxed); - InspectorError::ChannelClosed - } - })?; - - Ok(resp_rx) - } - - /// Number of commands currently in the queue (approximate). - pub fn pending_count(&self) -> usize { - self.pending.load(Ordering::Relaxed) - } -} - -impl Clone for InspectorChannelSender { - fn clone(&self) -> Self { - Self { - tx: self.tx.clone(), - pending: Arc::clone(&self.pending), - } - } -} - -/// Node-side of the inspector channel: receive commands, send responses. -pub struct InspectorChannelReceiver { - rx: mpsc::Receiver, - pending: Arc, -} - -impl InspectorChannelReceiver { - /// Receive the next command. Returns `None` when all senders are dropped. - /// - /// The returned `oneshot::Sender` must be used to send back a response. - pub async fn recv(&mut self) -> Option<(NodeCommand, oneshot::Sender)> { - let result = self.rx.recv().await; - if result.is_some() { - self.pending.fetch_sub(1, Ordering::Relaxed); - } - result - } - - /// Try to receive without blocking. Returns `None` if the channel is empty. - pub fn try_recv(&mut self) -> Option<(NodeCommand, oneshot::Sender)> { - match self.rx.try_recv() { - Ok(payload) => { - self.pending.fetch_sub(1, Ordering::Relaxed); - Some(payload) - } - Err(_) => None, - } - } - - /// Number of commands currently in the queue (approximate). - pub fn pending_count(&self) -> usize { - self.pending.load(Ordering::Relaxed) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn introspect_cmd(id: &str) -> NodeCommand { - NodeCommand { - id: id.to_string(), - kind: CommandKind::Introspect, - payload: serde_json::Value::Null, - } - } - - fn exec_cmd(id: &str, name: &str, payload: serde_json::Value) -> NodeCommand { - NodeCommand { - id: id.to_string(), - kind: CommandKind::Execute(name.to_string()), - payload, - } - } - - #[tokio::test] - async fn test_send_recv_response() { - let (sender, mut receiver) = channel(10); - - // Simulate a node loop in a background task - let node_task = tokio::spawn(async move { - if let Some((cmd, resp_tx)) = receiver.recv().await { - assert_eq!(cmd.id, "cmd-1"); - assert!(matches!(cmd.kind, CommandKind::Introspect)); - let _ = resp_tx.send(CommandResponse::ok( - &cmd.id, - serde_json::json!({ "queue_len": 5 }), - )); - } - }); - - let response = sender.send(introspect_cmd("cmd-1")).await.unwrap(); - assert!(response.success); - assert_eq!(response.command_id, "cmd-1"); - assert_eq!(response.data["queue_len"], 5); - - node_task.await.unwrap(); - } - - #[tokio::test] - async fn test_try_send() { - let (sender, mut receiver) = channel(10); - - let resp_rx = sender - .try_send(exec_cmd("cmd-2", "restart", serde_json::json!({}))) - .unwrap(); - - // Node responds - let (cmd, resp_tx) = receiver.recv().await.unwrap(); - assert_eq!(cmd.id, "cmd-2"); - let _ = resp_tx.send(CommandResponse::ok(&cmd.id, serde_json::json!("restarted"))); - - let response = resp_rx.await.unwrap(); - assert!(response.success); - } - - #[tokio::test] - async fn test_pending_count() { - let (sender, mut receiver) = channel(10); - - assert_eq!(sender.pending_count(), 0); - - // Send 3 commands without receiving - let _r1 = sender.try_send(introspect_cmd("a")).unwrap(); - let _r2 = sender.try_send(introspect_cmd("b")).unwrap(); - let _r3 = sender.try_send(introspect_cmd("c")).unwrap(); - - assert_eq!(sender.pending_count(), 3); - assert_eq!(receiver.pending_count(), 3); - - // Receive one - let (cmd, resp_tx) = receiver.recv().await.unwrap(); - let _ = resp_tx.send(CommandResponse::ok(&cmd.id, serde_json::Value::Null)); - - assert_eq!(receiver.pending_count(), 2); - } - - #[tokio::test] - async fn test_try_send_full() { - let (sender, _receiver) = channel(1); - - // Fill the channel - let _r1 = sender.try_send(introspect_cmd("a")).unwrap(); - - // Should fail with ChannelFull - let result = sender.try_send(introspect_cmd("b")); - assert!(matches!(result, Err(InspectorError::ChannelFull))); - } - - #[tokio::test] - async fn test_send_after_receiver_dropped() { - let (sender, receiver) = channel(10); - drop(receiver); - - let result = sender.send(introspect_cmd("orphan")).await; - assert!(matches!(result, Err(InspectorError::ChannelClosed))); - } - - #[tokio::test] - async fn test_recv_after_sender_dropped() { - let (sender, mut receiver) = channel(10); - drop(sender); - - let result = receiver.recv().await; - assert!(result.is_none()); - } - - #[tokio::test] - async fn test_try_recv_empty() { - let (_sender, mut receiver) = channel(10); - assert!(receiver.try_recv().is_none()); - } - - #[tokio::test] - async fn test_error_response() { - let (sender, mut receiver) = channel(10); - - let node_task = tokio::spawn(async move { - if let Some((cmd, resp_tx)) = receiver.recv().await { - let _ = resp_tx.send(CommandResponse::err(&cmd.id, "not supported")); - } - }); - - let response = sender - .send(exec_cmd("cmd-x", "unknown", serde_json::Value::Null)) - .await - .unwrap(); - assert!(!response.success); - assert_eq!(response.data["error"], "not supported"); - - node_task.await.unwrap(); - } - - #[tokio::test] - async fn test_multiple_senders() { - let (sender, mut receiver) = channel(10); - let sender2 = sender.clone(); - - let _r1 = sender.try_send(introspect_cmd("from-1")).unwrap(); - let _r2 = sender2.try_send(introspect_cmd("from-2")).unwrap(); - - let (cmd1, resp_tx1) = receiver.recv().await.unwrap(); - let _ = resp_tx1.send(CommandResponse::ok(&cmd1.id, serde_json::Value::Null)); - - let (cmd2, resp_tx2) = receiver.recv().await.unwrap(); - let _ = resp_tx2.send(CommandResponse::ok(&cmd2.id, serde_json::Value::Null)); - - // Both commands were received (order may vary since mpsc is FIFO) - let ids = vec![cmd1.id, cmd2.id]; - assert!(ids.contains(&"from-1".to_string())); - assert!(ids.contains(&"from-2".to_string())); - } - - #[test] - fn test_command_serialization() { - let cmd = exec_cmd("cmd-1", "restart", serde_json::json!({"force": true})); - let json = serde_json::to_string(&cmd).unwrap(); - let deserialized: NodeCommand = serde_json::from_str(&json).unwrap(); - assert_eq!(deserialized.id, "cmd-1"); - assert!(matches!(deserialized.kind, CommandKind::Execute(ref name) if name == "restart")); - } - - #[test] - fn test_response_serialization() { - let resp = CommandResponse::ok("cmd-1", serde_json::json!({"status": "done"})); - let json = serde_json::to_string(&resp).unwrap(); - let deserialized: CommandResponse = serde_json::from_str(&json).unwrap(); - assert!(deserialized.success); - assert_eq!(deserialized.data["status"], "done"); - } -} diff --git a/crates/dirigent_inspector/src/error.rs b/crates/dirigent_inspector/src/error.rs deleted file mode 100644 index 9ec227c..0000000 --- a/crates/dirigent_inspector/src/error.rs +++ /dev/null @@ -1,34 +0,0 @@ -use crate::node::NodeId; - -/// Errors that can occur in the inspector system. -#[derive(Debug, thiserror::Error)] -pub enum InspectorError { - #[error("node not found: {0}")] - NodeNotFound(NodeId), - - #[error("node already exists: {0}")] - NodeAlreadyExists(NodeId), - - #[error("parent node not found: {0}")] - ParentNotFound(NodeId), - - #[error("cannot remove root node")] - CannotRemoveRoot, - - #[error("channel closed")] - ChannelClosed, - - #[error("channel full")] - ChannelFull, - - #[error("command timed out")] - CommandTimeout, - - #[error("process not found: pid {0}")] - ProcessNotFound(u32), - - #[error("{0}")] - Internal(String), -} - -pub type Result = std::result::Result; diff --git a/crates/dirigent_inspector/src/handle.rs b/crates/dirigent_inspector/src/handle.rs deleted file mode 100644 index e00ff76..0000000 --- a/crates/dirigent_inspector/src/handle.rs +++ /dev/null @@ -1,262 +0,0 @@ -use crate::error::Result; -use crate::node::{Inspectable, NodeId, NodeMetadata, NodeState}; -use crate::registry::InspectorRegistry; -use std::collections::HashMap; -use std::sync::Arc; - -/// Handle to a registered node in the inspector tree. -/// -/// Returned when a node is registered. The producer (connector, service, etc.) -/// uses this handle to update their node's state and properties without needing -/// direct access to the registry. -/// -/// When the handle is dropped, the node is automatically deregistered (best-effort). -/// To keep the node alive without the handle, call `detach()` before dropping. -pub struct NodeHandle { - id: NodeId, - registry: Arc, - detached: bool, -} - -impl NodeHandle { - pub(crate) fn new(id: NodeId, registry: Arc) -> Self { - Self { - id, - registry, - detached: false, - } - } - - /// Get this node's ID. - pub fn id(&self) -> &NodeId { - &self.id - } - - /// Update this node's lifecycle state. - pub async fn set_state(&self, state: NodeState) -> Result<()> { - self.registry.update_state(&self.id, state).await - } - - /// Set a single property on this node. - pub async fn set_property(&self, key: &str, value: serde_json::Value) -> Result<()> { - let mut props = HashMap::new(); - props.insert(key.to_string(), value); - self.registry.update_properties(&self.id, props).await - } - - /// Set multiple properties on this node. - pub async fn set_properties(&self, props: HashMap) -> Result<()> { - self.registry.update_properties(&self.id, props).await - } - - /// Register a child node under this node. - /// - /// Returns a new `NodeHandle` for the child. - pub async fn register_child( - &self, - child_id: NodeId, - metadata: NodeMetadata, - inspectable: Option>, - ) -> Result { - self.registry - .register(child_id, &self.id, metadata, inspectable) - .await - } - - /// Explicitly deregister this node and consume the handle. - pub async fn deregister(mut self) -> Result<()> { - self.detached = true; // prevent Drop from double-deregistering - self.registry.deregister(&self.id).await - } - - /// Detach this handle so the node survives when the handle is dropped. - /// - /// After calling this, dropping the handle will NOT deregister the node. - /// The node can still be removed via `InspectorRegistry::deregister()`. - pub fn detach(&mut self) { - self.detached = true; - } - - /// Get a reference to the registry this handle is connected to. - pub fn registry(&self) -> &Arc { - &self.registry - } -} - -impl Drop for NodeHandle { - fn drop(&mut self) { - if !self.detached { - let id = self.id.clone(); - let registry = Arc::clone(&self.registry); - // Best-effort async deregister from a sync Drop context - tokio::spawn(async move { - let _ = registry.deregister(&id).await; - }); - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::node::{NodeKind, NodeMetadata, NodeState}; - - #[tokio::test] - async fn test_handle_set_state() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let handle = registry - .register( - NodeId::new("dirigent/test"), - &root, - NodeMetadata::new(NodeKind::Service, "Test"), - None, - ) - .await - .unwrap(); - - handle.set_state(NodeState::Running).await.unwrap(); - - let meta = registry - .get_node(&NodeId::new("dirigent/test")) - .await - .unwrap(); - assert_eq!(meta.state, NodeState::Running); - } - - #[tokio::test] - async fn test_handle_set_property() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let handle = registry - .register( - NodeId::new("dirigent/proc"), - &root, - NodeMetadata::new(NodeKind::Process, "Proc"), - None, - ) - .await - .unwrap(); - - handle - .set_property("pid", serde_json::json!(9999)) - .await - .unwrap(); - - let meta = registry - .get_node(&NodeId::new("dirigent/proc")) - .await - .unwrap(); - assert_eq!(meta.properties["pid"], serde_json::json!(9999)); - } - - #[tokio::test] - async fn test_handle_register_child() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let parent_handle = registry - .register( - NodeId::new("dirigent/parent"), - &root, - NodeMetadata::new(NodeKind::Connector, "Parent"), - None, - ) - .await - .unwrap(); - - let child_handle = parent_handle - .register_child( - NodeId::new("dirigent/parent/child"), - NodeMetadata::new(NodeKind::Process, "Child"), - None, - ) - .await - .unwrap(); - - assert_eq!(child_handle.id().as_str(), "dirigent/parent/child"); - assert!( - registry - .contains(&NodeId::new("dirigent/parent/child")) - .await - ); - } - - #[tokio::test] - async fn test_handle_deregister() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let handle = registry - .register( - NodeId::new("dirigent/temp"), - &root, - NodeMetadata::new(NodeKind::AsyncTask, "Temp"), - None, - ) - .await - .unwrap(); - - assert!(registry.contains(&NodeId::new("dirigent/temp")).await); - - handle.deregister().await.unwrap(); - - assert!(!registry.contains(&NodeId::new("dirigent/temp")).await); - } - - #[tokio::test] - async fn test_handle_drop_deregisters() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - { - let _handle = registry - .register( - NodeId::new("dirigent/ephemeral"), - &root, - NodeMetadata::new(NodeKind::AsyncTask, "Ephemeral"), - None, - ) - .await - .unwrap(); - // handle dropped here - } - - // Give the spawned deregister task a moment to run - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - assert!( - !registry.contains(&NodeId::new("dirigent/ephemeral")).await, - "Node should be deregistered after handle drop" - ); - } - - #[tokio::test] - async fn test_handle_detach_survives_drop() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - { - let mut handle = registry - .register( - NodeId::new("dirigent/persistent"), - &root, - NodeMetadata::new(NodeKind::Service, "Persistent"), - None, - ) - .await - .unwrap(); - handle.detach(); - // handle dropped here, but detached - } - - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - assert!( - registry.contains(&NodeId::new("dirigent/persistent")).await, - "Detached node should survive handle drop" - ); - } -} diff --git a/crates/dirigent_inspector/src/lib.rs b/crates/dirigent_inspector/src/lib.rs deleted file mode 100644 index e3b11b0..0000000 --- a/crates/dirigent_inspector/src/lib.rs +++ /dev/null @@ -1,23 +0,0 @@ -pub mod channel; -pub mod error; -pub mod handle; -pub mod node; -pub mod process; -pub mod registry; -pub mod snapshot; -pub mod system; -pub mod tree; - -// Re-export commonly used types -pub use channel::{ - channel as inspector_channel, CommandKind, CommandResponse, InspectorChannelReceiver, - InspectorChannelSender, NodeCommand, -}; -pub use error::{InspectorError, Result}; -pub use handle::NodeHandle; -pub use node::{Inspectable, NodeId, NodeKind, NodeMetadata, NodeState}; -pub use process::{ProcessInfo, ProcessMonitor, ProcessStatus}; -pub use registry::{InspectorEvent, InspectorRegistry}; -pub use snapshot::{NodeSnapshot, TreeSnapshot}; -pub use system::{SystemInfo, SystemMonitor}; -pub use tree::{NodeTree, TreeNode}; diff --git a/crates/dirigent_inspector/src/node.rs b/crates/dirigent_inspector/src/node.rs deleted file mode 100644 index 519f89d..0000000 --- a/crates/dirigent_inspector/src/node.rs +++ /dev/null @@ -1,109 +0,0 @@ -use async_trait::async_trait; - -// Re-export canonical types from dirigent_protocol -pub use dirigent_protocol::inspector::{NodeId, NodeKind, NodeMetadata, NodeState}; - -/// Trait for components that support rich introspection. -/// -/// Implementing this trait allows a component to provide detailed status reports -/// when queried. The `inspect()` method runs in the component's own async context -/// (introspective), while `current_state()` is synchronous for quick polling. -#[async_trait] -pub trait Inspectable: Send + Sync { - /// Return a detailed status report as JSON. - /// - /// This is an introspective operation: it runs inside the node's context - /// and can access internal state that isn't part of the standard metadata. - async fn inspect(&self) -> serde_json::Value; - - /// Return the current lifecycle state. - /// - /// This should be a cheap, synchronous operation returning the node's - /// current state without blocking. - fn current_state(&self) -> NodeState; -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_node_id_child() { - let root = NodeId::new("dirigent"); - let child = root.child("connectors"); - assert_eq!(child.as_str(), "dirigent/connectors"); - - let grandchild = child.child("acp-claude"); - assert_eq!(grandchild.as_str(), "dirigent/connectors/acp-claude"); - } - - #[test] - fn test_node_id_parent() { - let id = NodeId::new("dirigent/connectors/acp-claude"); - assert_eq!(id.parent().unwrap().as_str(), "dirigent/connectors"); - assert_eq!( - id.parent().unwrap().parent().unwrap().as_str(), - "dirigent" - ); - assert!(NodeId::new("dirigent").parent().is_none()); - } - - #[test] - fn test_node_id_name() { - assert_eq!(NodeId::new("dirigent/connectors/acp").name(), "acp"); - assert_eq!(NodeId::new("dirigent").name(), "dirigent"); - } - - #[test] - fn test_node_id_from_str() { - let id: NodeId = "dirigent/system/host".into(); - assert_eq!(id.as_str(), "dirigent/system/host"); - } - - #[test] - fn test_node_metadata_builder() { - let meta = NodeMetadata::new(NodeKind::Connector, "My Connector") - .with_state(NodeState::Running) - .with_property("base_url", serde_json::json!("http://localhost:3000")); - - assert_eq!(meta.kind, NodeKind::Connector); - assert_eq!(meta.label, "My Connector"); - assert_eq!(meta.state, NodeState::Running); - assert_eq!( - meta.properties.get("base_url").unwrap(), - &serde_json::json!("http://localhost:3000") - ); - } - - #[test] - fn test_node_metadata_serialization() { - let meta = NodeMetadata::new(NodeKind::Process, "stdio-transport") - .with_state(NodeState::Busy("processing message".into())) - .with_property("pid", serde_json::json!(12345)); - - let json = serde_json::to_string(&meta).unwrap(); - let deserialized: NodeMetadata = serde_json::from_str(&json).unwrap(); - - assert_eq!(deserialized.kind, NodeKind::Process); - assert_eq!(deserialized.label, "stdio-transport"); - assert_eq!( - deserialized.state, - NodeState::Busy("processing message".into()) - ); - } - - #[test] - fn test_node_state_display() { - assert_eq!(NodeState::Running.to_string(), "Running"); - assert_eq!( - NodeState::Error("timeout".into()).to_string(), - "Error(timeout)" - ); - } - - #[test] - fn test_node_kind_display() { - assert_eq!(NodeKind::Root.to_string(), "Root"); - assert_eq!(NodeKind::Custom("agent".into()).to_string(), "Custom(agent)"); - } -} diff --git a/crates/dirigent_inspector/src/process.rs b/crates/dirigent_inspector/src/process.rs deleted file mode 100644 index d85f11c..0000000 --- a/crates/dirigent_inspector/src/process.rs +++ /dev/null @@ -1,381 +0,0 @@ -use crate::node::NodeId; -use crate::registry::InspectorRegistry; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; -use sysinfo::{Pid, ProcessesToUpdate, System}; -use tokio::task::JoinHandle; -use tracing::{debug, warn}; - -/// Information about a monitored OS process. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ProcessInfo { - pub pid: u32, - pub name: String, - pub command: Vec, - pub exe: Option, - pub cwd: Option, - pub status: ProcessStatus, - pub cpu_usage_percent: f32, - pub memory_bytes: u64, - pub virtual_memory_bytes: u64, - pub start_time_secs: u64, - pub run_time_secs: u64, - pub disk_read_bytes: u64, - pub disk_written_bytes: u64, -} - -/// Simplified cross-platform process status. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub enum ProcessStatus { - Running, - Sleeping, - Stopped, - Zombie, - Dead, - Unknown, -} - -impl From for ProcessStatus { - fn from(s: sysinfo::ProcessStatus) -> Self { - match s { - sysinfo::ProcessStatus::Run => ProcessStatus::Running, - sysinfo::ProcessStatus::Sleep | sysinfo::ProcessStatus::Idle => ProcessStatus::Sleeping, - sysinfo::ProcessStatus::Stop | sysinfo::ProcessStatus::Tracing => { - ProcessStatus::Stopped - } - sysinfo::ProcessStatus::Zombie => ProcessStatus::Zombie, - sysinfo::ProcessStatus::Dead | sysinfo::ProcessStatus::UninterruptibleDiskSleep => { - ProcessStatus::Dead - } - _ => ProcessStatus::Unknown, - } - } -} - -/// Direction of the last observed I/O activity. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub enum IoDirection { - Read, - Write, -} - -/// Information about the most recent I/O activity. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct IoActivity { - pub direction: IoDirection, - pub timestamp: chrono::DateTime, -} - -/// Monitors a set of OS processes by PID, providing metrics via `sysinfo`. -/// -/// Each tracked PID is associated with a `NodeId` in the inspector tree. -/// The monitor can be polled manually or run as a background task that -/// periodically refreshes and updates the registry. -pub struct ProcessMonitor { - system: System, - tracked: HashMap, -} - -impl ProcessMonitor { - /// Create a new process monitor. - pub fn new() -> Self { - Self { - system: System::new(), - tracked: HashMap::new(), - } - } - - /// Start tracking a process by PID, associated with the given node ID. - pub fn track(&mut self, pid: u32, node_id: NodeId) { - debug!(pid, node_id = %node_id, "Tracking process"); - self.tracked.insert(pid, node_id); - } - - /// Stop tracking a process. - pub fn untrack(&mut self, pid: u32) { - debug!(pid, "Untracking process"); - self.tracked.remove(&pid); - } - - /// Get the set of tracked PIDs and their node IDs. - pub fn tracked_pids(&self) -> &HashMap { - &self.tracked - } - - /// Refresh data for all tracked processes and return their info. - pub fn refresh(&mut self) -> HashMap { - // Build list of PIDs to refresh - let pids: Vec = self.tracked.keys().map(|&p| Pid::from_u32(p)).collect(); - - // Refresh only tracked processes - self.system - .refresh_processes(ProcessesToUpdate::Some(&pids), true); - - let mut results = HashMap::new(); - - for (&pid, _node_id) in &self.tracked { - let sysinfo_pid = Pid::from_u32(pid); - if let Some(process) = self.system.process(sysinfo_pid) { - let disk_usage = process.disk_usage(); - let info = ProcessInfo { - pid, - name: process.name().to_string_lossy().to_string(), - command: process - .cmd() - .iter() - .map(|s| s.to_string_lossy().to_string()) - .collect(), - exe: process.exe().map(|p| p.to_path_buf()), - cwd: process.cwd().map(|p| p.to_path_buf()), - status: ProcessStatus::from(process.status()), - cpu_usage_percent: process.cpu_usage(), - memory_bytes: process.memory(), - virtual_memory_bytes: process.virtual_memory(), - start_time_secs: process.start_time(), - run_time_secs: process.run_time(), - disk_read_bytes: disk_usage.read_bytes, - disk_written_bytes: disk_usage.written_bytes, - }; - results.insert(pid, info); - } - } - - results - } - - /// Get info for a single tracked process. - pub fn get(&mut self, pid: u32) -> Option { - let sysinfo_pid = Pid::from_u32(pid); - self.system - .refresh_processes(ProcessesToUpdate::Some(&[sysinfo_pid]), true); - - self.system.process(sysinfo_pid).map(|process| { - let disk_usage = process.disk_usage(); - ProcessInfo { - pid, - name: process.name().to_string_lossy().to_string(), - command: process - .cmd() - .iter() - .map(|s| s.to_string_lossy().to_string()) - .collect(), - exe: process.exe().map(|p| p.to_path_buf()), - cwd: process.cwd().map(|p| p.to_path_buf()), - status: ProcessStatus::from(process.status()), - cpu_usage_percent: process.cpu_usage(), - memory_bytes: process.memory(), - virtual_memory_bytes: process.virtual_memory(), - start_time_secs: process.start_time(), - run_time_secs: process.run_time(), - disk_read_bytes: disk_usage.read_bytes, - disk_written_bytes: disk_usage.written_bytes, - } - }) - } - - /// Check if a process is alive (outrospective: queries the OS directly). - pub fn is_alive(&mut self, pid: u32) -> bool { - let sysinfo_pid = Pid::from_u32(pid); - self.system - .refresh_processes(ProcessesToUpdate::Some(&[sysinfo_pid]), true); - self.system.process(sysinfo_pid).is_some() - } - - /// Spawn a background task that periodically refreshes tracked processes - /// and updates their nodes in the registry. - /// - /// The task runs until the returned `JoinHandle` is aborted or the - /// monitor is dropped. - pub fn start_polling( - mut self, - registry: Arc, - interval: Duration, - ) -> JoinHandle<()> { - tokio::spawn(async move { - let mut ticker = tokio::time::interval(interval); - loop { - ticker.tick().await; - - let infos = self.refresh(); - - for (pid, info) in &infos { - if let Some(node_id) = self.tracked.get(pid) { - let mut props = HashMap::new(); - props.insert("pid".to_string(), serde_json::json!(info.pid)); - props.insert("name".to_string(), serde_json::json!(info.name)); - props.insert("command".to_string(), serde_json::json!(info.command)); - props.insert( - "status".to_string(), - serde_json::to_value(&info.status).unwrap_or_default(), - ); - props.insert( - "cpu_usage_percent".to_string(), - serde_json::json!(info.cpu_usage_percent), - ); - props.insert( - "memory_bytes".to_string(), - serde_json::json!(info.memory_bytes), - ); - props.insert( - "virtual_memory_bytes".to_string(), - serde_json::json!(info.virtual_memory_bytes), - ); - props.insert( - "run_time_secs".to_string(), - serde_json::json!(info.run_time_secs), - ); - props.insert( - "disk_read_bytes".to_string(), - serde_json::json!(info.disk_read_bytes), - ); - props.insert( - "disk_written_bytes".to_string(), - serde_json::json!(info.disk_written_bytes), - ); - - if let Err(e) = registry.update_properties(node_id, props).await { - warn!( - pid, - node_id = %node_id, - error = %e, - "Failed to update process node properties" - ); - } - } - } - - // Check for dead processes - let dead_pids: Vec = self - .tracked - .keys() - .filter(|&&pid| !infos.contains_key(&pid)) - .copied() - .collect(); - - for pid in dead_pids { - if let Some(node_id) = self.tracked.get(&pid) { - debug!(pid, node_id = %node_id, "Process no longer alive"); - let _ = registry - .update_state(node_id, crate::node::NodeState::Stopped) - .await; - } - } - } - }) - } -} - -impl Default for ProcessMonitor { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_process_status_from_sysinfo() { - assert_eq!( - ProcessStatus::from(sysinfo::ProcessStatus::Run), - ProcessStatus::Running - ); - assert_eq!( - ProcessStatus::from(sysinfo::ProcessStatus::Sleep), - ProcessStatus::Sleeping - ); - assert_eq!( - ProcessStatus::from(sysinfo::ProcessStatus::Zombie), - ProcessStatus::Zombie - ); - assert_eq!( - ProcessStatus::from(sysinfo::ProcessStatus::Stop), - ProcessStatus::Stopped - ); - } - - #[test] - fn test_process_monitor_track_untrack() { - let mut monitor = ProcessMonitor::new(); - let node_id = NodeId::new("dirigent/test/proc"); - - monitor.track(1234, node_id.clone()); - assert!(monitor.tracked_pids().contains_key(&1234)); - - monitor.untrack(1234); - assert!(!monitor.tracked_pids().contains_key(&1234)); - } - - #[test] - fn test_process_monitor_refresh_current_process() { - let mut monitor = ProcessMonitor::new(); - let current_pid = std::process::id(); - let node_id = NodeId::new("dirigent/test/self"); - - monitor.track(current_pid, node_id); - - let infos = monitor.refresh(); - assert!( - infos.contains_key(¤t_pid), - "Current process should be found" - ); - - let info = &infos[¤t_pid]; - assert_eq!(info.pid, current_pid); - assert!(!info.name.is_empty()); - assert!(info.memory_bytes > 0); - } - - #[test] - fn test_process_monitor_get_current_process() { - let mut monitor = ProcessMonitor::new(); - let current_pid = std::process::id(); - - let info = monitor.get(current_pid); - assert!(info.is_some(), "Should find current process"); - - let info = info.unwrap(); - assert_eq!(info.pid, current_pid); - } - - #[test] - fn test_process_monitor_is_alive() { - let mut monitor = ProcessMonitor::new(); - - // Current process should be alive - assert!(monitor.is_alive(std::process::id())); - - // A very high PID should not exist - assert!(!monitor.is_alive(u32::MAX)); - } - - #[test] - fn test_process_info_serialization() { - let info = ProcessInfo { - pid: 1234, - name: "test".to_string(), - command: vec!["test".to_string(), "--flag".to_string()], - exe: Some(PathBuf::from("/usr/bin/test")), - cwd: Some(PathBuf::from("/home/user")), - status: ProcessStatus::Running, - cpu_usage_percent: 12.5, - memory_bytes: 1024 * 1024, - virtual_memory_bytes: 2048 * 1024, - start_time_secs: 1700000000, - run_time_secs: 3600, - disk_read_bytes: 1024, - disk_written_bytes: 2048, - }; - - let json = serde_json::to_string(&info).unwrap(); - let deserialized: ProcessInfo = serde_json::from_str(&json).unwrap(); - - assert_eq!(deserialized.pid, 1234); - assert_eq!(deserialized.status, ProcessStatus::Running); - assert_eq!(deserialized.memory_bytes, 1024 * 1024); - } -} diff --git a/crates/dirigent_inspector/src/registry.rs b/crates/dirigent_inspector/src/registry.rs deleted file mode 100644 index bc96f73..0000000 --- a/crates/dirigent_inspector/src/registry.rs +++ /dev/null @@ -1,459 +0,0 @@ -use crate::error::Result; -use crate::node::{Inspectable, NodeId, NodeKind, NodeMetadata, NodeState}; -use crate::tree::NodeTree; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::{broadcast, RwLock}; - -/// Events emitted by the registry when the tree changes. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum InspectorEvent { - NodeRegistered { - id: NodeId, - parent: NodeId, - kind: NodeKind, - }, - NodeRemoved { - id: NodeId, - }, - StateChanged { - id: NodeId, - old: NodeState, - new: NodeState, - }, - PropertiesUpdated { - id: NodeId, - keys: Vec, - }, -} - -/// Central registry for the inspector tree. -/// -/// Thread-safe: all operations acquire the internal `RwLock` as needed. -/// Emits `InspectorEvent`s on a broadcast channel for reactive consumers. -pub struct InspectorRegistry { - tree: Arc>, - event_tx: broadcast::Sender, -} - -impl InspectorRegistry { - /// Create a new registry with a root node. - /// - /// The root node ID is `"dirigent"` by default, with `NodeKind::Root`. - pub fn new() -> Self { - let root_id = NodeId::new("dirigent"); - let root_meta = - NodeMetadata::new(NodeKind::Root, "Dirigent").with_state(NodeState::Running); - let tree = NodeTree::new(root_id, root_meta); - let (event_tx, _) = broadcast::channel(500); - - Self { - tree: Arc::new(RwLock::new(tree)), - event_tx, - } - } - - /// Create a registry with a custom root node. - pub fn with_root(root_id: NodeId, root_metadata: NodeMetadata) -> Self { - let tree = NodeTree::new(root_id, root_metadata); - let (event_tx, _) = broadcast::channel(500); - - Self { - tree: Arc::new(RwLock::new(tree)), - event_tx, - } - } - - /// Get the root node ID. - pub async fn root_id(&self) -> NodeId { - let tree = self.tree.read().await; - tree.root_id().clone() - } - - /// Register a new node under the given parent. - /// - /// Returns a `NodeHandle` that the producer can use to update this node. - pub async fn register( - self: &Arc, - id: NodeId, - parent: &NodeId, - metadata: NodeMetadata, - inspectable: Option>, - ) -> Result { - let kind = metadata.kind.clone(); - let parent_clone = parent.clone(); - - { - let mut tree = self.tree.write().await; - tree.insert(id.clone(), parent, metadata, inspectable)?; - } - - let _ = self.event_tx.send(InspectorEvent::NodeRegistered { - id: id.clone(), - parent: parent_clone, - kind, - }); - - Ok(crate::handle::NodeHandle::new(id, Arc::clone(self))) - } - - /// Deregister a node (and reparent its children to its parent). - pub async fn deregister(&self, id: &NodeId) -> Result<()> { - { - let mut tree = self.tree.write().await; - tree.remove(id)?; - } - - let _ = self - .event_tx - .send(InspectorEvent::NodeRemoved { id: id.clone() }); - Ok(()) - } - - /// Deregister a node and all its descendants. - pub async fn deregister_subtree(&self, id: &NodeId) -> Result<()> { - { - let mut tree = self.tree.write().await; - tree.remove_subtree(id)?; - } - - let _ = self - .event_tx - .send(InspectorEvent::NodeRemoved { id: id.clone() }); - Ok(()) - } - - /// Update a node's lifecycle state. - pub async fn update_state(&self, id: &NodeId, state: NodeState) -> Result<()> { - let old = { - let mut tree = self.tree.write().await; - tree.update_state(id, state.clone())? - }; - - if old != state { - let _ = self.event_tx.send(InspectorEvent::StateChanged { - id: id.clone(), - old, - new: state, - }); - } - - Ok(()) - } - - /// Update or insert properties on a node. - pub async fn update_properties( - &self, - id: &NodeId, - props: HashMap, - ) -> Result<()> { - let keys = { - let mut tree = self.tree.write().await; - tree.update_properties(id, props)? - }; - - if !keys.is_empty() { - let _ = self.event_tx.send(InspectorEvent::PropertiesUpdated { - id: id.clone(), - keys, - }); - } - - Ok(()) - } - - /// Get a clone of a node's metadata. - pub async fn get_node(&self, id: &NodeId) -> Option { - let tree = self.tree.read().await; - tree.get(id).map(|n| n.metadata.clone()) - } - - /// Get metadata for all direct children of a node. - pub async fn get_children(&self, id: &NodeId) -> Vec<(NodeId, NodeMetadata)> { - let tree = self.tree.read().await; - tree.children(id) - .into_iter() - .map(|n| (n.id.clone(), n.metadata.clone())) - .collect() - } - - /// Check if a node exists. - pub async fn contains(&self, id: &NodeId) -> bool { - let tree = self.tree.read().await; - tree.contains(id) - } - - /// Get the total number of nodes. - pub async fn node_count(&self) -> usize { - let tree = self.tree.read().await; - tree.len() - } - - /// Call `Inspectable::inspect()` on a node, if it implements the trait. - /// - /// Returns `None` if the node doesn't exist or doesn't have an `Inspectable` impl. - pub async fn inspect_node(&self, id: &NodeId) -> Option { - let inspectable = { - let tree = self.tree.read().await; - tree.get(id) - .and_then(|n| n.inspectable.as_ref().map(Arc::clone)) - }; - - match inspectable { - Some(i) => Some(i.inspect().await), - None => None, - } - } - - /// Subscribe to tree change events. - pub fn subscribe(&self) -> broadcast::Receiver { - self.event_tx.subscribe() - } - - /// Get a snapshot of the entire tree (see snapshot module). - pub async fn snapshot(&self) -> crate::snapshot::TreeSnapshot { - let tree = self.tree.read().await; - crate::snapshot::TreeSnapshot::from_tree(&tree) - } -} - -impl Default for InspectorRegistry { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_registry_new_has_root() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - assert_eq!(root.as_str(), "dirigent"); - assert!(registry.contains(&root).await); - assert_eq!(registry.node_count().await, 1); - } - - #[tokio::test] - async fn test_register_and_get() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let id = NodeId::new("dirigent/connectors"); - let meta = NodeMetadata::new(NodeKind::Connector, "Connectors"); - - let handle = registry - .register(id.clone(), &root, meta, None) - .await - .unwrap(); - assert_eq!(handle.id().as_str(), "dirigent/connectors"); - - let retrieved = registry.get_node(&id).await.unwrap(); - assert_eq!(retrieved.label, "Connectors"); - assert_eq!(registry.node_count().await, 2); - } - - #[tokio::test] - async fn test_register_emits_event() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - let mut rx = registry.subscribe(); - - let id = NodeId::new("dirigent/test"); - let meta = NodeMetadata::new(NodeKind::Service, "Test"); - - let _handle = registry - .register(id.clone(), &root, meta, None) - .await - .unwrap(); - - let event = rx.recv().await.unwrap(); - match event { - InspectorEvent::NodeRegistered { - id: event_id, - parent, - kind, - } => { - assert_eq!(event_id.as_str(), "dirigent/test"); - assert_eq!(parent.as_str(), "dirigent"); - assert_eq!(kind, NodeKind::Service); - } - _ => panic!("Expected NodeRegistered event"), - } - } - - #[tokio::test] - async fn test_deregister() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let id = NodeId::new("dirigent/temp"); - let meta = NodeMetadata::new(NodeKind::AsyncTask, "Temp"); - let _handle = registry - .register(id.clone(), &root, meta, None) - .await - .unwrap(); - - assert!(registry.contains(&id).await); - registry.deregister(&id).await.unwrap(); - assert!(!registry.contains(&id).await); - } - - #[tokio::test] - async fn test_update_state_emits_event() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let id = NodeId::new("dirigent/svc"); - let meta = NodeMetadata::new(NodeKind::Service, "Service"); - let _handle = registry - .register(id.clone(), &root, meta, None) - .await - .unwrap(); - - let mut rx = registry.subscribe(); - - registry - .update_state(&id, NodeState::Error("crash".into())) - .await - .unwrap(); - - let event = rx.recv().await.unwrap(); - match event { - InspectorEvent::StateChanged { id: eid, old, new } => { - assert_eq!(eid.as_str(), "dirigent/svc"); - assert_eq!(old, NodeState::Initializing); - assert_eq!(new, NodeState::Error("crash".into())); - } - _ => panic!("Expected StateChanged event"), - } - } - - #[tokio::test] - async fn test_no_event_on_same_state() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let id = NodeId::new("dirigent/svc"); - let meta = NodeMetadata::new(NodeKind::Service, "Service").with_state(NodeState::Running); - let _handle = registry - .register(id.clone(), &root, meta, None) - .await - .unwrap(); - - let mut rx = registry.subscribe(); - - // Update to the same state - registry - .update_state(&id, NodeState::Running) - .await - .unwrap(); - - // Should not receive a StateChanged event - let result = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await; - assert!(result.is_err(), "Should not emit event for same state"); - } - - #[tokio::test] - async fn test_update_properties() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let id = NodeId::new("dirigent/proc"); - let meta = NodeMetadata::new(NodeKind::Process, "Process"); - let _handle = registry - .register(id.clone(), &root, meta, None) - .await - .unwrap(); - - let mut props = HashMap::new(); - props.insert("pid".to_string(), serde_json::json!(1234)); - props.insert("cpu".to_string(), serde_json::json!(50.0)); - - registry.update_properties(&id, props).await.unwrap(); - - let node = registry.get_node(&id).await.unwrap(); - assert_eq!(node.properties["pid"], serde_json::json!(1234)); - assert_eq!(node.properties["cpu"], serde_json::json!(50.0)); - } - - #[tokio::test] - async fn test_get_children() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let _h1 = registry - .register( - NodeId::new("dirigent/a"), - &root, - NodeMetadata::new(NodeKind::Service, "A"), - None, - ) - .await - .unwrap(); - let _h2 = registry - .register( - NodeId::new("dirigent/b"), - &root, - NodeMetadata::new(NodeKind::Service, "B"), - None, - ) - .await - .unwrap(); - - let children = registry.get_children(&root).await; - assert_eq!(children.len(), 2); - let labels: Vec<&str> = children.iter().map(|(_, m)| m.label.as_str()).collect(); - assert!(labels.contains(&"A")); - assert!(labels.contains(&"B")); - } - - #[tokio::test] - async fn test_inspect_node() { - use async_trait::async_trait; - - struct MockInspectable; - - #[async_trait] - impl Inspectable for MockInspectable { - async fn inspect(&self) -> serde_json::Value { - serde_json::json!({ "internal_queue_len": 42 }) - } - fn current_state(&self) -> NodeState { - NodeState::Running - } - } - - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let id = NodeId::new("dirigent/inspectable"); - let meta = NodeMetadata::new(NodeKind::Service, "Inspectable Service"); - let _handle = registry - .register(id.clone(), &root, meta, Some(Arc::new(MockInspectable))) - .await - .unwrap(); - - let result = registry.inspect_node(&id).await; - assert_eq!( - result.unwrap(), - serde_json::json!({ "internal_queue_len": 42 }) - ); - - // Non-inspectable node returns None - let id2 = NodeId::new("dirigent/plain"); - let _h2 = registry - .register( - id2.clone(), - &root, - NodeMetadata::new(NodeKind::Service, "Plain"), - None, - ) - .await - .unwrap(); - assert!(registry.inspect_node(&id2).await.is_none()); - } -} diff --git a/crates/dirigent_inspector/src/snapshot.rs b/crates/dirigent_inspector/src/snapshot.rs deleted file mode 100644 index a2599b7..0000000 --- a/crates/dirigent_inspector/src/snapshot.rs +++ /dev/null @@ -1,154 +0,0 @@ -use crate::node::{NodeId, NodeMetadata}; -use crate::tree::NodeTree; -use serde::{Deserialize, Serialize}; - -/// A serializable point-in-time capture of the entire inspector tree. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct TreeSnapshot { - pub timestamp: chrono::DateTime, - pub nodes: Vec, -} - -/// Snapshot of a single node. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct NodeSnapshot { - pub id: NodeId, - pub parent: Option, - pub children: Vec, - pub metadata: NodeMetadata, -} - -impl TreeSnapshot { - /// Create a snapshot from a `NodeTree`. - pub(crate) fn from_tree(tree: &NodeTree) -> Self { - let nodes = tree - .all_nodes() - .into_iter() - .map(|n| NodeSnapshot { - id: n.id.clone(), - parent: n.parent.clone(), - children: n.children.clone(), - metadata: n.metadata.clone(), - }) - .collect(); - - Self { - timestamp: chrono::Utc::now(), - nodes, - } - } - - /// Serialize the snapshot to a JSON `Value`. - pub fn to_json(&self) -> serde_json::Value { - serde_json::to_value(self).unwrap_or_default() - } - - /// Serialize the snapshot to pretty-printed JSON string. - pub fn to_json_pretty(&self) -> String { - serde_json::to_string_pretty(self).unwrap_or_default() - } - - /// Number of nodes in the snapshot. - pub fn node_count(&self) -> usize { - self.nodes.len() - } - - /// Find a node by ID. - pub fn find(&self, id: &NodeId) -> Option<&NodeSnapshot> { - self.nodes.iter().find(|n| &n.id == id) - } - - /// Get the root node (the one with no parent). - pub fn root(&self) -> Option<&NodeSnapshot> { - self.nodes.iter().find(|n| n.parent.is_none()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::node::{NodeKind, NodeMetadata, NodeState}; - use crate::tree::NodeTree; - - fn make_tree() -> NodeTree { - let root = NodeId::new("dirigent"); - let mut tree = NodeTree::new( - root.clone(), - NodeMetadata::new(NodeKind::Root, "Dirigent").with_state(NodeState::Running), - ); - tree.insert( - NodeId::new("dirigent/svc"), - &root, - NodeMetadata::new(NodeKind::Service, "Service A").with_state(NodeState::Running), - None, - ) - .unwrap(); - tree.insert( - NodeId::new("dirigent/svc/task"), - &NodeId::new("dirigent/svc"), - NodeMetadata::new(NodeKind::AsyncTask, "Task 1"), - None, - ) - .unwrap(); - tree - } - - #[test] - fn test_snapshot_from_tree() { - let tree = make_tree(); - let snap = TreeSnapshot::from_tree(&tree); - - assert_eq!(snap.node_count(), 3); - assert!(snap.find(&NodeId::new("dirigent")).is_some()); - assert!(snap.find(&NodeId::new("dirigent/svc")).is_some()); - assert!(snap.find(&NodeId::new("dirigent/svc/task")).is_some()); - } - - #[test] - fn test_snapshot_root() { - let tree = make_tree(); - let snap = TreeSnapshot::from_tree(&tree); - let root = snap.root().unwrap(); - assert_eq!(root.id.as_str(), "dirigent"); - assert!(root.parent.is_none()); - } - - #[test] - fn test_snapshot_serialization_roundtrip() { - let tree = make_tree(); - let snap = TreeSnapshot::from_tree(&tree); - - let json = serde_json::to_string(&snap).unwrap(); - let deserialized: TreeSnapshot = serde_json::from_str(&json).unwrap(); - - assert_eq!(deserialized.node_count(), snap.node_count()); - assert_eq!( - deserialized - .find(&NodeId::new("dirigent/svc")) - .unwrap() - .metadata - .label, - "Service A" - ); - } - - #[test] - fn test_snapshot_to_json_pretty() { - let tree = make_tree(); - let snap = TreeSnapshot::from_tree(&tree); - let pretty = snap.to_json_pretty(); - - assert!(pretty.contains("dirigent")); - assert!(pretty.contains("Service A")); - } - - #[test] - fn test_snapshot_to_json_value() { - let tree = make_tree(); - let snap = TreeSnapshot::from_tree(&tree); - let val = snap.to_json(); - - assert!(val.get("timestamp").is_some()); - assert!(val.get("nodes").unwrap().is_array()); - } -} diff --git a/crates/dirigent_inspector/src/system.rs b/crates/dirigent_inspector/src/system.rs deleted file mode 100644 index 4f71777..0000000 --- a/crates/dirigent_inspector/src/system.rs +++ /dev/null @@ -1,231 +0,0 @@ -use crate::node::NodeId; -use crate::registry::InspectorRegistry; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; -use sysinfo::System; -use tokio::task::JoinHandle; -use tracing::warn; - -/// Host system information. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct SystemInfo { - pub hostname: Option, - pub os_name: Option, - pub os_version: Option, - pub kernel_version: Option, - pub arch: String, - pub total_memory_bytes: u64, - pub used_memory_bytes: u64, - pub available_memory_bytes: u64, - pub total_swap_bytes: u64, - pub used_swap_bytes: u64, - pub cpu_count: usize, - pub physical_core_count: Option, - pub global_cpu_usage_percent: f32, - pub uptime_secs: u64, -} - -/// Monitors host system metrics (memory, CPU, etc.). -pub struct SystemMonitor { - system: System, -} - -impl SystemMonitor { - /// Create a new system monitor. - pub fn new() -> Self { - let mut system = System::new(); - // Initial refresh to populate baseline data - system.refresh_memory(); - system.refresh_cpu_usage(); - Self { system } - } - - /// Refresh and return current system information. - pub fn refresh(&mut self) -> SystemInfo { - self.system.refresh_memory(); - self.system.refresh_cpu_usage(); - - SystemInfo { - hostname: System::host_name(), - os_name: System::name(), - os_version: System::os_version(), - kernel_version: System::kernel_version(), - arch: System::cpu_arch(), - total_memory_bytes: self.system.total_memory(), - used_memory_bytes: self.system.used_memory(), - available_memory_bytes: self.system.available_memory(), - total_swap_bytes: self.system.total_swap(), - used_swap_bytes: self.system.used_swap(), - cpu_count: self.system.cpus().len(), - physical_core_count: self.system.physical_core_count(), - global_cpu_usage_percent: self.system.global_cpu_usage(), - uptime_secs: System::uptime(), - } - } - - /// Spawn a background task that periodically updates a system node in the registry. - /// - /// The `node_id` should already be registered in the tree (e.g., "dirigent/system/host"). - pub fn start_polling( - mut self, - registry: Arc, - node_id: NodeId, - interval: Duration, - ) -> JoinHandle<()> { - tokio::spawn(async move { - let mut ticker = tokio::time::interval(interval); - loop { - ticker.tick().await; - - let info = self.refresh(); - - let mut props = HashMap::new(); - props.insert("hostname".to_string(), serde_json::json!(info.hostname)); - props.insert("os_name".to_string(), serde_json::json!(info.os_name)); - props.insert("os_version".to_string(), serde_json::json!(info.os_version)); - props.insert("arch".to_string(), serde_json::json!(info.arch)); - props.insert( - "total_memory_bytes".to_string(), - serde_json::json!(info.total_memory_bytes), - ); - props.insert( - "used_memory_bytes".to_string(), - serde_json::json!(info.used_memory_bytes), - ); - props.insert( - "available_memory_bytes".to_string(), - serde_json::json!(info.available_memory_bytes), - ); - props.insert( - "total_swap_bytes".to_string(), - serde_json::json!(info.total_swap_bytes), - ); - props.insert( - "used_swap_bytes".to_string(), - serde_json::json!(info.used_swap_bytes), - ); - props.insert("cpu_count".to_string(), serde_json::json!(info.cpu_count)); - props.insert( - "physical_core_count".to_string(), - serde_json::json!(info.physical_core_count), - ); - props.insert( - "global_cpu_usage_percent".to_string(), - serde_json::json!(info.global_cpu_usage_percent), - ); - props.insert( - "uptime_secs".to_string(), - serde_json::json!(info.uptime_secs), - ); - - if let Err(e) = registry.update_properties(&node_id, props).await { - warn!( - node_id = %node_id, - error = %e, - "Failed to update system node properties" - ); - } - } - }) - } -} - -impl Default for SystemMonitor { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_system_monitor_refresh() { - let mut monitor = SystemMonitor::new(); - - // Need a brief sleep for CPU usage sampling - std::thread::sleep(std::time::Duration::from_millis(200)); - - let info = monitor.refresh(); - - assert!(info.total_memory_bytes > 0, "Should have total memory"); - assert!(info.cpu_count > 0, "Should have at least 1 CPU"); - assert!(info.uptime_secs > 0, "Should have uptime"); - assert!(info.arch.len() > 0, "Should have arch info"); - } - - #[test] - fn test_system_info_serialization() { - let info = SystemInfo { - hostname: Some("testhost".to_string()), - os_name: Some("macOS".to_string()), - os_version: Some("14.0".to_string()), - kernel_version: Some("23.0.0".to_string()), - arch: "arm64".to_string(), - total_memory_bytes: 16 * 1024 * 1024 * 1024, - used_memory_bytes: 8 * 1024 * 1024 * 1024, - available_memory_bytes: 8 * 1024 * 1024 * 1024, - total_swap_bytes: 2 * 1024 * 1024 * 1024, - used_swap_bytes: 512 * 1024 * 1024, - cpu_count: 10, - physical_core_count: Some(10), - global_cpu_usage_percent: 25.5, - uptime_secs: 86400, - }; - - let json = serde_json::to_string(&info).unwrap(); - let deserialized: SystemInfo = serde_json::from_str(&json).unwrap(); - - assert_eq!(deserialized.hostname, Some("testhost".to_string())); - assert_eq!(deserialized.total_memory_bytes, 16 * 1024 * 1024 * 1024); - assert_eq!(deserialized.cpu_count, 10); - } - - #[tokio::test] - async fn test_system_monitor_polling() { - use crate::node::{NodeKind, NodeMetadata, NodeState}; - - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - // Register system node - let node_id = NodeId::new("dirigent/system/host"); - let mut handle = registry - .register( - node_id.clone(), - &root, - NodeMetadata::new(NodeKind::System, "Host System").with_state(NodeState::Running), - None, - ) - .await - .unwrap(); - // Detach so polling task can update it - // (handle would deregister on drop otherwise) - handle.detach(); - - let monitor = SystemMonitor::new(); - let task = monitor.start_polling( - Arc::clone(®istry), - node_id.clone(), - Duration::from_millis(100), - ); - - // Wait for at least one poll cycle - tokio::time::sleep(Duration::from_millis(250)).await; - - let meta = registry.get_node(&node_id).await.unwrap(); - assert!( - meta.properties.contains_key("total_memory_bytes"), - "Should have system metrics after polling" - ); - assert!( - meta.properties.contains_key("cpu_count"), - "Should have CPU count" - ); - - task.abort(); - } -} diff --git a/crates/dirigent_inspector/src/tree.rs b/crates/dirigent_inspector/src/tree.rs deleted file mode 100644 index 1ac6c80..0000000 --- a/crates/dirigent_inspector/src/tree.rs +++ /dev/null @@ -1,544 +0,0 @@ -use crate::error::{InspectorError, Result}; -use crate::node::{Inspectable, NodeId, NodeMetadata, NodeState}; -use std::collections::HashMap; -use std::sync::Arc; - -/// A node within the inspector tree, holding metadata and relationships. -pub struct TreeNode { - pub id: NodeId, - pub metadata: NodeMetadata, - pub parent: Option, - pub children: Vec, - pub inspectable: Option>, -} - -impl TreeNode { - fn new( - id: NodeId, - metadata: NodeMetadata, - parent: Option, - inspectable: Option>, - ) -> Self { - Self { - id, - metadata, - parent, - children: Vec::new(), - inspectable, - } - } -} - -/// The inspector tree: a rooted tree of `TreeNode`s with parent-child relationships. -/// -/// All mutations go through this struct. Thread safety is provided by wrapping -/// `NodeTree` in `Arc>` at the registry level. -pub struct NodeTree { - nodes: HashMap, - root: NodeId, -} - -impl NodeTree { - /// Create a new tree with a root node. - /// - /// The root node is always present and cannot be removed. - pub fn new(root_id: NodeId, root_metadata: NodeMetadata) -> Self { - let mut nodes = HashMap::new(); - nodes.insert( - root_id.clone(), - TreeNode::new(root_id.clone(), root_metadata, None, None), - ); - Self { - nodes, - root: root_id, - } - } - - /// Get the root node ID. - pub fn root_id(&self) -> &NodeId { - &self.root - } - - /// Insert a new node as a child of the given parent. - pub fn insert( - &mut self, - id: NodeId, - parent: &NodeId, - metadata: NodeMetadata, - inspectable: Option>, - ) -> Result<()> { - if self.nodes.contains_key(&id) { - return Err(InspectorError::NodeAlreadyExists(id)); - } - if !self.nodes.contains_key(parent) { - return Err(InspectorError::ParentNotFound(parent.clone())); - } - - let node = TreeNode::new(id.clone(), metadata, Some(parent.clone()), inspectable); - self.nodes.insert(id.clone(), node); - - // Add as child of parent - if let Some(parent_node) = self.nodes.get_mut(parent) { - parent_node.children.push(id); - } - - Ok(()) - } - - /// Remove a node and reparent its children to the node's parent. - /// - /// The root node cannot be removed. - pub fn remove(&mut self, id: &NodeId) -> Result<()> { - if id == &self.root { - return Err(InspectorError::CannotRemoveRoot); - } - - let node = self - .nodes - .get(id) - .ok_or_else(|| InspectorError::NodeNotFound(id.clone()))?; - - let parent_id = node.parent.clone(); - let children: Vec = node.children.clone(); - - // Reparent children to the removed node's parent - if let Some(ref parent_id) = parent_id { - for child_id in &children { - if let Some(child) = self.nodes.get_mut(child_id) { - child.parent = Some(parent_id.clone()); - } - } - // Update parent's children: remove this node, add reparented children - if let Some(parent) = self.nodes.get_mut(parent_id) { - parent.children.retain(|c| c != id); - parent.children.extend(children); - } - } - - self.nodes.remove(id); - Ok(()) - } - - /// Remove a node and all its descendants. - /// - /// The root node cannot be removed. - pub fn remove_subtree(&mut self, id: &NodeId) -> Result<()> { - if id == &self.root { - return Err(InspectorError::CannotRemoveRoot); - } - - if !self.nodes.contains_key(id) { - return Err(InspectorError::NodeNotFound(id.clone())); - } - - // Collect all descendant IDs via BFS - let mut to_remove = Vec::new(); - let mut queue = vec![id.clone()]; - while let Some(current) = queue.pop() { - if let Some(node) = self.nodes.get(¤t) { - queue.extend(node.children.clone()); - } - to_remove.push(current); - } - - // Remove this node from parent's children list - let parent_id = self.nodes.get(id).and_then(|n| n.parent.clone()); - if let Some(parent_id) = parent_id { - if let Some(parent) = self.nodes.get_mut(&parent_id) { - parent.children.retain(|c| c != id); - } - } - - // Remove all collected nodes - for node_id in &to_remove { - self.nodes.remove(node_id); - } - - Ok(()) - } - - /// Get a reference to a node by ID. - pub fn get(&self, id: &NodeId) -> Option<&TreeNode> { - self.nodes.get(id) - } - - /// Get a mutable reference to a node by ID. - pub fn get_mut(&mut self, id: &NodeId) -> Option<&mut TreeNode> { - self.nodes.get_mut(id) - } - - /// Get the direct children of a node. - pub fn children(&self, id: &NodeId) -> Vec<&TreeNode> { - self.nodes - .get(id) - .map(|node| { - node.children - .iter() - .filter_map(|child_id| self.nodes.get(child_id)) - .collect() - }) - .unwrap_or_default() - } - - /// Get all ancestors of a node, from immediate parent to root. - pub fn ancestors(&self, id: &NodeId) -> Vec<&TreeNode> { - let mut result = Vec::new(); - let mut current = self.nodes.get(id).and_then(|n| n.parent.as_ref()); - while let Some(parent_id) = current { - if let Some(parent) = self.nodes.get(parent_id) { - result.push(parent); - current = parent.parent.as_ref(); - } else { - break; - } - } - result - } - - /// Get all nodes in the tree. - pub fn all_nodes(&self) -> Vec<&TreeNode> { - self.nodes.values().collect() - } - - /// Total number of nodes in the tree. - pub fn len(&self) -> usize { - self.nodes.len() - } - - /// Whether the tree is empty (only root). - pub fn is_empty(&self) -> bool { - self.nodes.len() <= 1 - } - - /// Update a node's state, returning the old state. - pub fn update_state(&mut self, id: &NodeId, state: NodeState) -> Result { - let node = self - .nodes - .get_mut(id) - .ok_or_else(|| InspectorError::NodeNotFound(id.clone()))?; - let old = node.metadata.state.clone(); - node.metadata.state = state; - node.metadata.last_updated = chrono::Utc::now(); - Ok(old) - } - - /// Update or insert properties on a node. Returns only the keys whose values actually changed. - /// - /// A key is considered changed if it is new (not previously present) or if its value - /// differs from the stored value (compared via `serde_json::Value::PartialEq`). - /// The `last_updated` timestamp is only bumped when at least one value changed. - pub fn update_properties( - &mut self, - id: &NodeId, - props: HashMap, - ) -> Result> { - let node = self - .nodes - .get_mut(id) - .ok_or_else(|| InspectorError::NodeNotFound(id.clone()))?; - - let mut changed_keys = Vec::new(); - for (key, new_value) in props { - let is_changed = match node.metadata.properties.get(&key) { - Some(existing) => existing != &new_value, - None => true, // new key - }; - if is_changed { - changed_keys.push(key.clone()); - node.metadata.properties.insert(key, new_value); - } - } - - if !changed_keys.is_empty() { - node.metadata.last_updated = chrono::Utc::now(); - } - - Ok(changed_keys) - } - - /// Check if a node exists. - pub fn contains(&self, id: &NodeId) -> bool { - self.nodes.contains_key(id) - } - - /// Get all descendant IDs of a node (not including the node itself). - pub fn descendants(&self, id: &NodeId) -> Vec { - let mut result = Vec::new(); - let mut queue: Vec = self - .nodes - .get(id) - .map(|n| n.children.clone()) - .unwrap_or_default(); - - while let Some(current) = queue.pop() { - if let Some(node) = self.nodes.get(¤t) { - queue.extend(node.children.clone()); - } - result.push(current); - } - result - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::node::{NodeKind, NodeMetadata}; - - fn root_meta() -> NodeMetadata { - NodeMetadata::new(NodeKind::Root, "Dirigent") - } - - fn connector_meta(label: &str) -> NodeMetadata { - NodeMetadata::new(NodeKind::Connector, label).with_state(NodeState::Running) - } - - fn make_tree() -> NodeTree { - let root = NodeId::new("dirigent"); - let mut tree = NodeTree::new(root.clone(), root_meta()); - - // Add category nodes - tree.insert( - root.child("connectors"), - &root, - NodeMetadata::new(NodeKind::Root, "Connectors"), - None, - ) - .unwrap(); - tree.insert( - root.child("services"), - &root, - NodeMetadata::new(NodeKind::Root, "Services"), - None, - ) - .unwrap(); - - // Add connectors - let connectors = root.child("connectors"); - tree.insert( - connectors.child("opencode-1"), - &connectors, - connector_meta("OpenCode #1"), - None, - ) - .unwrap(); - tree.insert( - connectors.child("acp-claude"), - &connectors, - connector_meta("ACP Claude"), - None, - ) - .unwrap(); - - // Add a process child under acp-claude - let acp = connectors.child("acp-claude"); - tree.insert( - acp.child("stdio-process"), - &acp, - NodeMetadata::new(NodeKind::Process, "stdio-transport") - .with_state(NodeState::Running) - .with_property("pid", serde_json::json!(42)), - None, - ) - .unwrap(); - - tree - } - - #[test] - fn test_new_tree_has_root() { - let root = NodeId::new("dirigent"); - let tree = NodeTree::new(root.clone(), root_meta()); - - assert_eq!(tree.len(), 1); - assert!(tree.get(&root).is_some()); - assert_eq!(tree.root_id(), &root); - } - - #[test] - fn test_insert_and_lookup() { - let tree = make_tree(); - - assert_eq!(tree.len(), 6); // root + connectors + services + opencode + acp + stdio - assert!(tree.contains(&NodeId::new("dirigent/connectors/acp-claude"))); - assert!(tree.contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))); - } - - #[test] - fn test_insert_duplicate_fails() { - let mut tree = make_tree(); - let result = tree.insert( - NodeId::new("dirigent/connectors/opencode-1"), - &NodeId::new("dirigent/connectors"), - connector_meta("Duplicate"), - None, - ); - assert!(matches!(result, Err(InspectorError::NodeAlreadyExists(_)))); - } - - #[test] - fn test_insert_missing_parent_fails() { - let mut tree = make_tree(); - let result = tree.insert( - NodeId::new("dirigent/nonexistent/child"), - &NodeId::new("dirigent/nonexistent"), - connector_meta("Orphan"), - None, - ); - assert!(matches!(result, Err(InspectorError::ParentNotFound(_)))); - } - - #[test] - fn test_children() { - let tree = make_tree(); - let connectors = NodeId::new("dirigent/connectors"); - let children = tree.children(&connectors); - - assert_eq!(children.len(), 2); - let child_ids: Vec<&str> = children.iter().map(|c| c.id.as_str()).collect(); - assert!(child_ids.contains(&"dirigent/connectors/opencode-1")); - assert!(child_ids.contains(&"dirigent/connectors/acp-claude")); - } - - #[test] - fn test_ancestors() { - let tree = make_tree(); - let stdio = NodeId::new("dirigent/connectors/acp-claude/stdio-process"); - let ancestors = tree.ancestors(&stdio); - - assert_eq!(ancestors.len(), 3); - assert_eq!(ancestors[0].id.as_str(), "dirigent/connectors/acp-claude"); - assert_eq!(ancestors[1].id.as_str(), "dirigent/connectors"); - assert_eq!(ancestors[2].id.as_str(), "dirigent"); - } - - #[test] - fn test_remove_reparents_children() { - let mut tree = make_tree(); - - // Remove acp-claude; its child (stdio-process) should be reparented to "connectors" - tree.remove(&NodeId::new("dirigent/connectors/acp-claude")) - .unwrap(); - - assert!(!tree.contains(&NodeId::new("dirigent/connectors/acp-claude"))); - assert!(tree.contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))); - - // stdio-process should now be a child of connectors - let connectors = tree.get(&NodeId::new("dirigent/connectors")).unwrap(); - assert!(connectors - .children - .contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))); - - // stdio-process parent should be connectors - let stdio = tree - .get(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")) - .unwrap(); - assert_eq!( - stdio.parent.as_ref().unwrap().as_str(), - "dirigent/connectors" - ); - } - - #[test] - fn test_remove_subtree() { - let mut tree = make_tree(); - - tree.remove_subtree(&NodeId::new("dirigent/connectors/acp-claude")) - .unwrap(); - - assert!(!tree.contains(&NodeId::new("dirigent/connectors/acp-claude"))); - assert!(!tree.contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))); - assert_eq!(tree.len(), 4); // root + connectors + services + opencode - - // connectors should only have opencode-1 - let connectors = tree.get(&NodeId::new("dirigent/connectors")).unwrap(); - assert_eq!(connectors.children.len(), 1); - } - - #[test] - fn test_cannot_remove_root() { - let mut tree = make_tree(); - let result = tree.remove(&NodeId::new("dirigent")); - assert!(matches!(result, Err(InspectorError::CannotRemoveRoot))); - - let result = tree.remove_subtree(&NodeId::new("dirigent")); - assert!(matches!(result, Err(InspectorError::CannotRemoveRoot))); - } - - #[test] - fn test_update_state() { - let mut tree = make_tree(); - let id = NodeId::new("dirigent/connectors/opencode-1"); - - let old = tree - .update_state(&id, NodeState::Error("timeout".into())) - .unwrap(); - assert_eq!(old, NodeState::Running); - - let node = tree.get(&id).unwrap(); - assert_eq!(node.metadata.state, NodeState::Error("timeout".into())); - } - - #[test] - fn test_update_properties() { - let mut tree = make_tree(); - let id = NodeId::new("dirigent/connectors/acp-claude/stdio-process"); - - let mut props = HashMap::new(); - props.insert("cpu_percent".to_string(), serde_json::json!(45.2)); - props.insert("memory_mb".to_string(), serde_json::json!(128)); - - let keys = tree.update_properties(&id, props).unwrap(); - assert_eq!(keys.len(), 2); - - let node = tree.get(&id).unwrap(); - assert_eq!(node.metadata.properties["pid"], serde_json::json!(42)); // original preserved - assert_eq!( - node.metadata.properties["cpu_percent"], - serde_json::json!(45.2) - ); - } - - #[test] - fn test_update_properties_no_change() { - let mut tree = make_tree(); - let id = NodeId::new("dirigent/connectors/acp-claude/stdio-process"); - - // First update: new key, should be reported as changed - let mut props = HashMap::new(); - props.insert("cpu_percent".to_string(), serde_json::json!(45.2)); - let keys = tree.update_properties(&id, props).unwrap(); - assert_eq!(keys.len(), 1); - - // Second update: same value, should NOT be reported as changed - let mut props = HashMap::new(); - props.insert("cpu_percent".to_string(), serde_json::json!(45.2)); - let keys = tree.update_properties(&id, props).unwrap(); - assert_eq!(keys.len(), 0, "Same value should not be reported as changed"); - - // Third update: different value, should be reported - let mut props = HashMap::new(); - props.insert("cpu_percent".to_string(), serde_json::json!(50.0)); - let keys = tree.update_properties(&id, props).unwrap(); - assert_eq!(keys.len(), 1); - } - - #[test] - fn test_descendants() { - let tree = make_tree(); - let root = NodeId::new("dirigent"); - let descendants = tree.descendants(&root); - - assert_eq!(descendants.len(), 5); // all except root itself - } - - #[test] - fn test_is_empty() { - let root = NodeId::new("dirigent"); - let tree = NodeTree::new(root, root_meta()); - assert!(tree.is_empty()); - - let tree = make_tree(); - assert!(!tree.is_empty()); - } -} diff --git a/crates/dirigent_inspector/tests/integration.rs b/crates/dirigent_inspector/tests/integration.rs deleted file mode 100644 index cf40c09..0000000 --- a/crates/dirigent_inspector/tests/integration.rs +++ /dev/null @@ -1,355 +0,0 @@ -use dirigent_inspector::*; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; - -/// Full integration test simulating a Dirigent-like setup: -/// - Root node "dirigent" -/// - Connector nodes under "dirigent/connectors" -/// - Process node under a connector -/// - Service nodes under "dirigent/services" -/// - System node under "dirigent/system" -/// - Bidirectional channel communication -/// - Snapshot capture -#[tokio::test] -async fn test_full_inspector_tree() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - assert_eq!(root.as_str(), "dirigent"); - - // Subscribe to events - let mut event_rx = registry.subscribe(); - - // -- Build the tree structure -- - - // Category: connectors - let connectors_handle = registry - .register( - NodeId::new("dirigent/connectors"), - &root, - NodeMetadata::new(NodeKind::Custom("category".into()), "Connectors") - .with_state(NodeState::Running), - None, - ) - .await - .unwrap(); - - // Category: services - let mut services_handle = registry - .register( - NodeId::new("dirigent/services"), - &root, - NodeMetadata::new(NodeKind::Custom("category".into()), "Services") - .with_state(NodeState::Running), - None, - ) - .await - .unwrap(); - - // Category: system - let mut system_handle = registry - .register( - NodeId::new("dirigent/system"), - &root, - NodeMetadata::new(NodeKind::Custom("category".into()), "System") - .with_state(NodeState::Running), - None, - ) - .await - .unwrap(); - - // Connector: ACP Claude - let acp_handle = connectors_handle - .register_child( - NodeId::new("dirigent/connectors/acp-claude"), - NodeMetadata::new(NodeKind::Connector, "ACP Claude") - .with_state(NodeState::Running) - .with_property("transport", serde_json::json!("stdio")), - None, - ) - .await - .unwrap(); - - // Process: stdio transport child - let current_pid = std::process::id(); - let proc_handle = acp_handle - .register_child( - NodeId::new("dirigent/connectors/acp-claude/stdio-process"), - NodeMetadata::new(NodeKind::Process, "stdio-transport") - .with_state(NodeState::Running) - .with_property("pid", serde_json::json!(current_pid)), - None, - ) - .await - .unwrap(); - - // Connector: OpenCode - let _opencode_handle = connectors_handle - .register_child( - NodeId::new("dirigent/connectors/opencode-1"), - NodeMetadata::new(NodeKind::Connector, "OpenCode #1") - .with_state(NodeState::Running) - .with_property("base_url", serde_json::json!("http://localhost:12225")), - None, - ) - .await - .unwrap(); - - // Service: Archivist - let _archivist_handle = services_handle - .register_child( - NodeId::new("dirigent/services/archivist"), - NodeMetadata::new(NodeKind::Service, "Archivist EventHandler") - .with_state(NodeState::Idle), - None, - ) - .await - .unwrap(); - - // System: Host - let _host_handle = system_handle - .register_child( - NodeId::new("dirigent/system/host"), - NodeMetadata::new(NodeKind::System, "Host Machine").with_state(NodeState::Running), - None, - ) - .await - .unwrap(); - - // -- Verify tree structure -- - // dirigent, connectors, services, system, acp-claude, stdio-process, opencode-1, archivist, host = 9 - assert_eq!(registry.node_count().await, 9); - - // Check children - let root_children = registry.get_children(&root).await; - assert_eq!(root_children.len(), 3); // connectors, services, system - - let connector_children = registry - .get_children(&NodeId::new("dirigent/connectors")) - .await; - assert_eq!(connector_children.len(), 2); // acp-claude, opencode-1 - - let acp_children = registry - .get_children(&NodeId::new("dirigent/connectors/acp-claude")) - .await; - assert_eq!(acp_children.len(), 1); // stdio-process - - // -- State transitions -- - proc_handle - .set_state(NodeState::Busy("processing message".into())) - .await - .unwrap(); - - let proc_meta = registry - .get_node(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")) - .await - .unwrap(); - assert_eq!( - proc_meta.state, - NodeState::Busy("processing message".into()) - ); - - // -- Property updates -- - let mut props = HashMap::new(); - props.insert("cpu_percent".to_string(), serde_json::json!(23.5)); - props.insert("memory_mb".to_string(), serde_json::json!(256)); - proc_handle.set_properties(props).await.unwrap(); - - let proc_meta = registry - .get_node(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")) - .await - .unwrap(); - assert_eq!(proc_meta.properties["cpu_percent"], serde_json::json!(23.5)); - assert_eq!(proc_meta.properties["pid"], serde_json::json!(current_pid)); // original preserved - - // -- Snapshot -- - let snapshot = registry.snapshot().await; - assert_eq!(snapshot.node_count(), 9); - - // Verify snapshot structure - let snap_root = snapshot.root().unwrap(); - assert_eq!(snap_root.id.as_str(), "dirigent"); - assert_eq!(snap_root.children.len(), 3); - - let snap_proc = snapshot - .find(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")) - .unwrap(); - assert_eq!( - snap_proc.parent, - Some(NodeId::new("dirigent/connectors/acp-claude")) - ); - assert_eq!( - snap_proc.metadata.state, - NodeState::Busy("processing message".into()) - ); - - // Snapshot serialization roundtrip - let json = serde_json::to_string(&snapshot).unwrap(); - let deserialized: TreeSnapshot = serde_json::from_str(&json).unwrap(); - assert_eq!(deserialized.node_count(), 9); - - // -- Events -- - // Drain all events that were emitted during setup - let mut event_count = 0; - while let Ok(event) = event_rx.try_recv() { - event_count += 1; - // Just verify they're valid events - match event { - InspectorEvent::NodeRegistered { .. } - | InspectorEvent::StateChanged { .. } - | InspectorEvent::PropertiesUpdated { .. } - | InspectorEvent::NodeRemoved { .. } => {} - } - } - assert!(event_count > 0, "Should have received events"); - - // -- Cleanup: detach category handles so they survive -- - services_handle.detach(); - system_handle.detach(); -} - -/// Test process monitor with the current process. -#[tokio::test] -async fn test_process_monitor_integration() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - let current_pid = std::process::id(); - let node_id = NodeId::new("dirigent/test-process"); - - let mut handle = registry - .register( - node_id.clone(), - &root, - NodeMetadata::new(NodeKind::Process, "Test Process").with_state(NodeState::Running), - None, - ) - .await - .unwrap(); - handle.detach(); - - // Create monitor and track current process - let mut monitor = ProcessMonitor::new(); - monitor.track(current_pid, node_id.clone()); - - // Start polling - let task = monitor.start_polling(Arc::clone(®istry), Duration::from_millis(100)); - - // Wait for data to be populated - tokio::time::sleep(Duration::from_millis(350)).await; - - let meta = registry.get_node(&node_id).await.unwrap(); - assert!( - meta.properties.contains_key("pid"), - "Should have PID property" - ); - assert!( - meta.properties.contains_key("memory_bytes"), - "Should have memory property" - ); - assert_eq!(meta.properties["pid"], serde_json::json!(current_pid)); - - task.abort(); -} - -/// Test bidirectional channel communication with a simulated node loop. -#[tokio::test] -async fn test_channel_integration() { - let (sender, mut receiver) = inspector_channel(10); - - // Simulate a node loop that handles commands - let node_loop = tokio::spawn(async move { - let mut handled = 0; - while let Some((cmd, resp_tx)) = receiver.recv().await { - let response = match cmd.kind { - CommandKind::Introspect => CommandResponse::ok( - &cmd.id, - serde_json::json!({ - "queue_depth": receiver.pending_count(), - "sessions_active": 3 - }), - ), - CommandKind::Execute(ref name) if name == "restart" => { - CommandResponse::ok(&cmd.id, serde_json::json!("restarting...")) - } - _ => CommandResponse::err(&cmd.id, "unknown command"), - }; - let _ = resp_tx.send(response); - handled += 1; - if handled >= 2 { - break; - } - } - }); - - // Send introspect command - let resp = sender - .send(NodeCommand { - id: "cmd-1".to_string(), - kind: CommandKind::Introspect, - payload: serde_json::Value::Null, - }) - .await - .unwrap(); - assert!(resp.success); - assert_eq!(resp.data["sessions_active"], 3); - - // Send execute command - let resp = sender - .send(NodeCommand { - id: "cmd-2".to_string(), - kind: CommandKind::Execute("restart".to_string()), - payload: serde_json::Value::Null, - }) - .await - .unwrap(); - assert!(resp.success); - - node_loop.await.unwrap(); -} - -/// Test that dropping a handle auto-deregisters, and subtree removal works. -#[tokio::test] -async fn test_lifecycle_management() { - let registry = Arc::new(InspectorRegistry::new()); - let root = registry.root_id().await; - - // Build a subtree - let parent = registry - .register( - NodeId::new("dirigent/parent"), - &root, - NodeMetadata::new(NodeKind::Connector, "Parent"), - None, - ) - .await - .unwrap(); - - let _child1 = parent - .register_child( - NodeId::new("dirigent/parent/child1"), - NodeMetadata::new(NodeKind::Process, "Child 1"), - None, - ) - .await - .unwrap(); - - let _child2 = parent - .register_child( - NodeId::new("dirigent/parent/child2"), - NodeMetadata::new(NodeKind::AsyncTask, "Child 2"), - None, - ) - .await - .unwrap(); - - assert_eq!(registry.node_count().await, 4); // root + parent + 2 children - - // Remove entire subtree - registry - .deregister_subtree(&NodeId::new("dirigent/parent")) - .await - .unwrap(); - - assert_eq!(registry.node_count().await, 1); // only root remains -} diff --git a/crates/dirigent_process/Cargo.toml b/crates/dirigent_process/Cargo.toml deleted file mode 100644 index cfa4a31..0000000 --- a/crates/dirigent_process/Cargo.toml +++ /dev/null @@ -1,32 +0,0 @@ -[package] -name = "dirigent_process" -version = "0.1.0" -edition = "2021" -description = "Cross-platform process lifecycle management for Dirigent" - -[lib] -path = "src/lib.rs" - -[features] -default = [] -tokio = ["dep:tokio"] - -[dependencies] -tracing = "0.1" -tokio = { version = "1", features = ["process", "time"], optional = true } - -[target.'cfg(windows)'.dependencies] -windows-sys = { version = "0.59", features = [ - "Win32_System_JobObjects", - "Win32_System_Threading", - "Win32_Foundation", - "Win32_System_Console", - "Win32_Security", -] } - -[target.'cfg(unix)'.dependencies] -nix = { version = "0.29", features = ["signal", "process"] } -libc = "0.2" - -[dev-dependencies] -tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/crates/dirigent_process/src/lib.rs b/crates/dirigent_process/src/lib.rs deleted file mode 100644 index 66a9f28..0000000 --- a/crates/dirigent_process/src/lib.rs +++ /dev/null @@ -1,30 +0,0 @@ -pub mod traits; -mod shutdown; - -#[cfg(windows)] -mod windows; -#[cfg(unix)] -mod unix; -#[cfg(target_os = "linux")] -mod linux; - -pub use traits::{ProcessGroupManager, ProcessLifecycle}; -pub use shutdown::graceful_shutdown_sync; -#[cfg(feature = "tokio")] -pub use shutdown::graceful_shutdown_async; - -use std::sync::Arc; - -/// Create the platform-appropriate ProcessGroupManager. -/// -/// Call `init()` on the returned manager before use. -pub fn create_manager() -> Arc { - #[cfg(windows)] - { Arc::new(windows::WindowsProcessGroupManager::new()) } - - #[cfg(target_os = "linux")] - { Arc::new(linux::LinuxProcessGroupManager::new()) } - - #[cfg(all(unix, not(target_os = "linux")))] - { Arc::new(unix::UnixProcessGroupManager::new()) } -} diff --git a/crates/dirigent_process/src/linux.rs b/crates/dirigent_process/src/linux.rs deleted file mode 100644 index 50fa07e..0000000 --- a/crates/dirigent_process/src/linux.rs +++ /dev/null @@ -1,91 +0,0 @@ -#![cfg(target_os = "linux")] - -use crate::traits::{ProcessGroupManager, ProcessLifecycle}; -use nix::sys::signal::{killpg, Signal}; -use nix::unistd::Pid; -use std::io; -use std::os::unix::process::CommandExt; -use tracing::{debug, info, warn}; - -/// Linux process group manager with kernel-level orphan prevention. -/// -/// Uses `PR_SET_CHILD_SUBREAPER` so orphaned grandchildren are reparented -/// to this process, and `PR_SET_PDEATHSIG` so children auto-die when -/// the parent crashes. -pub struct LinuxProcessGroupManager; - -impl LinuxProcessGroupManager { - pub fn new() -> Self { Self } -} - -impl ProcessGroupManager for LinuxProcessGroupManager { - fn init(&self) -> Result<(), io::Error> { - unsafe { - if libc::prctl(libc::PR_SET_CHILD_SUBREAPER, 1, 0, 0, 0) != 0 { - let err = io::Error::last_os_error(); - warn!(error = %err, "Failed to set PR_SET_CHILD_SUBREAPER"); - return Err(err); - } - } - info!("Linux process group manager initialized (child subreaper enabled)"); - Ok(()) - } - - fn create_lifecycle(&self) -> Box { - Box::new(LinuxProcessLifecycle) - } -} - -pub struct LinuxProcessLifecycle; - -impl ProcessLifecycle for LinuxProcessLifecycle { - fn configure_command(&self, cmd: &mut std::process::Command) { - unsafe { - cmd.pre_exec(|| { - if libc::setpgid(0, 0) != 0 { - return Err(io::Error::last_os_error()); - } - if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL, 0, 0, 0) != 0 { - return Err(io::Error::last_os_error()); - } - Ok(()) - }); - } - } - - #[cfg(feature = "tokio")] - fn configure_async_command(&self, cmd: &mut tokio::process::Command) { - unsafe { - cmd.pre_exec(|| { - if libc::setpgid(0, 0) != 0 { - return Err(io::Error::last_os_error()); - } - if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL, 0, 0, 0) != 0 { - return Err(io::Error::last_os_error()); - } - Ok(()) - }); - } - } - - fn register_child(&self, pid: u32) -> Result<(), io::Error> { - debug!(pid, pgid = pid, "Linux child registered (process group + PR_SET_PDEATHSIG)"); - Ok(()) - } - - fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error> { - let pgid = Pid::from_raw(pid as i32); - killpg(pgid, Signal::SIGTERM) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - debug!(pid, "Sent SIGTERM to process group"); - Ok(()) - } - - fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error> { - let pgid = Pid::from_raw(pid as i32); - killpg(pgid, Signal::SIGKILL) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - debug!(pid, "Sent SIGKILL to process group"); - Ok(()) - } -} diff --git a/crates/dirigent_process/src/shutdown.rs b/crates/dirigent_process/src/shutdown.rs deleted file mode 100644 index dba4483..0000000 --- a/crates/dirigent_process/src/shutdown.rs +++ /dev/null @@ -1,66 +0,0 @@ -use crate::traits::ProcessLifecycle; -use std::time::Duration; - -/// Graceful shutdown: send signal → wait → force kill (sync, blocking). -/// -/// Returns `true` if the process exited within the timeout, `false` if force-killed. -pub fn graceful_shutdown_sync( - lifecycle: &dyn ProcessLifecycle, - child: &mut std::process::Child, - timeout: Duration, -) -> bool { - let pid = child.id(); - if pid == 0 { - return true; - } - - if lifecycle.send_shutdown_signal(pid).is_err() { - return true; - } - - let start = std::time::Instant::now(); - let poll_interval = Duration::from_millis(50); - - while start.elapsed() < timeout { - match child.try_wait() { - Ok(Some(_)) => return true, - Ok(None) => std::thread::sleep(poll_interval), - Err(_) => return true, - } - } - - tracing::debug!(pid, "Graceful shutdown timed out, force killing"); - let _ = lifecycle.send_kill_signal(pid); - let _ = child.wait(); - false -} - -/// Graceful shutdown: send signal → wait → force kill (async, non-blocking). -/// -/// Returns `true` if the process exited within the timeout, `false` if force-killed. -#[cfg(feature = "tokio")] -pub async fn graceful_shutdown_async( - lifecycle: &dyn ProcessLifecycle, - child: &mut tokio::process::Child, - timeout: Duration, -) -> bool { - let pid = match child.id() { - Some(0) | None => return true, - Some(pid) => pid, - }; - - if lifecycle.send_shutdown_signal(pid).is_err() { - return true; - } - - match tokio::time::timeout(timeout, child.wait()).await { - Ok(Ok(_)) => true, - Ok(Err(_)) => true, - Err(_) => { - tracing::debug!(pid, "Graceful shutdown timed out, force killing"); - let _ = lifecycle.send_kill_signal(pid); - let _ = child.wait().await; - false - } - } -} diff --git a/crates/dirigent_process/src/traits.rs b/crates/dirigent_process/src/traits.rs deleted file mode 100644 index b81c865..0000000 --- a/crates/dirigent_process/src/traits.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::io; - -/// Global process group manager — one per application lifetime. -/// -/// On Windows, owns a Job Object with KILL_ON_JOB_CLOSE. -/// On Linux, configures the process as a child subreaper. -/// On macOS, no-op (process groups handle cleanup). -pub trait ProcessGroupManager: Send + Sync { - /// Initialize platform-specific parent process configuration. - fn init(&self) -> Result<(), io::Error>; - - /// Create a lifecycle handle for managing a child process. - fn create_lifecycle(&self) -> Box; -} - -/// Per-child process lifecycle manager. -/// -/// All methods are synchronous — OS signal/handle calls are instant. -/// For timeout-based shutdown, use the free functions in the `shutdown` module. -pub trait ProcessLifecycle: Send + Sync { - /// Configure a std::process::Command before spawning. - /// Sets platform-specific flags (process group, creation flags, pre_exec hooks). - fn configure_command(&self, cmd: &mut std::process::Command); - - /// Configure a tokio::process::Command before spawning. - #[cfg(feature = "tokio")] - fn configure_async_command(&self, cmd: &mut tokio::process::Command); - - /// Register a spawned child with the lifecycle manager. - /// Must be called immediately after spawn with the child's PID. - fn register_child(&self, pid: u32) -> Result<(), io::Error>; - - /// Send a graceful shutdown signal to the process (and its tree). - /// Windows: CTRL_BREAK_EVENT. Unix: SIGTERM to process group. - fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error>; - - /// Forcefully kill the process (and its tree). - /// Windows: TerminateProcess. Unix: SIGKILL to process group. - fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error>; -} diff --git a/crates/dirigent_process/src/unix.rs b/crates/dirigent_process/src/unix.rs deleted file mode 100644 index a63b470..0000000 --- a/crates/dirigent_process/src/unix.rs +++ /dev/null @@ -1,78 +0,0 @@ -#![cfg(unix)] - -use crate::traits::{ProcessGroupManager, ProcessLifecycle}; -use nix::sys::signal::{killpg, Signal}; -use nix::unistd::Pid; -use std::io; -use std::os::unix::process::CommandExt; -use tracing::{debug, info}; - -/// macOS / generic Unix process group manager. -/// -/// Uses process groups for tree management. No kernel-level orphan -/// prevention (macOS lacks `PR_SET_PDEATHSIG`). Relies on launchd -/// supervision for crash recovery. -pub struct UnixProcessGroupManager; - -impl UnixProcessGroupManager { - pub fn new() -> Self { Self } -} - -impl ProcessGroupManager for UnixProcessGroupManager { - fn init(&self) -> Result<(), io::Error> { - info!("Unix process group manager initialized"); - Ok(()) - } - - fn create_lifecycle(&self) -> Box { - Box::new(UnixProcessLifecycle) - } -} - -pub struct UnixProcessLifecycle; - -impl ProcessLifecycle for UnixProcessLifecycle { - fn configure_command(&self, cmd: &mut std::process::Command) { - unsafe { - cmd.pre_exec(|| { - if libc::setpgid(0, 0) != 0 { - return Err(io::Error::last_os_error()); - } - Ok(()) - }); - } - } - - #[cfg(feature = "tokio")] - fn configure_async_command(&self, cmd: &mut tokio::process::Command) { - unsafe { - cmd.pre_exec(|| { - if libc::setpgid(0, 0) != 0 { - return Err(io::Error::last_os_error()); - } - Ok(()) - }); - } - } - - fn register_child(&self, pid: u32) -> Result<(), io::Error> { - debug!(pid, pgid = pid, "Child registered in its own process group"); - Ok(()) - } - - fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error> { - let pgid = Pid::from_raw(pid as i32); - killpg(pgid, Signal::SIGTERM) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - debug!(pid, "Sent SIGTERM to process group"); - Ok(()) - } - - fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error> { - let pgid = Pid::from_raw(pid as i32); - killpg(pgid, Signal::SIGKILL) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - debug!(pid, "Sent SIGKILL to process group"); - Ok(()) - } -} diff --git a/crates/dirigent_process/src/windows.rs b/crates/dirigent_process/src/windows.rs deleted file mode 100644 index 90e1dad..0000000 --- a/crates/dirigent_process/src/windows.rs +++ /dev/null @@ -1,199 +0,0 @@ -#![cfg(windows)] - -use crate::traits::{ProcessGroupManager, ProcessLifecycle}; -use std::io; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Mutex; -use tracing::{debug, info, warn}; -use windows_sys::Win32::Foundation::{CloseHandle, FALSE, HANDLE}; -use windows_sys::Win32::System::JobObjects::{ - AssignProcessToJobObject, CreateJobObjectW, JobObjectExtendedLimitInformation, - SetInformationJobObject, JOBOBJECT_EXTENDED_LIMIT_INFORMATION, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, -}; -use windows_sys::Win32::System::Threading::{ - OpenProcess, TerminateProcess, PROCESS_ALL_ACCESS, -}; -use windows_sys::Win32::System::Console::{ - GenerateConsoleCtrlEvent, CTRL_BREAK_EVENT, -}; - -/// Windows process group manager using Job Objects. -/// -/// Creates a Job Object with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` — when -/// this manager is dropped (or the process crashes), the OS automatically -/// kills all assigned child processes including grandchildren. -pub struct WindowsProcessGroupManager { - /// Wrapped in a Mutex so we can mutate through a shared reference after init. - job_handle: Mutex, - initialized: AtomicBool, -} - -// Safety: HANDLE (*mut c_void) is not Send/Sync by default, but we only -// mutate it during init() (guarded by AtomicBool + Mutex) and read it -// (via copy) in create_lifecycle() and drop. No concurrent mutation occurs. -unsafe impl Send for WindowsProcessGroupManager {} -unsafe impl Sync for WindowsProcessGroupManager {} - -impl WindowsProcessGroupManager { - pub fn new() -> Self { - Self { - job_handle: Mutex::new(std::ptr::null_mut()), - initialized: AtomicBool::new(false), - } - } - - fn handle(&self) -> HANDLE { - *self.job_handle.lock().unwrap() - } -} - -impl Default for WindowsProcessGroupManager { - fn default() -> Self { - Self::new() - } -} - -impl ProcessGroupManager for WindowsProcessGroupManager { - fn init(&self) -> Result<(), io::Error> { - if self.initialized.swap(true, Ordering::SeqCst) { - return Ok(()); - } - - unsafe { - let handle = CreateJobObjectW(std::ptr::null(), std::ptr::null()); - if handle.is_null() { - self.initialized.store(false, Ordering::SeqCst); - return Err(io::Error::last_os_error()); - } - - let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = std::mem::zeroed(); - info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; - - let result = SetInformationJobObject( - handle, - JobObjectExtendedLimitInformation, - &info as *const _ as *const _, - std::mem::size_of::() as u32, - ); - - if result == FALSE { - CloseHandle(handle); - self.initialized.store(false, Ordering::SeqCst); - return Err(io::Error::last_os_error()); - } - - *self.job_handle.lock().unwrap() = handle; - - info!("Windows Job Object created with KILL_ON_JOB_CLOSE"); - Ok(()) - } - } - - fn create_lifecycle(&self) -> Box { - Box::new(WindowsProcessLifecycle { - job_handle: self.handle(), - }) - } -} - -impl Drop for WindowsProcessGroupManager { - fn drop(&mut self) { - let handle = self.handle(); - if !handle.is_null() { - unsafe { CloseHandle(handle); } - debug!("Windows Job Object closed"); - } - } -} - -/// Per-child lifecycle manager for Windows. -/// -/// Assigns children to the parent's Job Object and uses -/// `CTRL_BREAK_EVENT` / `TerminateProcess` for shutdown. -pub struct WindowsProcessLifecycle { - job_handle: HANDLE, -} - -// Safety: same reasoning as WindowsProcessGroupManager — HANDLE is used -// read-only after construction (only passed to OS APIs). -unsafe impl Send for WindowsProcessLifecycle {} -unsafe impl Sync for WindowsProcessLifecycle {} - -impl ProcessLifecycle for WindowsProcessLifecycle { - fn configure_command(&self, cmd: &mut std::process::Command) { - use std::os::windows::process::CommandExt; - const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200; - const CREATE_NO_WINDOW: u32 = 0x0800_0000; - cmd.creation_flags(CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW); - } - - #[cfg(feature = "tokio")] - fn configure_async_command(&self, cmd: &mut tokio::process::Command) { - use std::os::windows::process::CommandExt; - const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200; - const CREATE_NO_WINDOW: u32 = 0x0800_0000; - cmd.creation_flags(CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW); - } - - fn register_child(&self, pid: u32) -> Result<(), io::Error> { - if self.job_handle.is_null() { - warn!(pid, "Job Object not initialized, skipping child registration"); - return Ok(()); - } - - unsafe { - let process_handle = OpenProcess(PROCESS_ALL_ACCESS, FALSE, pid); - if process_handle.is_null() { - return Err(io::Error::last_os_error()); - } - - let result = AssignProcessToJobObject(self.job_handle, process_handle); - CloseHandle(process_handle); - - if result == FALSE { - let err = io::Error::last_os_error(); - warn!(pid, error = %err, "Failed to assign process to Job Object (may already be in a job)"); - return Err(err); - } - - debug!(pid, "Process assigned to Job Object"); - Ok(()) - } - } - - fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error> { - unsafe { - if GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid) == FALSE { - return Err(io::Error::last_os_error()); - } - } - debug!(pid, "Sent CTRL_BREAK_EVENT"); - Ok(()) - } - - fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error> { - unsafe { - let handle = OpenProcess(PROCESS_ALL_ACCESS, FALSE, pid); - if handle.is_null() { - let err = io::Error::last_os_error(); - // ERROR_INVALID_PARAMETER (87) means process already exited - if err.raw_os_error() == Some(87) { - return Ok(()); - } - return Err(err); - } - let result = TerminateProcess(handle, 1); - CloseHandle(handle); - if result == FALSE { - let err = io::Error::last_os_error(); - // ERROR_ACCESS_DENIED (5) — process may have already exited - if err.raw_os_error() == Some(5) { - return Ok(()); - } - return Err(err); - } - } - debug!(pid, "Sent TerminateProcess"); - Ok(()) - } -} diff --git a/crates/dirigent_process/tests/lifecycle.rs b/crates/dirigent_process/tests/lifecycle.rs deleted file mode 100644 index 39a5264..0000000 --- a/crates/dirigent_process/tests/lifecycle.rs +++ /dev/null @@ -1,149 +0,0 @@ -use dirigent_process::{create_manager, graceful_shutdown_sync}; -use std::process::Command; -use std::time::Duration; - -/// Build a long-running command that does not require a TTY on any platform. -/// -/// On Windows, `timeout /t N /nobreak` fails when stdin is a pipe (no console), -/// so we use `ping -n N 127.0.0.1` which sleeps approximately N-1 seconds with -/// no TTY requirement. -/// -/// On Unix, `sleep N` is the idiomatic choice. -#[cfg(windows)] -fn long_sleep_cmd(seconds: u32) -> Command { - let mut cmd = Command::new("ping"); - cmd.args(["-n", &seconds.to_string(), "127.0.0.1"]); - cmd -} - -#[cfg(unix)] -fn long_sleep_cmd(seconds: u32) -> Command { - let mut cmd = Command::new("sleep"); - cmd.arg(seconds.to_string()); - cmd -} - -#[cfg(all(windows, feature = "tokio"))] -fn long_sleep_async_cmd(seconds: u32) -> tokio::process::Command { - let mut cmd = tokio::process::Command::new("ping"); - cmd.args(["-n", &seconds.to_string(), "127.0.0.1"]); - cmd -} - -#[cfg(all(unix, feature = "tokio"))] -fn long_sleep_async_cmd(seconds: u32) -> tokio::process::Command { - let mut cmd = tokio::process::Command::new("sleep"); - cmd.arg(seconds.to_string()); - cmd -} - -#[test] -fn test_manager_init() { - let mgr = create_manager(); - mgr.init().expect("init should succeed"); - // Double init should also succeed (idempotent) - mgr.init().expect("double init should succeed"); -} - -#[test] -fn test_create_lifecycle() { - let mgr = create_manager(); - mgr.init().expect("init failed"); - let _lifecycle = mgr.create_lifecycle(); -} - -#[test] -fn test_configure_and_spawn() { - let mgr = create_manager(); - mgr.init().expect("init failed"); - let lifecycle = mgr.create_lifecycle(); - - let mut cmd = long_sleep_cmd(30); - lifecycle.configure_command(&mut cmd); - - let mut child = cmd.spawn().expect("spawn failed"); - let pid = child.id(); - assert!(pid > 0); - - // Register should succeed - lifecycle.register_child(pid).expect("register failed"); - - // Process should still be running - assert!(child.try_wait().expect("try_wait failed").is_none()); - - // Clean up - let _ = child.kill(); - let _ = child.wait(); -} - -#[test] -fn test_graceful_shutdown_sync() { - let mgr = create_manager(); - mgr.init().expect("init failed"); - let lifecycle = mgr.create_lifecycle(); - - let mut cmd = long_sleep_cmd(60); - lifecycle.configure_command(&mut cmd); - let mut child = cmd.spawn().expect("spawn failed"); - let pid = child.id(); - lifecycle.register_child(pid).expect("register failed"); - - // Graceful shutdown with 3s timeout — process won't exit voluntarily, - // so it should be force-killed after timeout - let exited_gracefully = graceful_shutdown_sync( - lifecycle.as_ref(), - &mut child, - Duration::from_secs(3), - ); - - // Process should be dead now - assert!(child.try_wait().expect("try_wait failed").is_some()); - // It was force-killed (ping/sleep don't handle SIGTERM/CTRL_BREAK) - assert!(!exited_gracefully); -} - -#[test] -fn test_send_kill_signal() { - let mgr = create_manager(); - mgr.init().expect("init failed"); - let lifecycle = mgr.create_lifecycle(); - - let mut cmd = long_sleep_cmd(60); - lifecycle.configure_command(&mut cmd); - let mut child = cmd.spawn().expect("spawn failed"); - let pid = child.id(); - lifecycle.register_child(pid).expect("register failed"); - - // Direct kill signal - lifecycle.send_kill_signal(pid).expect("kill failed"); - - // Wait for process to die - let status = child.wait().expect("wait failed"); - assert!(!status.success()); -} - -#[cfg(feature = "tokio")] -#[tokio::test] -async fn test_async_graceful_shutdown() { - use dirigent_process::graceful_shutdown_async; - - let mgr = create_manager(); - mgr.init().expect("init failed"); - let lifecycle = mgr.create_lifecycle(); - - let mut cmd = long_sleep_async_cmd(60); - lifecycle.configure_async_command(&mut cmd); - let mut child = cmd.spawn().expect("spawn failed"); - let pid = child.id().expect("no pid"); - lifecycle.register_child(pid).expect("register failed"); - - let exited_gracefully = graceful_shutdown_async( - lifecycle.as_ref(), - &mut child, - Duration::from_secs(3), - ) - .await; - - assert!(child.try_wait().expect("try_wait failed").is_some()); - assert!(!exited_gracefully); -}