diff --git a/Cargo.toml b/Cargo.toml
index 2e48c65..455c8fb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,8 +7,6 @@ members = [
"crates/dirigent_auth",
"crates/dirigent_config",
"crates/dirigent_acp_api",
- "crates/dirigent_inspector",
- "crates/dirigent_process",
"crates/opencode_client",
]
@@ -26,6 +24,4 @@ dirigent_tools = { path = "crates/dirigent_tools" }
dirigent_auth = { path = "crates/dirigent_auth" }
dirigent_config = { path = "crates/dirigent_config" }
dirigent_acp_api = { path = "crates/dirigent_acp_api" }
-dirigent_inspector = { path = "crates/dirigent_inspector" }
-dirigent_process = { path = "crates/dirigent_process" }
opencode_client = { path = "crates/opencode_client" }
diff --git a/README.md b/README.md
index 2c97445..3804abf 100644
--- a/README.md
+++ b/README.md
@@ -31,8 +31,25 @@ These tools are developed in this monorepo but distributed as independent reposi
**Layers top-to-bottom:**
- **Consumers** *(shadow)* — server assembly, web app, integrations — not in this repo
- **Standalone Tools** — installable from their own repositories; depend on these crates
-- **Orchestration** — connector runtime, ACP server, introspection
-- **Foundation** — protocol types, tool sandbox, configuration, auth, process management
+- **Orchestration** — connector runtime, ACP server
+- **Foundation** — protocol types, tool sandbox, configuration, auth
+
+---
+
+## Core Runtime
+
+`dirigent_core` is the central crate. It manages long-lived connections to external agent systems through a **Connector** abstraction — each connector wraps a bidirectional communication channel to an agent (Claude Code over stdio, OpenCode.ai over HTTP+SSE, or an incoming ACP connection).
+
+
+
+
+
+**Key concepts:**
+
+- **CoreRuntime** is a stateless orchestrator. It owns the connector registry, the event bus, and lifecycle hooks — but never caches session state. The external agent is always authoritative.
+- **Connectors** implement a common trait (`command_tx()` to send commands, `subscribe()` to receive events) and run as independent background tasks. Four implementations ship today: ACP (stdio/HTTP JSON-RPC), Gateway (local echo + session transfer), OpenCode (REST+SSE), and AcpAcceptor (incoming connections).
+- **SharingBus** is the event backbone. Every connector event is published once and fan-out to filtered subscribers — archivist, web UI, stream integrations — without the bus knowing about any of them.
+- **Lifecycle Hooks** let the server assembly inject services (inspector, process manager, archivist) at connector creation time. Core defines abstract traits (`ConnectorInspector`, `ProcessGroupManager`) and never depends on their implementations directly.
---
@@ -43,10 +60,8 @@ These tools are developed in this monorepo but distributed as independent reposi
| `dirigent_core` | beta | Multi-connector orchestration runtime |
| `dirigent_protocol` | beta | ACP protocol types — messages, events, and RPC definitions |
| `dirigent_acp_api` | beta | ACP server for incoming agent connections |
-| `dirigent_inspector` | concept | Runtime introspection tree |
| `dirigent_config` | beta | Configuration management |
| `dirigent_auth` | concept | User authorization model |
-| `dirigent_process` | beta | Child process management |
| `dirigent_tools` | concept | Tool sandbox and execution abstractions |
| `opencode_client` | beta | OpenCode.ai HTTP client |
diff --git a/architecture.svg b/architecture.svg
index 8c48646..7cc6f13 100644
--- a/architecture.svg
+++ b/architecture.svg
@@ -37,46 +37,40 @@
ORCHESTRATION
-
- dirigent_core
- connector runtime
-
- dirigent_acp_api
- ACP server
-
- dirigent_inspector
- introspection tree
+
+ dirigent_core
+ connector runtime
+
+ dirigent_acp_api
+ ACP server
FOUNDATION
-
- protocol
- ACP types
-
- tools
- sandbox
-
- config
- paths + toml
-
- auth
- accounts
-
- process
- lifecycle
-
- opencode
- HTTP client
+
+ protocol
+ ACP types
+
+ tools
+ sandbox
+
+ config
+ paths + toml
+
+ auth
+ accounts
+
+ opencode
+ HTTP client
-
-
-
-
-
+
+
+
+
+
Shadow boxes = downstream consumers not included in this repository
- 9 crates — minimal set for dirigate and standalone tool dependencies
+ 7 crates — minimal set for dirigate and standalone tool dependencies
diff --git a/core-architecture.svg b/core-architecture.svg
new file mode 100644
index 0000000..37e8ba9
--- /dev/null
+++ b/core-architecture.svg
@@ -0,0 +1,118 @@
+
diff --git a/crates/dirigent_core/Cargo.toml b/crates/dirigent_core/Cargo.toml
index b17d812..41e970f 100644
--- a/crates/dirigent_core/Cargo.toml
+++ b/crates/dirigent_core/Cargo.toml
@@ -31,8 +31,6 @@ dirigent_acp_api = { path = "../dirigent_acp_api", optional = true }
# Workspace dependencies
dirigent_config = { path = "../dirigent_config", optional = true }
dirigent_auth = { path = "../dirigent_auth" }
-dirigent_process = { path = "../dirigent_process", features = ["tokio"], optional = true }
-dirigent_inspector = { path = "../dirigent_inspector", optional = true }
dirigent_protocol = { path = "../dirigent_protocol", features = ["adapters"], optional = true }
dirigent_tools = { path = "../dirigent_tools", optional = true }
# SSE client for ACP transport
@@ -84,7 +82,6 @@ server = [
"dep:blake3",
"dep:dirigent_acp_api",
"dep:dirigent_config",
- "dep:dirigent_inspector",
"dep:dirigent_protocol",
"dep:dirigent_tools",
"dep:eventsource-client",
@@ -99,5 +96,4 @@ server = [
"dep:tower-http",
"dep:tracing",
"dep:tracing-subscriber",
- "dep:dirigent_process",
]
diff --git a/crates/dirigent_core/src/connectors/acp/connector.rs b/crates/dirigent_core/src/connectors/acp/connector.rs
index e2dedd6..cb5648a 100644
--- a/crates/dirigent_core/src/connectors/acp/connector.rs
+++ b/crates/dirigent_core/src/connectors/acp/connector.rs
@@ -83,29 +83,29 @@ macro_rules! debug_log {
/// Called after `upsert_session` and on session metadata changes.
#[cfg(feature = "server")]
async fn inspector_upsert_session(
- inspector: &Arc,
+ inspector: &Arc,
connector_id: &str,
session: &SessionInfo,
) {
- let sess_node_id = dirigent_inspector::NodeId::new(format!(
+ let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/sessions/{}",
connector_id, session.id
));
let parent_id =
- dirigent_inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id));
+ dirigent_protocol::inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id));
let node_state = match session.status {
- SessionStatus::Active => dirigent_inspector::NodeState::Running,
- SessionStatus::Processing => dirigent_inspector::NodeState::Busy("Generating".to_string()),
- SessionStatus::Idle => dirigent_inspector::NodeState::Idle,
- SessionStatus::Ended => dirigent_inspector::NodeState::Stopped,
+ SessionStatus::Active => dirigent_protocol::inspector::NodeState::Running,
+ SessionStatus::Processing => dirigent_protocol::inspector::NodeState::Busy("Generating".to_string()),
+ SessionStatus::Idle => dirigent_protocol::inspector::NodeState::Idle,
+ SessionStatus::Ended => dirigent_protocol::inspector::NodeState::Stopped,
};
let label = session.title.as_deref().unwrap_or(&session.id);
// Try to register; if already exists, update instead
let meta =
- dirigent_inspector::NodeMetadata::new(dirigent_inspector::NodeKind::AsyncTask, label)
+ dirigent_protocol::inspector::NodeMetadata::new(dirigent_protocol::inspector::NodeKind::AsyncTask, label)
.with_state(node_state.clone())
.with_property("session_id", serde_json::json!(&session.id))
.with_property("status", serde_json::json!(format!("{:?}", session.status)));
@@ -129,11 +129,10 @@ async fn inspector_upsert_session(
};
match inspector
- .register(sess_node_id.clone(), &parent_id, meta, None)
+ .register_node(sess_node_id.clone(), &parent_id, meta)
.await
{
- Ok(mut handle) => {
- handle.detach();
+ Ok(()) => {
trace!(connector_id = %connector_id, session_id = %session.id, "Registered session with inspector");
}
Err(_) => {
@@ -165,12 +164,12 @@ async fn inspector_upsert_session(
/// Update only the inspector state for a session (lightweight, no property changes).
#[cfg(feature = "server")]
async fn inspector_update_session_state(
- inspector: &Arc,
+ inspector: &Arc,
connector_id: &str,
session_id: &str,
- state: dirigent_inspector::NodeState,
+ state: dirigent_protocol::inspector::NodeState,
) {
- let sess_node_id = dirigent_inspector::NodeId::new(format!(
+ let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/sessions/{}",
connector_id, session_id
));
@@ -180,13 +179,13 @@ async fn inspector_update_session_state(
/// Deregister all session nodes for a connector from the inspector.
#[cfg(feature = "server")]
async fn inspector_deregister_all_sessions(
- inspector: &Arc,
+ inspector: &Arc,
connector_id: &str,
internal_state: &InternalState,
) {
let sessions = internal_state.list_sessions().await;
for session in sessions {
- let sess_node_id = dirigent_inspector::NodeId::new(format!(
+ let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/sessions/{}",
connector_id, session.id
));
@@ -277,7 +276,7 @@ pub struct AcpConnector {
/// Optional inspector registry for PID tracking of stdio processes
#[cfg(feature = "server")]
- inspector: Option>,
+ inspector: Option>,
/// Optional process group manager for lifecycle management of stdio processes.
///
@@ -286,7 +285,7 @@ pub struct AcpConnector {
/// tracked in the platform job object / process group, and are shut down
/// gracefully on close.
#[cfg(feature = "server")]
- process_manager: Option>,
+ process_manager: Option>,
}
impl AcpConnector {
@@ -388,7 +387,7 @@ impl AcpConnector {
#[cfg(feature = "server")]
pub fn with_inspector(
mut self,
- inspector: Option>,
+ inspector: Option>,
) -> Self {
self.inspector = inspector;
self
@@ -402,7 +401,7 @@ impl AcpConnector {
#[cfg(feature = "server")]
pub fn with_process_manager(
mut self,
- process_manager: Option>,
+ process_manager: Option>,
) -> Self {
self.process_manager = process_manager;
self
@@ -504,8 +503,8 @@ impl AcpConnector {
pending_agent_requests: Arc>>,
session_states: Arc>>,
mut cmd_rx: mpsc::Receiver,
- #[cfg(feature = "server")] inspector: Option>,
- #[cfg(feature = "server")] process_manager: Option>,
+ #[cfg(feature = "server")] inspector: Option>,
+ #[cfg(feature = "server")] process_manager: Option>,
) {
debug_log!("🚀 ACP connector {} task started", id);
info!(connector_id = %id, "ACP connector task started");
@@ -690,26 +689,25 @@ impl AcpConnector {
#[cfg(feature = "server")]
if let Some(ref inspector) = inspector {
if let Some(pid) = transport.pid().await {
- let process_node_id = dirigent_inspector::NodeId::new(format!(
+ let process_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/process",
id
));
- let parent_node_id = dirigent_inspector::NodeId::new(format!(
+ let parent_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}",
id
));
- let meta = dirigent_inspector::NodeMetadata::new(
- dirigent_inspector::NodeKind::Process,
+ let meta = dirigent_protocol::inspector::NodeMetadata::new(
+ dirigent_protocol::inspector::NodeKind::Process,
"stdio-process",
)
- .with_state(dirigent_inspector::NodeState::Running)
+ .with_state(dirigent_protocol::inspector::NodeState::Running)
.with_property("pid", serde_json::json!(pid))
.with_property("transport", serde_json::json!("stdio"));
- if let Ok(mut handle) = inspector
- .register(process_node_id, &parent_node_id, meta, None)
+ if let Ok(()) = inspector
+ .register_node(process_node_id, &parent_node_id, meta)
.await
{
- handle.detach();
info!(connector_id = %id, pid = pid, "Registered stdio process with inspector");
}
}
@@ -1560,7 +1558,7 @@ impl AcpConnector {
inspector,
&id,
&session_id,
- dirigent_inspector::NodeState::Busy("Generating".to_string()),
+ dirigent_protocol::inspector::NodeState::Busy("Generating".to_string()),
).await;
}
@@ -2433,7 +2431,7 @@ impl AcpConnector {
inspector,
&id,
session_id,
- dirigent_inspector::NodeState::Idle,
+ dirigent_protocol::inspector::NodeState::Idle,
).await;
}
}
@@ -2453,7 +2451,7 @@ impl AcpConnector {
/// Create transport based on configuration
async fn create_transport(
config: &AcpConfig,
- #[cfg(feature = "server")] process_manager: Option<&Arc>,
+ #[cfg(feature = "server")] process_manager: Option<&Arc>,
) -> AcpResult> {
match &config.transport {
TransportKind::Stdio {
diff --git a/crates/dirigent_core/src/connectors/acp/transport/stdio.rs b/crates/dirigent_core/src/connectors/acp/transport/stdio.rs
index 0be83f2..abc2adc 100644
--- a/crates/dirigent_core/src/connectors/acp/transport/stdio.rs
+++ b/crates/dirigent_core/src/connectors/acp/transport/stdio.rs
@@ -179,7 +179,7 @@ pub struct StdioTransport {
/// force-killing the process. Without it, the original hard-kill behavior
/// is preserved.
#[cfg(feature = "server")]
- process_lifecycle: Option>,
+ process_lifecycle: Option>,
}
impl StdioTransport {
@@ -307,7 +307,7 @@ impl StdioTransport {
///
/// Must be called before `connect()`.
#[cfg(feature = "server")]
- pub fn set_process_lifecycle(&mut self, lifecycle: Box) {
+ pub fn set_process_lifecycle(&mut self, lifecycle: Box) {
self.process_lifecycle = Some(lifecycle);
}
@@ -837,7 +837,7 @@ impl AcpTransport for StdioTransport {
if let Some(ref lifecycle) = self.process_lifecycle {
if child.id().is_some() {
// Graceful shutdown: SIGTERM/CTRL_BREAK → wait → force kill
- dirigent_process::graceful_shutdown_async(
+ crate::traits::graceful_shutdown_async(
lifecycle.as_ref(),
&mut child,
std::time::Duration::from_secs(5),
diff --git a/crates/dirigent_core/src/connectors/gateway/mod.rs b/crates/dirigent_core/src/connectors/gateway/mod.rs
index 9cc5019..875b42a 100644
--- a/crates/dirigent_core/src/connectors/gateway/mod.rs
+++ b/crates/dirigent_core/src/connectors/gateway/mod.rs
@@ -292,32 +292,32 @@ impl Default for GatewayConfig {
/// Register a Gateway session node in the inspector registry.
#[cfg(feature = "server")]
async fn inspector_register_gateway_session(
- inspector: &Arc,
+ inspector: &Arc,
connector_id: &str,
session: &GatewaySession,
) {
- let sess_node_id = dirigent_inspector::NodeId::new(format!(
+ let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/sessions/{}",
connector_id, session.id
));
let parent_id =
- dirigent_inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id));
+ dirigent_protocol::inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id));
- let meta = dirigent_inspector::NodeMetadata::new(
- dirigent_inspector::NodeKind::AsyncTask,
+ let meta = dirigent_protocol::inspector::NodeMetadata::new(
+ dirigent_protocol::inspector::NodeKind::AsyncTask,
&session.title,
)
- .with_state(dirigent_inspector::NodeState::Running)
+ .with_state(dirigent_protocol::inspector::NodeState::Running)
.with_property("session_id", serde_json::json!(&session.id))
.with_property("status", serde_json::json!("Active"))
.with_property("message_count", serde_json::json!(session.messages.len()));
match inspector
- .register(sess_node_id, &parent_id, meta, None)
+ .register_node(sess_node_id, &parent_id, meta)
.await
{
- Ok(mut handle) => {
- handle.detach();
+ Ok(()) => {
+ // Registered successfully
}
Err(_) => {
// Already registered — that's fine
@@ -328,12 +328,12 @@ async fn inspector_register_gateway_session(
/// Deregister all Gateway session nodes from the inspector.
#[cfg(feature = "server")]
async fn inspector_deregister_gateway_sessions(
- inspector: &Arc,
+ inspector: &Arc,
connector_id: &str,
sessions: &HashMap,
) {
for session_id in sessions.keys() {
- let sess_node_id = dirigent_inspector::NodeId::new(format!(
+ let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/sessions/{}",
connector_id, session_id
));
@@ -387,7 +387,7 @@ pub struct GatewayConnector {
/// Optional inspector registry for session tracking
#[cfg(feature = "server")]
- inspector: Option>,
+ inspector: Option>,
}
/// Helper that publishes an event to both the per-connector broadcast
@@ -471,7 +471,7 @@ impl GatewayConnector {
/// Set the inspector registry for session tracking.
#[cfg(feature = "server")]
- pub fn set_inspector(&mut self, inspector: Option>) {
+ pub fn set_inspector(&mut self, inspector: Option>) {
self.inspector = inspector;
}
@@ -553,7 +553,7 @@ impl GatewayConnector {
mut cmd_rx: mpsc::Receiver,
connector_list_callback: Option,
session_transfer_callback: Option,
- #[cfg(feature = "server")] inspector: Option>,
+ #[cfg(feature = "server")] inspector: Option>,
) {
info!(connector_id = %id, "Gateway connector task started");
diff --git a/crates/dirigent_core/src/hooks.rs b/crates/dirigent_core/src/hooks.rs
index 881eb3c..afee771 100644
--- a/crates/dirigent_core/src/hooks.rs
+++ b/crates/dirigent_core/src/hooks.rs
@@ -26,14 +26,14 @@ pub trait ConnectorLifecycleHooks: Send + Sync {
async fn on_connector_removed(&self, _connector_id: &str) {}
#[cfg(feature = "server")]
- fn inspector(&self) -> Option> {
+ fn inspector(&self) -> Option> {
None
}
#[cfg(feature = "server")]
fn process_manager(
&self,
- ) -> Option> {
+ ) -> Option> {
None
}
}
diff --git a/crates/dirigent_core/src/lib.rs b/crates/dirigent_core/src/lib.rs
index 60935f2..a7276fa 100644
--- a/crates/dirigent_core/src/lib.rs
+++ b/crates/dirigent_core/src/lib.rs
@@ -74,6 +74,10 @@ pub mod connectors;
#[cfg(feature = "server")]
pub mod vendors;
+// Abstract traits for connector-injected services (server-only)
+#[cfg(feature = "server")]
+pub mod traits;
+
// ACP module - Agent-Client Protocol implementation (server-only)
#[cfg(feature = "server")]
pub mod acp;
diff --git a/crates/dirigent_core/src/traits/inspector.rs b/crates/dirigent_core/src/traits/inspector.rs
new file mode 100644
index 0000000..092257d
--- /dev/null
+++ b/crates/dirigent_core/src/traits/inspector.rs
@@ -0,0 +1,38 @@
+use dirigent_protocol::inspector::{NodeId, NodeMetadata, NodeState};
+use std::collections::HashMap;
+
+/// Abstract interface for registering and updating nodes in the inspector tree.
+///
+/// Connectors use this trait to expose their internal process hierarchy
+/// (child processes, services, async tasks) without depending on the
+/// concrete `dirigent_inspector` crate.
+#[async_trait::async_trait]
+pub trait ConnectorInspector: Send + Sync {
+ /// Register a new node under `parent` with the given metadata.
+ async fn register_node(
+ &self,
+ id: NodeId,
+ parent: &NodeId,
+ metadata: NodeMetadata,
+ ) -> Result<(), Box>;
+
+ /// Remove `id` and every node below it from the tree.
+ async fn deregister_subtree(
+ &self,
+ id: &NodeId,
+ ) -> Result<(), Box>;
+
+ /// Update the runtime state of an existing node.
+ async fn update_state(
+ &self,
+ id: &NodeId,
+ state: NodeState,
+ ) -> Result<(), Box>;
+
+ /// Merge additional properties into an existing node's metadata.
+ async fn update_properties(
+ &self,
+ id: &NodeId,
+ props: HashMap,
+ ) -> Result<(), Box>;
+}
diff --git a/crates/dirigent_core/src/traits/mod.rs b/crates/dirigent_core/src/traits/mod.rs
new file mode 100644
index 0000000..c5923b9
--- /dev/null
+++ b/crates/dirigent_core/src/traits/mod.rs
@@ -0,0 +1,5 @@
+mod inspector;
+mod process;
+
+pub use inspector::ConnectorInspector;
+pub use process::{graceful_shutdown_async, ProcessGroupManager, ProcessLifecycle};
diff --git a/crates/dirigent_core/src/traits/process.rs b/crates/dirigent_core/src/traits/process.rs
new file mode 100644
index 0000000..6138f89
--- /dev/null
+++ b/crates/dirigent_core/src/traits/process.rs
@@ -0,0 +1,62 @@
+use std::io;
+
+/// Factory for creating platform-specific process lifecycle handlers.
+///
+/// Each connector receives a `ProcessGroupManager` at construction time and
+/// calls `create_lifecycle()` to obtain a handler scoped to a single child
+/// process (or process group).
+pub trait ProcessGroupManager: Send + Sync {
+ /// Create a fresh lifecycle handler for a new child process.
+ fn create_lifecycle(&self) -> Box;
+}
+
+/// Platform-specific process lifecycle operations.
+///
+/// Implementations handle the OS-level details of job objects (Windows),
+/// process groups (Unix), and signal delivery so that connector code
+/// remains platform-agnostic.
+pub trait ProcessLifecycle: Send + Sync {
+ /// Configure a `tokio::process::Command` before spawning
+ /// (e.g., assign to a job object or set `setsid`).
+ fn configure_async_command(&self, cmd: &mut tokio::process::Command);
+
+ /// Register a spawned child by PID for later signal delivery.
+ fn register_child(&self, pid: u32) -> Result<(), io::Error>;
+
+ /// Send a graceful shutdown signal (SIGTERM on Unix, CTRL_BREAK on Windows).
+ fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error>;
+
+ /// Force-kill the process (SIGKILL on Unix, TerminateProcess on Windows).
+ fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error>;
+}
+
+/// Attempt a graceful shutdown of `child`, falling back to a forced kill
+/// after `timeout` elapses.
+///
+/// Returns `true` if the process exited within the timeout (or was already
+/// gone), `false` if a forced kill was required.
+pub async fn graceful_shutdown_async(
+ lifecycle: &dyn ProcessLifecycle,
+ child: &mut tokio::process::Child,
+ timeout: std::time::Duration,
+) -> bool {
+ let pid = match child.id() {
+ Some(0) | None => return true,
+ Some(pid) => pid,
+ };
+
+ if lifecycle.send_shutdown_signal(pid).is_err() {
+ return true;
+ }
+
+ match tokio::time::timeout(timeout, child.wait()).await {
+ Ok(Ok(_)) => true,
+ Ok(Err(_)) => true,
+ Err(_) => {
+ tracing::debug!(pid, "Graceful shutdown timed out, force killing");
+ let _ = lifecycle.send_kill_signal(pid);
+ let _ = child.wait().await;
+ false
+ }
+ }
+}
diff --git a/crates/dirigent_inspector/Cargo.toml b/crates/dirigent_inspector/Cargo.toml
deleted file mode 100644
index a9fbc13..0000000
--- a/crates/dirigent_inspector/Cargo.toml
+++ /dev/null
@@ -1,36 +0,0 @@
-[package]
-name = "dirigent_inspector"
-version = "0.1.0"
-edition = "2021"
-
-[lib]
-path = "src/lib.rs"
-
-[dependencies]
-# Async traits
-async-trait = "0.1"
-
-# Date/time handling
-chrono = { version = "0.4", features = ["serde"] }
-
-# Protocol types (canonical node types)
-dirigent_protocol = { path = "../dirigent_protocol" }
-
-# Serialization
-serde = { version = "1.0", features = ["derive"] }
-serde_json = "1.0"
-
-# Cross-platform process/system metrics
-sysinfo = "0.33"
-
-# Error handling
-thiserror = "2.0"
-
-# Async runtime
-tokio = { version = "1.42", features = ["sync", "time", "rt", "macros"] }
-
-# Logging
-tracing = "0.1"
-
-[dev-dependencies]
-tokio = { version = "1.42", features = ["full"] }
diff --git a/crates/dirigent_inspector/src/channel.rs b/crates/dirigent_inspector/src/channel.rs
deleted file mode 100644
index 7673122..0000000
--- a/crates/dirigent_inspector/src/channel.rs
+++ /dev/null
@@ -1,349 +0,0 @@
-use crate::error::{InspectorError, Result};
-use serde::{Deserialize, Serialize};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use tokio::sync::{mpsc, oneshot};
-
-/// A command that can be sent to a node.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct NodeCommand {
- /// Unique command ID for correlation with responses.
- pub id: String,
- /// What kind of command this is.
- pub kind: CommandKind,
- /// Arbitrary payload data.
- pub payload: serde_json::Value,
-}
-
-/// The type of command being sent to a node.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub enum CommandKind {
- /// Request the node to report its internal state (introspective).
- Introspect,
- /// Execute a named operation.
- Execute(String),
- /// Custom extension command.
- Custom(String),
-}
-
-/// Response from a node to a command.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct CommandResponse {
- /// The command ID this is responding to.
- pub command_id: String,
- /// Whether the command was handled successfully.
- pub success: bool,
- /// Response data.
- pub data: serde_json::Value,
-}
-
-impl CommandResponse {
- /// Create a success response.
- pub fn ok(command_id: impl Into, data: serde_json::Value) -> Self {
- Self {
- command_id: command_id.into(),
- success: true,
- data,
- }
- }
-
- /// Create an error response.
- pub fn err(command_id: impl Into, message: impl Into) -> Self {
- Self {
- command_id: command_id.into(),
- success: false,
- data: serde_json::json!({ "error": message.into() }),
- }
- }
-}
-
-type CommandPayload = (NodeCommand, oneshot::Sender);
-
-/// Create a new inspector channel pair.
-///
-/// Returns `(sender, receiver)` where:
-/// - The **sender** is used by callers (UI, API) to send commands to the node.
-/// - The **receiver** is used by the node's loop to receive and respond to commands.
-///
-/// `capacity` controls the bounded channel size.
-pub fn channel(capacity: usize) -> (InspectorChannelSender, InspectorChannelReceiver) {
- let (tx, rx) = mpsc::channel(capacity);
- let pending = Arc::new(AtomicUsize::new(0));
-
- (
- InspectorChannelSender {
- tx,
- pending: Arc::clone(&pending),
- },
- InspectorChannelReceiver { rx, pending },
- )
-}
-
-/// Caller-side of the inspector channel: send commands, check queue depth.
-pub struct InspectorChannelSender {
- tx: mpsc::Sender,
- pending: Arc,
-}
-
-impl InspectorChannelSender {
- /// Send a command and wait for the response.
- pub async fn send(&self, cmd: NodeCommand) -> Result {
- let (resp_tx, resp_rx) = oneshot::channel();
- self.pending.fetch_add(1, Ordering::Relaxed);
-
- self.tx
- .send((cmd, resp_tx))
- .await
- .map_err(|_| InspectorError::ChannelClosed)?;
-
- resp_rx.await.map_err(|_| InspectorError::ChannelClosed)
- }
-
- /// Try to send a command without waiting, returning a receiver for the response.
- ///
- /// This is useful when you want to fire off a command and collect the
- /// response later, or in a `select!` branch.
- pub fn try_send(&self, cmd: NodeCommand) -> Result> {
- let (resp_tx, resp_rx) = oneshot::channel();
- self.pending.fetch_add(1, Ordering::Relaxed);
-
- self.tx.try_send((cmd, resp_tx)).map_err(|e| match e {
- mpsc::error::TrySendError::Full(_) => {
- self.pending.fetch_sub(1, Ordering::Relaxed);
- InspectorError::ChannelFull
- }
- mpsc::error::TrySendError::Closed(_) => {
- self.pending.fetch_sub(1, Ordering::Relaxed);
- InspectorError::ChannelClosed
- }
- })?;
-
- Ok(resp_rx)
- }
-
- /// Number of commands currently in the queue (approximate).
- pub fn pending_count(&self) -> usize {
- self.pending.load(Ordering::Relaxed)
- }
-}
-
-impl Clone for InspectorChannelSender {
- fn clone(&self) -> Self {
- Self {
- tx: self.tx.clone(),
- pending: Arc::clone(&self.pending),
- }
- }
-}
-
-/// Node-side of the inspector channel: receive commands, send responses.
-pub struct InspectorChannelReceiver {
- rx: mpsc::Receiver,
- pending: Arc,
-}
-
-impl InspectorChannelReceiver {
- /// Receive the next command. Returns `None` when all senders are dropped.
- ///
- /// The returned `oneshot::Sender` must be used to send back a response.
- pub async fn recv(&mut self) -> Option<(NodeCommand, oneshot::Sender)> {
- let result = self.rx.recv().await;
- if result.is_some() {
- self.pending.fetch_sub(1, Ordering::Relaxed);
- }
- result
- }
-
- /// Try to receive without blocking. Returns `None` if the channel is empty.
- pub fn try_recv(&mut self) -> Option<(NodeCommand, oneshot::Sender)> {
- match self.rx.try_recv() {
- Ok(payload) => {
- self.pending.fetch_sub(1, Ordering::Relaxed);
- Some(payload)
- }
- Err(_) => None,
- }
- }
-
- /// Number of commands currently in the queue (approximate).
- pub fn pending_count(&self) -> usize {
- self.pending.load(Ordering::Relaxed)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- fn introspect_cmd(id: &str) -> NodeCommand {
- NodeCommand {
- id: id.to_string(),
- kind: CommandKind::Introspect,
- payload: serde_json::Value::Null,
- }
- }
-
- fn exec_cmd(id: &str, name: &str, payload: serde_json::Value) -> NodeCommand {
- NodeCommand {
- id: id.to_string(),
- kind: CommandKind::Execute(name.to_string()),
- payload,
- }
- }
-
- #[tokio::test]
- async fn test_send_recv_response() {
- let (sender, mut receiver) = channel(10);
-
- // Simulate a node loop in a background task
- let node_task = tokio::spawn(async move {
- if let Some((cmd, resp_tx)) = receiver.recv().await {
- assert_eq!(cmd.id, "cmd-1");
- assert!(matches!(cmd.kind, CommandKind::Introspect));
- let _ = resp_tx.send(CommandResponse::ok(
- &cmd.id,
- serde_json::json!({ "queue_len": 5 }),
- ));
- }
- });
-
- let response = sender.send(introspect_cmd("cmd-1")).await.unwrap();
- assert!(response.success);
- assert_eq!(response.command_id, "cmd-1");
- assert_eq!(response.data["queue_len"], 5);
-
- node_task.await.unwrap();
- }
-
- #[tokio::test]
- async fn test_try_send() {
- let (sender, mut receiver) = channel(10);
-
- let resp_rx = sender
- .try_send(exec_cmd("cmd-2", "restart", serde_json::json!({})))
- .unwrap();
-
- // Node responds
- let (cmd, resp_tx) = receiver.recv().await.unwrap();
- assert_eq!(cmd.id, "cmd-2");
- let _ = resp_tx.send(CommandResponse::ok(&cmd.id, serde_json::json!("restarted")));
-
- let response = resp_rx.await.unwrap();
- assert!(response.success);
- }
-
- #[tokio::test]
- async fn test_pending_count() {
- let (sender, mut receiver) = channel(10);
-
- assert_eq!(sender.pending_count(), 0);
-
- // Send 3 commands without receiving
- let _r1 = sender.try_send(introspect_cmd("a")).unwrap();
- let _r2 = sender.try_send(introspect_cmd("b")).unwrap();
- let _r3 = sender.try_send(introspect_cmd("c")).unwrap();
-
- assert_eq!(sender.pending_count(), 3);
- assert_eq!(receiver.pending_count(), 3);
-
- // Receive one
- let (cmd, resp_tx) = receiver.recv().await.unwrap();
- let _ = resp_tx.send(CommandResponse::ok(&cmd.id, serde_json::Value::Null));
-
- assert_eq!(receiver.pending_count(), 2);
- }
-
- #[tokio::test]
- async fn test_try_send_full() {
- let (sender, _receiver) = channel(1);
-
- // Fill the channel
- let _r1 = sender.try_send(introspect_cmd("a")).unwrap();
-
- // Should fail with ChannelFull
- let result = sender.try_send(introspect_cmd("b"));
- assert!(matches!(result, Err(InspectorError::ChannelFull)));
- }
-
- #[tokio::test]
- async fn test_send_after_receiver_dropped() {
- let (sender, receiver) = channel(10);
- drop(receiver);
-
- let result = sender.send(introspect_cmd("orphan")).await;
- assert!(matches!(result, Err(InspectorError::ChannelClosed)));
- }
-
- #[tokio::test]
- async fn test_recv_after_sender_dropped() {
- let (sender, mut receiver) = channel(10);
- drop(sender);
-
- let result = receiver.recv().await;
- assert!(result.is_none());
- }
-
- #[tokio::test]
- async fn test_try_recv_empty() {
- let (_sender, mut receiver) = channel(10);
- assert!(receiver.try_recv().is_none());
- }
-
- #[tokio::test]
- async fn test_error_response() {
- let (sender, mut receiver) = channel(10);
-
- let node_task = tokio::spawn(async move {
- if let Some((cmd, resp_tx)) = receiver.recv().await {
- let _ = resp_tx.send(CommandResponse::err(&cmd.id, "not supported"));
- }
- });
-
- let response = sender
- .send(exec_cmd("cmd-x", "unknown", serde_json::Value::Null))
- .await
- .unwrap();
- assert!(!response.success);
- assert_eq!(response.data["error"], "not supported");
-
- node_task.await.unwrap();
- }
-
- #[tokio::test]
- async fn test_multiple_senders() {
- let (sender, mut receiver) = channel(10);
- let sender2 = sender.clone();
-
- let _r1 = sender.try_send(introspect_cmd("from-1")).unwrap();
- let _r2 = sender2.try_send(introspect_cmd("from-2")).unwrap();
-
- let (cmd1, resp_tx1) = receiver.recv().await.unwrap();
- let _ = resp_tx1.send(CommandResponse::ok(&cmd1.id, serde_json::Value::Null));
-
- let (cmd2, resp_tx2) = receiver.recv().await.unwrap();
- let _ = resp_tx2.send(CommandResponse::ok(&cmd2.id, serde_json::Value::Null));
-
- // Both commands were received (order may vary since mpsc is FIFO)
- let ids = vec![cmd1.id, cmd2.id];
- assert!(ids.contains(&"from-1".to_string()));
- assert!(ids.contains(&"from-2".to_string()));
- }
-
- #[test]
- fn test_command_serialization() {
- let cmd = exec_cmd("cmd-1", "restart", serde_json::json!({"force": true}));
- let json = serde_json::to_string(&cmd).unwrap();
- let deserialized: NodeCommand = serde_json::from_str(&json).unwrap();
- assert_eq!(deserialized.id, "cmd-1");
- assert!(matches!(deserialized.kind, CommandKind::Execute(ref name) if name == "restart"));
- }
-
- #[test]
- fn test_response_serialization() {
- let resp = CommandResponse::ok("cmd-1", serde_json::json!({"status": "done"}));
- let json = serde_json::to_string(&resp).unwrap();
- let deserialized: CommandResponse = serde_json::from_str(&json).unwrap();
- assert!(deserialized.success);
- assert_eq!(deserialized.data["status"], "done");
- }
-}
diff --git a/crates/dirigent_inspector/src/error.rs b/crates/dirigent_inspector/src/error.rs
deleted file mode 100644
index 9ec227c..0000000
--- a/crates/dirigent_inspector/src/error.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-use crate::node::NodeId;
-
-/// Errors that can occur in the inspector system.
-#[derive(Debug, thiserror::Error)]
-pub enum InspectorError {
- #[error("node not found: {0}")]
- NodeNotFound(NodeId),
-
- #[error("node already exists: {0}")]
- NodeAlreadyExists(NodeId),
-
- #[error("parent node not found: {0}")]
- ParentNotFound(NodeId),
-
- #[error("cannot remove root node")]
- CannotRemoveRoot,
-
- #[error("channel closed")]
- ChannelClosed,
-
- #[error("channel full")]
- ChannelFull,
-
- #[error("command timed out")]
- CommandTimeout,
-
- #[error("process not found: pid {0}")]
- ProcessNotFound(u32),
-
- #[error("{0}")]
- Internal(String),
-}
-
-pub type Result = std::result::Result;
diff --git a/crates/dirigent_inspector/src/handle.rs b/crates/dirigent_inspector/src/handle.rs
deleted file mode 100644
index e00ff76..0000000
--- a/crates/dirigent_inspector/src/handle.rs
+++ /dev/null
@@ -1,262 +0,0 @@
-use crate::error::Result;
-use crate::node::{Inspectable, NodeId, NodeMetadata, NodeState};
-use crate::registry::InspectorRegistry;
-use std::collections::HashMap;
-use std::sync::Arc;
-
-/// Handle to a registered node in the inspector tree.
-///
-/// Returned when a node is registered. The producer (connector, service, etc.)
-/// uses this handle to update their node's state and properties without needing
-/// direct access to the registry.
-///
-/// When the handle is dropped, the node is automatically deregistered (best-effort).
-/// To keep the node alive without the handle, call `detach()` before dropping.
-pub struct NodeHandle {
- id: NodeId,
- registry: Arc,
- detached: bool,
-}
-
-impl NodeHandle {
- pub(crate) fn new(id: NodeId, registry: Arc) -> Self {
- Self {
- id,
- registry,
- detached: false,
- }
- }
-
- /// Get this node's ID.
- pub fn id(&self) -> &NodeId {
- &self.id
- }
-
- /// Update this node's lifecycle state.
- pub async fn set_state(&self, state: NodeState) -> Result<()> {
- self.registry.update_state(&self.id, state).await
- }
-
- /// Set a single property on this node.
- pub async fn set_property(&self, key: &str, value: serde_json::Value) -> Result<()> {
- let mut props = HashMap::new();
- props.insert(key.to_string(), value);
- self.registry.update_properties(&self.id, props).await
- }
-
- /// Set multiple properties on this node.
- pub async fn set_properties(&self, props: HashMap) -> Result<()> {
- self.registry.update_properties(&self.id, props).await
- }
-
- /// Register a child node under this node.
- ///
- /// Returns a new `NodeHandle` for the child.
- pub async fn register_child(
- &self,
- child_id: NodeId,
- metadata: NodeMetadata,
- inspectable: Option>,
- ) -> Result {
- self.registry
- .register(child_id, &self.id, metadata, inspectable)
- .await
- }
-
- /// Explicitly deregister this node and consume the handle.
- pub async fn deregister(mut self) -> Result<()> {
- self.detached = true; // prevent Drop from double-deregistering
- self.registry.deregister(&self.id).await
- }
-
- /// Detach this handle so the node survives when the handle is dropped.
- ///
- /// After calling this, dropping the handle will NOT deregister the node.
- /// The node can still be removed via `InspectorRegistry::deregister()`.
- pub fn detach(&mut self) {
- self.detached = true;
- }
-
- /// Get a reference to the registry this handle is connected to.
- pub fn registry(&self) -> &Arc {
- &self.registry
- }
-}
-
-impl Drop for NodeHandle {
- fn drop(&mut self) {
- if !self.detached {
- let id = self.id.clone();
- let registry = Arc::clone(&self.registry);
- // Best-effort async deregister from a sync Drop context
- tokio::spawn(async move {
- let _ = registry.deregister(&id).await;
- });
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::node::{NodeKind, NodeMetadata, NodeState};
-
- #[tokio::test]
- async fn test_handle_set_state() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let handle = registry
- .register(
- NodeId::new("dirigent/test"),
- &root,
- NodeMetadata::new(NodeKind::Service, "Test"),
- None,
- )
- .await
- .unwrap();
-
- handle.set_state(NodeState::Running).await.unwrap();
-
- let meta = registry
- .get_node(&NodeId::new("dirigent/test"))
- .await
- .unwrap();
- assert_eq!(meta.state, NodeState::Running);
- }
-
- #[tokio::test]
- async fn test_handle_set_property() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let handle = registry
- .register(
- NodeId::new("dirigent/proc"),
- &root,
- NodeMetadata::new(NodeKind::Process, "Proc"),
- None,
- )
- .await
- .unwrap();
-
- handle
- .set_property("pid", serde_json::json!(9999))
- .await
- .unwrap();
-
- let meta = registry
- .get_node(&NodeId::new("dirigent/proc"))
- .await
- .unwrap();
- assert_eq!(meta.properties["pid"], serde_json::json!(9999));
- }
-
- #[tokio::test]
- async fn test_handle_register_child() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let parent_handle = registry
- .register(
- NodeId::new("dirigent/parent"),
- &root,
- NodeMetadata::new(NodeKind::Connector, "Parent"),
- None,
- )
- .await
- .unwrap();
-
- let child_handle = parent_handle
- .register_child(
- NodeId::new("dirigent/parent/child"),
- NodeMetadata::new(NodeKind::Process, "Child"),
- None,
- )
- .await
- .unwrap();
-
- assert_eq!(child_handle.id().as_str(), "dirigent/parent/child");
- assert!(
- registry
- .contains(&NodeId::new("dirigent/parent/child"))
- .await
- );
- }
-
- #[tokio::test]
- async fn test_handle_deregister() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let handle = registry
- .register(
- NodeId::new("dirigent/temp"),
- &root,
- NodeMetadata::new(NodeKind::AsyncTask, "Temp"),
- None,
- )
- .await
- .unwrap();
-
- assert!(registry.contains(&NodeId::new("dirigent/temp")).await);
-
- handle.deregister().await.unwrap();
-
- assert!(!registry.contains(&NodeId::new("dirigent/temp")).await);
- }
-
- #[tokio::test]
- async fn test_handle_drop_deregisters() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- {
- let _handle = registry
- .register(
- NodeId::new("dirigent/ephemeral"),
- &root,
- NodeMetadata::new(NodeKind::AsyncTask, "Ephemeral"),
- None,
- )
- .await
- .unwrap();
- // handle dropped here
- }
-
- // Give the spawned deregister task a moment to run
- tokio::time::sleep(std::time::Duration::from_millis(50)).await;
-
- assert!(
- !registry.contains(&NodeId::new("dirigent/ephemeral")).await,
- "Node should be deregistered after handle drop"
- );
- }
-
- #[tokio::test]
- async fn test_handle_detach_survives_drop() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- {
- let mut handle = registry
- .register(
- NodeId::new("dirigent/persistent"),
- &root,
- NodeMetadata::new(NodeKind::Service, "Persistent"),
- None,
- )
- .await
- .unwrap();
- handle.detach();
- // handle dropped here, but detached
- }
-
- tokio::time::sleep(std::time::Duration::from_millis(50)).await;
-
- assert!(
- registry.contains(&NodeId::new("dirigent/persistent")).await,
- "Detached node should survive handle drop"
- );
- }
-}
diff --git a/crates/dirigent_inspector/src/lib.rs b/crates/dirigent_inspector/src/lib.rs
deleted file mode 100644
index e3b11b0..0000000
--- a/crates/dirigent_inspector/src/lib.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-pub mod channel;
-pub mod error;
-pub mod handle;
-pub mod node;
-pub mod process;
-pub mod registry;
-pub mod snapshot;
-pub mod system;
-pub mod tree;
-
-// Re-export commonly used types
-pub use channel::{
- channel as inspector_channel, CommandKind, CommandResponse, InspectorChannelReceiver,
- InspectorChannelSender, NodeCommand,
-};
-pub use error::{InspectorError, Result};
-pub use handle::NodeHandle;
-pub use node::{Inspectable, NodeId, NodeKind, NodeMetadata, NodeState};
-pub use process::{ProcessInfo, ProcessMonitor, ProcessStatus};
-pub use registry::{InspectorEvent, InspectorRegistry};
-pub use snapshot::{NodeSnapshot, TreeSnapshot};
-pub use system::{SystemInfo, SystemMonitor};
-pub use tree::{NodeTree, TreeNode};
diff --git a/crates/dirigent_inspector/src/node.rs b/crates/dirigent_inspector/src/node.rs
deleted file mode 100644
index 519f89d..0000000
--- a/crates/dirigent_inspector/src/node.rs
+++ /dev/null
@@ -1,109 +0,0 @@
-use async_trait::async_trait;
-
-// Re-export canonical types from dirigent_protocol
-pub use dirigent_protocol::inspector::{NodeId, NodeKind, NodeMetadata, NodeState};
-
-/// Trait for components that support rich introspection.
-///
-/// Implementing this trait allows a component to provide detailed status reports
-/// when queried. The `inspect()` method runs in the component's own async context
-/// (introspective), while `current_state()` is synchronous for quick polling.
-#[async_trait]
-pub trait Inspectable: Send + Sync {
- /// Return a detailed status report as JSON.
- ///
- /// This is an introspective operation: it runs inside the node's context
- /// and can access internal state that isn't part of the standard metadata.
- async fn inspect(&self) -> serde_json::Value;
-
- /// Return the current lifecycle state.
- ///
- /// This should be a cheap, synchronous operation returning the node's
- /// current state without blocking.
- fn current_state(&self) -> NodeState;
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_node_id_child() {
- let root = NodeId::new("dirigent");
- let child = root.child("connectors");
- assert_eq!(child.as_str(), "dirigent/connectors");
-
- let grandchild = child.child("acp-claude");
- assert_eq!(grandchild.as_str(), "dirigent/connectors/acp-claude");
- }
-
- #[test]
- fn test_node_id_parent() {
- let id = NodeId::new("dirigent/connectors/acp-claude");
- assert_eq!(id.parent().unwrap().as_str(), "dirigent/connectors");
- assert_eq!(
- id.parent().unwrap().parent().unwrap().as_str(),
- "dirigent"
- );
- assert!(NodeId::new("dirigent").parent().is_none());
- }
-
- #[test]
- fn test_node_id_name() {
- assert_eq!(NodeId::new("dirigent/connectors/acp").name(), "acp");
- assert_eq!(NodeId::new("dirigent").name(), "dirigent");
- }
-
- #[test]
- fn test_node_id_from_str() {
- let id: NodeId = "dirigent/system/host".into();
- assert_eq!(id.as_str(), "dirigent/system/host");
- }
-
- #[test]
- fn test_node_metadata_builder() {
- let meta = NodeMetadata::new(NodeKind::Connector, "My Connector")
- .with_state(NodeState::Running)
- .with_property("base_url", serde_json::json!("http://localhost:3000"));
-
- assert_eq!(meta.kind, NodeKind::Connector);
- assert_eq!(meta.label, "My Connector");
- assert_eq!(meta.state, NodeState::Running);
- assert_eq!(
- meta.properties.get("base_url").unwrap(),
- &serde_json::json!("http://localhost:3000")
- );
- }
-
- #[test]
- fn test_node_metadata_serialization() {
- let meta = NodeMetadata::new(NodeKind::Process, "stdio-transport")
- .with_state(NodeState::Busy("processing message".into()))
- .with_property("pid", serde_json::json!(12345));
-
- let json = serde_json::to_string(&meta).unwrap();
- let deserialized: NodeMetadata = serde_json::from_str(&json).unwrap();
-
- assert_eq!(deserialized.kind, NodeKind::Process);
- assert_eq!(deserialized.label, "stdio-transport");
- assert_eq!(
- deserialized.state,
- NodeState::Busy("processing message".into())
- );
- }
-
- #[test]
- fn test_node_state_display() {
- assert_eq!(NodeState::Running.to_string(), "Running");
- assert_eq!(
- NodeState::Error("timeout".into()).to_string(),
- "Error(timeout)"
- );
- }
-
- #[test]
- fn test_node_kind_display() {
- assert_eq!(NodeKind::Root.to_string(), "Root");
- assert_eq!(NodeKind::Custom("agent".into()).to_string(), "Custom(agent)");
- }
-}
diff --git a/crates/dirigent_inspector/src/process.rs b/crates/dirigent_inspector/src/process.rs
deleted file mode 100644
index d85f11c..0000000
--- a/crates/dirigent_inspector/src/process.rs
+++ /dev/null
@@ -1,381 +0,0 @@
-use crate::node::NodeId;
-use crate::registry::InspectorRegistry;
-use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
-use std::path::PathBuf;
-use std::sync::Arc;
-use std::time::Duration;
-use sysinfo::{Pid, ProcessesToUpdate, System};
-use tokio::task::JoinHandle;
-use tracing::{debug, warn};
-
-/// Information about a monitored OS process.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct ProcessInfo {
- pub pid: u32,
- pub name: String,
- pub command: Vec,
- pub exe: Option,
- pub cwd: Option,
- pub status: ProcessStatus,
- pub cpu_usage_percent: f32,
- pub memory_bytes: u64,
- pub virtual_memory_bytes: u64,
- pub start_time_secs: u64,
- pub run_time_secs: u64,
- pub disk_read_bytes: u64,
- pub disk_written_bytes: u64,
-}
-
-/// Simplified cross-platform process status.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub enum ProcessStatus {
- Running,
- Sleeping,
- Stopped,
- Zombie,
- Dead,
- Unknown,
-}
-
-impl From for ProcessStatus {
- fn from(s: sysinfo::ProcessStatus) -> Self {
- match s {
- sysinfo::ProcessStatus::Run => ProcessStatus::Running,
- sysinfo::ProcessStatus::Sleep | sysinfo::ProcessStatus::Idle => ProcessStatus::Sleeping,
- sysinfo::ProcessStatus::Stop | sysinfo::ProcessStatus::Tracing => {
- ProcessStatus::Stopped
- }
- sysinfo::ProcessStatus::Zombie => ProcessStatus::Zombie,
- sysinfo::ProcessStatus::Dead | sysinfo::ProcessStatus::UninterruptibleDiskSleep => {
- ProcessStatus::Dead
- }
- _ => ProcessStatus::Unknown,
- }
- }
-}
-
-/// Direction of the last observed I/O activity.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub enum IoDirection {
- Read,
- Write,
-}
-
-/// Information about the most recent I/O activity.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct IoActivity {
- pub direction: IoDirection,
- pub timestamp: chrono::DateTime,
-}
-
-/// Monitors a set of OS processes by PID, providing metrics via `sysinfo`.
-///
-/// Each tracked PID is associated with a `NodeId` in the inspector tree.
-/// The monitor can be polled manually or run as a background task that
-/// periodically refreshes and updates the registry.
-pub struct ProcessMonitor {
- system: System,
- tracked: HashMap,
-}
-
-impl ProcessMonitor {
- /// Create a new process monitor.
- pub fn new() -> Self {
- Self {
- system: System::new(),
- tracked: HashMap::new(),
- }
- }
-
- /// Start tracking a process by PID, associated with the given node ID.
- pub fn track(&mut self, pid: u32, node_id: NodeId) {
- debug!(pid, node_id = %node_id, "Tracking process");
- self.tracked.insert(pid, node_id);
- }
-
- /// Stop tracking a process.
- pub fn untrack(&mut self, pid: u32) {
- debug!(pid, "Untracking process");
- self.tracked.remove(&pid);
- }
-
- /// Get the set of tracked PIDs and their node IDs.
- pub fn tracked_pids(&self) -> &HashMap {
- &self.tracked
- }
-
- /// Refresh data for all tracked processes and return their info.
- pub fn refresh(&mut self) -> HashMap {
- // Build list of PIDs to refresh
- let pids: Vec = self.tracked.keys().map(|&p| Pid::from_u32(p)).collect();
-
- // Refresh only tracked processes
- self.system
- .refresh_processes(ProcessesToUpdate::Some(&pids), true);
-
- let mut results = HashMap::new();
-
- for (&pid, _node_id) in &self.tracked {
- let sysinfo_pid = Pid::from_u32(pid);
- if let Some(process) = self.system.process(sysinfo_pid) {
- let disk_usage = process.disk_usage();
- let info = ProcessInfo {
- pid,
- name: process.name().to_string_lossy().to_string(),
- command: process
- .cmd()
- .iter()
- .map(|s| s.to_string_lossy().to_string())
- .collect(),
- exe: process.exe().map(|p| p.to_path_buf()),
- cwd: process.cwd().map(|p| p.to_path_buf()),
- status: ProcessStatus::from(process.status()),
- cpu_usage_percent: process.cpu_usage(),
- memory_bytes: process.memory(),
- virtual_memory_bytes: process.virtual_memory(),
- start_time_secs: process.start_time(),
- run_time_secs: process.run_time(),
- disk_read_bytes: disk_usage.read_bytes,
- disk_written_bytes: disk_usage.written_bytes,
- };
- results.insert(pid, info);
- }
- }
-
- results
- }
-
- /// Get info for a single tracked process.
- pub fn get(&mut self, pid: u32) -> Option {
- let sysinfo_pid = Pid::from_u32(pid);
- self.system
- .refresh_processes(ProcessesToUpdate::Some(&[sysinfo_pid]), true);
-
- self.system.process(sysinfo_pid).map(|process| {
- let disk_usage = process.disk_usage();
- ProcessInfo {
- pid,
- name: process.name().to_string_lossy().to_string(),
- command: process
- .cmd()
- .iter()
- .map(|s| s.to_string_lossy().to_string())
- .collect(),
- exe: process.exe().map(|p| p.to_path_buf()),
- cwd: process.cwd().map(|p| p.to_path_buf()),
- status: ProcessStatus::from(process.status()),
- cpu_usage_percent: process.cpu_usage(),
- memory_bytes: process.memory(),
- virtual_memory_bytes: process.virtual_memory(),
- start_time_secs: process.start_time(),
- run_time_secs: process.run_time(),
- disk_read_bytes: disk_usage.read_bytes,
- disk_written_bytes: disk_usage.written_bytes,
- }
- })
- }
-
- /// Check if a process is alive (outrospective: queries the OS directly).
- pub fn is_alive(&mut self, pid: u32) -> bool {
- let sysinfo_pid = Pid::from_u32(pid);
- self.system
- .refresh_processes(ProcessesToUpdate::Some(&[sysinfo_pid]), true);
- self.system.process(sysinfo_pid).is_some()
- }
-
- /// Spawn a background task that periodically refreshes tracked processes
- /// and updates their nodes in the registry.
- ///
- /// The task runs until the returned `JoinHandle` is aborted or the
- /// monitor is dropped.
- pub fn start_polling(
- mut self,
- registry: Arc,
- interval: Duration,
- ) -> JoinHandle<()> {
- tokio::spawn(async move {
- let mut ticker = tokio::time::interval(interval);
- loop {
- ticker.tick().await;
-
- let infos = self.refresh();
-
- for (pid, info) in &infos {
- if let Some(node_id) = self.tracked.get(pid) {
- let mut props = HashMap::new();
- props.insert("pid".to_string(), serde_json::json!(info.pid));
- props.insert("name".to_string(), serde_json::json!(info.name));
- props.insert("command".to_string(), serde_json::json!(info.command));
- props.insert(
- "status".to_string(),
- serde_json::to_value(&info.status).unwrap_or_default(),
- );
- props.insert(
- "cpu_usage_percent".to_string(),
- serde_json::json!(info.cpu_usage_percent),
- );
- props.insert(
- "memory_bytes".to_string(),
- serde_json::json!(info.memory_bytes),
- );
- props.insert(
- "virtual_memory_bytes".to_string(),
- serde_json::json!(info.virtual_memory_bytes),
- );
- props.insert(
- "run_time_secs".to_string(),
- serde_json::json!(info.run_time_secs),
- );
- props.insert(
- "disk_read_bytes".to_string(),
- serde_json::json!(info.disk_read_bytes),
- );
- props.insert(
- "disk_written_bytes".to_string(),
- serde_json::json!(info.disk_written_bytes),
- );
-
- if let Err(e) = registry.update_properties(node_id, props).await {
- warn!(
- pid,
- node_id = %node_id,
- error = %e,
- "Failed to update process node properties"
- );
- }
- }
- }
-
- // Check for dead processes
- let dead_pids: Vec = self
- .tracked
- .keys()
- .filter(|&&pid| !infos.contains_key(&pid))
- .copied()
- .collect();
-
- for pid in dead_pids {
- if let Some(node_id) = self.tracked.get(&pid) {
- debug!(pid, node_id = %node_id, "Process no longer alive");
- let _ = registry
- .update_state(node_id, crate::node::NodeState::Stopped)
- .await;
- }
- }
- }
- })
- }
-}
-
-impl Default for ProcessMonitor {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_process_status_from_sysinfo() {
- assert_eq!(
- ProcessStatus::from(sysinfo::ProcessStatus::Run),
- ProcessStatus::Running
- );
- assert_eq!(
- ProcessStatus::from(sysinfo::ProcessStatus::Sleep),
- ProcessStatus::Sleeping
- );
- assert_eq!(
- ProcessStatus::from(sysinfo::ProcessStatus::Zombie),
- ProcessStatus::Zombie
- );
- assert_eq!(
- ProcessStatus::from(sysinfo::ProcessStatus::Stop),
- ProcessStatus::Stopped
- );
- }
-
- #[test]
- fn test_process_monitor_track_untrack() {
- let mut monitor = ProcessMonitor::new();
- let node_id = NodeId::new("dirigent/test/proc");
-
- monitor.track(1234, node_id.clone());
- assert!(monitor.tracked_pids().contains_key(&1234));
-
- monitor.untrack(1234);
- assert!(!monitor.tracked_pids().contains_key(&1234));
- }
-
- #[test]
- fn test_process_monitor_refresh_current_process() {
- let mut monitor = ProcessMonitor::new();
- let current_pid = std::process::id();
- let node_id = NodeId::new("dirigent/test/self");
-
- monitor.track(current_pid, node_id);
-
- let infos = monitor.refresh();
- assert!(
- infos.contains_key(¤t_pid),
- "Current process should be found"
- );
-
- let info = &infos[¤t_pid];
- assert_eq!(info.pid, current_pid);
- assert!(!info.name.is_empty());
- assert!(info.memory_bytes > 0);
- }
-
- #[test]
- fn test_process_monitor_get_current_process() {
- let mut monitor = ProcessMonitor::new();
- let current_pid = std::process::id();
-
- let info = monitor.get(current_pid);
- assert!(info.is_some(), "Should find current process");
-
- let info = info.unwrap();
- assert_eq!(info.pid, current_pid);
- }
-
- #[test]
- fn test_process_monitor_is_alive() {
- let mut monitor = ProcessMonitor::new();
-
- // Current process should be alive
- assert!(monitor.is_alive(std::process::id()));
-
- // A very high PID should not exist
- assert!(!monitor.is_alive(u32::MAX));
- }
-
- #[test]
- fn test_process_info_serialization() {
- let info = ProcessInfo {
- pid: 1234,
- name: "test".to_string(),
- command: vec!["test".to_string(), "--flag".to_string()],
- exe: Some(PathBuf::from("/usr/bin/test")),
- cwd: Some(PathBuf::from("/home/user")),
- status: ProcessStatus::Running,
- cpu_usage_percent: 12.5,
- memory_bytes: 1024 * 1024,
- virtual_memory_bytes: 2048 * 1024,
- start_time_secs: 1700000000,
- run_time_secs: 3600,
- disk_read_bytes: 1024,
- disk_written_bytes: 2048,
- };
-
- let json = serde_json::to_string(&info).unwrap();
- let deserialized: ProcessInfo = serde_json::from_str(&json).unwrap();
-
- assert_eq!(deserialized.pid, 1234);
- assert_eq!(deserialized.status, ProcessStatus::Running);
- assert_eq!(deserialized.memory_bytes, 1024 * 1024);
- }
-}
diff --git a/crates/dirigent_inspector/src/registry.rs b/crates/dirigent_inspector/src/registry.rs
deleted file mode 100644
index bc96f73..0000000
--- a/crates/dirigent_inspector/src/registry.rs
+++ /dev/null
@@ -1,459 +0,0 @@
-use crate::error::Result;
-use crate::node::{Inspectable, NodeId, NodeKind, NodeMetadata, NodeState};
-use crate::tree::NodeTree;
-use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
-use std::sync::Arc;
-use tokio::sync::{broadcast, RwLock};
-
-/// Events emitted by the registry when the tree changes.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub enum InspectorEvent {
- NodeRegistered {
- id: NodeId,
- parent: NodeId,
- kind: NodeKind,
- },
- NodeRemoved {
- id: NodeId,
- },
- StateChanged {
- id: NodeId,
- old: NodeState,
- new: NodeState,
- },
- PropertiesUpdated {
- id: NodeId,
- keys: Vec,
- },
-}
-
-/// Central registry for the inspector tree.
-///
-/// Thread-safe: all operations acquire the internal `RwLock` as needed.
-/// Emits `InspectorEvent`s on a broadcast channel for reactive consumers.
-pub struct InspectorRegistry {
- tree: Arc>,
- event_tx: broadcast::Sender,
-}
-
-impl InspectorRegistry {
- /// Create a new registry with a root node.
- ///
- /// The root node ID is `"dirigent"` by default, with `NodeKind::Root`.
- pub fn new() -> Self {
- let root_id = NodeId::new("dirigent");
- let root_meta =
- NodeMetadata::new(NodeKind::Root, "Dirigent").with_state(NodeState::Running);
- let tree = NodeTree::new(root_id, root_meta);
- let (event_tx, _) = broadcast::channel(500);
-
- Self {
- tree: Arc::new(RwLock::new(tree)),
- event_tx,
- }
- }
-
- /// Create a registry with a custom root node.
- pub fn with_root(root_id: NodeId, root_metadata: NodeMetadata) -> Self {
- let tree = NodeTree::new(root_id, root_metadata);
- let (event_tx, _) = broadcast::channel(500);
-
- Self {
- tree: Arc::new(RwLock::new(tree)),
- event_tx,
- }
- }
-
- /// Get the root node ID.
- pub async fn root_id(&self) -> NodeId {
- let tree = self.tree.read().await;
- tree.root_id().clone()
- }
-
- /// Register a new node under the given parent.
- ///
- /// Returns a `NodeHandle` that the producer can use to update this node.
- pub async fn register(
- self: &Arc,
- id: NodeId,
- parent: &NodeId,
- metadata: NodeMetadata,
- inspectable: Option>,
- ) -> Result {
- let kind = metadata.kind.clone();
- let parent_clone = parent.clone();
-
- {
- let mut tree = self.tree.write().await;
- tree.insert(id.clone(), parent, metadata, inspectable)?;
- }
-
- let _ = self.event_tx.send(InspectorEvent::NodeRegistered {
- id: id.clone(),
- parent: parent_clone,
- kind,
- });
-
- Ok(crate::handle::NodeHandle::new(id, Arc::clone(self)))
- }
-
- /// Deregister a node (and reparent its children to its parent).
- pub async fn deregister(&self, id: &NodeId) -> Result<()> {
- {
- let mut tree = self.tree.write().await;
- tree.remove(id)?;
- }
-
- let _ = self
- .event_tx
- .send(InspectorEvent::NodeRemoved { id: id.clone() });
- Ok(())
- }
-
- /// Deregister a node and all its descendants.
- pub async fn deregister_subtree(&self, id: &NodeId) -> Result<()> {
- {
- let mut tree = self.tree.write().await;
- tree.remove_subtree(id)?;
- }
-
- let _ = self
- .event_tx
- .send(InspectorEvent::NodeRemoved { id: id.clone() });
- Ok(())
- }
-
- /// Update a node's lifecycle state.
- pub async fn update_state(&self, id: &NodeId, state: NodeState) -> Result<()> {
- let old = {
- let mut tree = self.tree.write().await;
- tree.update_state(id, state.clone())?
- };
-
- if old != state {
- let _ = self.event_tx.send(InspectorEvent::StateChanged {
- id: id.clone(),
- old,
- new: state,
- });
- }
-
- Ok(())
- }
-
- /// Update or insert properties on a node.
- pub async fn update_properties(
- &self,
- id: &NodeId,
- props: HashMap,
- ) -> Result<()> {
- let keys = {
- let mut tree = self.tree.write().await;
- tree.update_properties(id, props)?
- };
-
- if !keys.is_empty() {
- let _ = self.event_tx.send(InspectorEvent::PropertiesUpdated {
- id: id.clone(),
- keys,
- });
- }
-
- Ok(())
- }
-
- /// Get a clone of a node's metadata.
- pub async fn get_node(&self, id: &NodeId) -> Option {
- let tree = self.tree.read().await;
- tree.get(id).map(|n| n.metadata.clone())
- }
-
- /// Get metadata for all direct children of a node.
- pub async fn get_children(&self, id: &NodeId) -> Vec<(NodeId, NodeMetadata)> {
- let tree = self.tree.read().await;
- tree.children(id)
- .into_iter()
- .map(|n| (n.id.clone(), n.metadata.clone()))
- .collect()
- }
-
- /// Check if a node exists.
- pub async fn contains(&self, id: &NodeId) -> bool {
- let tree = self.tree.read().await;
- tree.contains(id)
- }
-
- /// Get the total number of nodes.
- pub async fn node_count(&self) -> usize {
- let tree = self.tree.read().await;
- tree.len()
- }
-
- /// Call `Inspectable::inspect()` on a node, if it implements the trait.
- ///
- /// Returns `None` if the node doesn't exist or doesn't have an `Inspectable` impl.
- pub async fn inspect_node(&self, id: &NodeId) -> Option {
- let inspectable = {
- let tree = self.tree.read().await;
- tree.get(id)
- .and_then(|n| n.inspectable.as_ref().map(Arc::clone))
- };
-
- match inspectable {
- Some(i) => Some(i.inspect().await),
- None => None,
- }
- }
-
- /// Subscribe to tree change events.
- pub fn subscribe(&self) -> broadcast::Receiver {
- self.event_tx.subscribe()
- }
-
- /// Get a snapshot of the entire tree (see snapshot module).
- pub async fn snapshot(&self) -> crate::snapshot::TreeSnapshot {
- let tree = self.tree.read().await;
- crate::snapshot::TreeSnapshot::from_tree(&tree)
- }
-}
-
-impl Default for InspectorRegistry {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[tokio::test]
- async fn test_registry_new_has_root() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
- assert_eq!(root.as_str(), "dirigent");
- assert!(registry.contains(&root).await);
- assert_eq!(registry.node_count().await, 1);
- }
-
- #[tokio::test]
- async fn test_register_and_get() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let id = NodeId::new("dirigent/connectors");
- let meta = NodeMetadata::new(NodeKind::Connector, "Connectors");
-
- let handle = registry
- .register(id.clone(), &root, meta, None)
- .await
- .unwrap();
- assert_eq!(handle.id().as_str(), "dirigent/connectors");
-
- let retrieved = registry.get_node(&id).await.unwrap();
- assert_eq!(retrieved.label, "Connectors");
- assert_eq!(registry.node_count().await, 2);
- }
-
- #[tokio::test]
- async fn test_register_emits_event() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
- let mut rx = registry.subscribe();
-
- let id = NodeId::new("dirigent/test");
- let meta = NodeMetadata::new(NodeKind::Service, "Test");
-
- let _handle = registry
- .register(id.clone(), &root, meta, None)
- .await
- .unwrap();
-
- let event = rx.recv().await.unwrap();
- match event {
- InspectorEvent::NodeRegistered {
- id: event_id,
- parent,
- kind,
- } => {
- assert_eq!(event_id.as_str(), "dirigent/test");
- assert_eq!(parent.as_str(), "dirigent");
- assert_eq!(kind, NodeKind::Service);
- }
- _ => panic!("Expected NodeRegistered event"),
- }
- }
-
- #[tokio::test]
- async fn test_deregister() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let id = NodeId::new("dirigent/temp");
- let meta = NodeMetadata::new(NodeKind::AsyncTask, "Temp");
- let _handle = registry
- .register(id.clone(), &root, meta, None)
- .await
- .unwrap();
-
- assert!(registry.contains(&id).await);
- registry.deregister(&id).await.unwrap();
- assert!(!registry.contains(&id).await);
- }
-
- #[tokio::test]
- async fn test_update_state_emits_event() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let id = NodeId::new("dirigent/svc");
- let meta = NodeMetadata::new(NodeKind::Service, "Service");
- let _handle = registry
- .register(id.clone(), &root, meta, None)
- .await
- .unwrap();
-
- let mut rx = registry.subscribe();
-
- registry
- .update_state(&id, NodeState::Error("crash".into()))
- .await
- .unwrap();
-
- let event = rx.recv().await.unwrap();
- match event {
- InspectorEvent::StateChanged { id: eid, old, new } => {
- assert_eq!(eid.as_str(), "dirigent/svc");
- assert_eq!(old, NodeState::Initializing);
- assert_eq!(new, NodeState::Error("crash".into()));
- }
- _ => panic!("Expected StateChanged event"),
- }
- }
-
- #[tokio::test]
- async fn test_no_event_on_same_state() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let id = NodeId::new("dirigent/svc");
- let meta = NodeMetadata::new(NodeKind::Service, "Service").with_state(NodeState::Running);
- let _handle = registry
- .register(id.clone(), &root, meta, None)
- .await
- .unwrap();
-
- let mut rx = registry.subscribe();
-
- // Update to the same state
- registry
- .update_state(&id, NodeState::Running)
- .await
- .unwrap();
-
- // Should not receive a StateChanged event
- let result = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
- assert!(result.is_err(), "Should not emit event for same state");
- }
-
- #[tokio::test]
- async fn test_update_properties() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let id = NodeId::new("dirigent/proc");
- let meta = NodeMetadata::new(NodeKind::Process, "Process");
- let _handle = registry
- .register(id.clone(), &root, meta, None)
- .await
- .unwrap();
-
- let mut props = HashMap::new();
- props.insert("pid".to_string(), serde_json::json!(1234));
- props.insert("cpu".to_string(), serde_json::json!(50.0));
-
- registry.update_properties(&id, props).await.unwrap();
-
- let node = registry.get_node(&id).await.unwrap();
- assert_eq!(node.properties["pid"], serde_json::json!(1234));
- assert_eq!(node.properties["cpu"], serde_json::json!(50.0));
- }
-
- #[tokio::test]
- async fn test_get_children() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let _h1 = registry
- .register(
- NodeId::new("dirigent/a"),
- &root,
- NodeMetadata::new(NodeKind::Service, "A"),
- None,
- )
- .await
- .unwrap();
- let _h2 = registry
- .register(
- NodeId::new("dirigent/b"),
- &root,
- NodeMetadata::new(NodeKind::Service, "B"),
- None,
- )
- .await
- .unwrap();
-
- let children = registry.get_children(&root).await;
- assert_eq!(children.len(), 2);
- let labels: Vec<&str> = children.iter().map(|(_, m)| m.label.as_str()).collect();
- assert!(labels.contains(&"A"));
- assert!(labels.contains(&"B"));
- }
-
- #[tokio::test]
- async fn test_inspect_node() {
- use async_trait::async_trait;
-
- struct MockInspectable;
-
- #[async_trait]
- impl Inspectable for MockInspectable {
- async fn inspect(&self) -> serde_json::Value {
- serde_json::json!({ "internal_queue_len": 42 })
- }
- fn current_state(&self) -> NodeState {
- NodeState::Running
- }
- }
-
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let id = NodeId::new("dirigent/inspectable");
- let meta = NodeMetadata::new(NodeKind::Service, "Inspectable Service");
- let _handle = registry
- .register(id.clone(), &root, meta, Some(Arc::new(MockInspectable)))
- .await
- .unwrap();
-
- let result = registry.inspect_node(&id).await;
- assert_eq!(
- result.unwrap(),
- serde_json::json!({ "internal_queue_len": 42 })
- );
-
- // Non-inspectable node returns None
- let id2 = NodeId::new("dirigent/plain");
- let _h2 = registry
- .register(
- id2.clone(),
- &root,
- NodeMetadata::new(NodeKind::Service, "Plain"),
- None,
- )
- .await
- .unwrap();
- assert!(registry.inspect_node(&id2).await.is_none());
- }
-}
diff --git a/crates/dirigent_inspector/src/snapshot.rs b/crates/dirigent_inspector/src/snapshot.rs
deleted file mode 100644
index a2599b7..0000000
--- a/crates/dirigent_inspector/src/snapshot.rs
+++ /dev/null
@@ -1,154 +0,0 @@
-use crate::node::{NodeId, NodeMetadata};
-use crate::tree::NodeTree;
-use serde::{Deserialize, Serialize};
-
-/// A serializable point-in-time capture of the entire inspector tree.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct TreeSnapshot {
- pub timestamp: chrono::DateTime,
- pub nodes: Vec,
-}
-
-/// Snapshot of a single node.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct NodeSnapshot {
- pub id: NodeId,
- pub parent: Option,
- pub children: Vec,
- pub metadata: NodeMetadata,
-}
-
-impl TreeSnapshot {
- /// Create a snapshot from a `NodeTree`.
- pub(crate) fn from_tree(tree: &NodeTree) -> Self {
- let nodes = tree
- .all_nodes()
- .into_iter()
- .map(|n| NodeSnapshot {
- id: n.id.clone(),
- parent: n.parent.clone(),
- children: n.children.clone(),
- metadata: n.metadata.clone(),
- })
- .collect();
-
- Self {
- timestamp: chrono::Utc::now(),
- nodes,
- }
- }
-
- /// Serialize the snapshot to a JSON `Value`.
- pub fn to_json(&self) -> serde_json::Value {
- serde_json::to_value(self).unwrap_or_default()
- }
-
- /// Serialize the snapshot to pretty-printed JSON string.
- pub fn to_json_pretty(&self) -> String {
- serde_json::to_string_pretty(self).unwrap_or_default()
- }
-
- /// Number of nodes in the snapshot.
- pub fn node_count(&self) -> usize {
- self.nodes.len()
- }
-
- /// Find a node by ID.
- pub fn find(&self, id: &NodeId) -> Option<&NodeSnapshot> {
- self.nodes.iter().find(|n| &n.id == id)
- }
-
- /// Get the root node (the one with no parent).
- pub fn root(&self) -> Option<&NodeSnapshot> {
- self.nodes.iter().find(|n| n.parent.is_none())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::node::{NodeKind, NodeMetadata, NodeState};
- use crate::tree::NodeTree;
-
- fn make_tree() -> NodeTree {
- let root = NodeId::new("dirigent");
- let mut tree = NodeTree::new(
- root.clone(),
- NodeMetadata::new(NodeKind::Root, "Dirigent").with_state(NodeState::Running),
- );
- tree.insert(
- NodeId::new("dirigent/svc"),
- &root,
- NodeMetadata::new(NodeKind::Service, "Service A").with_state(NodeState::Running),
- None,
- )
- .unwrap();
- tree.insert(
- NodeId::new("dirigent/svc/task"),
- &NodeId::new("dirigent/svc"),
- NodeMetadata::new(NodeKind::AsyncTask, "Task 1"),
- None,
- )
- .unwrap();
- tree
- }
-
- #[test]
- fn test_snapshot_from_tree() {
- let tree = make_tree();
- let snap = TreeSnapshot::from_tree(&tree);
-
- assert_eq!(snap.node_count(), 3);
- assert!(snap.find(&NodeId::new("dirigent")).is_some());
- assert!(snap.find(&NodeId::new("dirigent/svc")).is_some());
- assert!(snap.find(&NodeId::new("dirigent/svc/task")).is_some());
- }
-
- #[test]
- fn test_snapshot_root() {
- let tree = make_tree();
- let snap = TreeSnapshot::from_tree(&tree);
- let root = snap.root().unwrap();
- assert_eq!(root.id.as_str(), "dirigent");
- assert!(root.parent.is_none());
- }
-
- #[test]
- fn test_snapshot_serialization_roundtrip() {
- let tree = make_tree();
- let snap = TreeSnapshot::from_tree(&tree);
-
- let json = serde_json::to_string(&snap).unwrap();
- let deserialized: TreeSnapshot = serde_json::from_str(&json).unwrap();
-
- assert_eq!(deserialized.node_count(), snap.node_count());
- assert_eq!(
- deserialized
- .find(&NodeId::new("dirigent/svc"))
- .unwrap()
- .metadata
- .label,
- "Service A"
- );
- }
-
- #[test]
- fn test_snapshot_to_json_pretty() {
- let tree = make_tree();
- let snap = TreeSnapshot::from_tree(&tree);
- let pretty = snap.to_json_pretty();
-
- assert!(pretty.contains("dirigent"));
- assert!(pretty.contains("Service A"));
- }
-
- #[test]
- fn test_snapshot_to_json_value() {
- let tree = make_tree();
- let snap = TreeSnapshot::from_tree(&tree);
- let val = snap.to_json();
-
- assert!(val.get("timestamp").is_some());
- assert!(val.get("nodes").unwrap().is_array());
- }
-}
diff --git a/crates/dirigent_inspector/src/system.rs b/crates/dirigent_inspector/src/system.rs
deleted file mode 100644
index 4f71777..0000000
--- a/crates/dirigent_inspector/src/system.rs
+++ /dev/null
@@ -1,231 +0,0 @@
-use crate::node::NodeId;
-use crate::registry::InspectorRegistry;
-use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::Duration;
-use sysinfo::System;
-use tokio::task::JoinHandle;
-use tracing::warn;
-
-/// Host system information.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct SystemInfo {
- pub hostname: Option,
- pub os_name: Option,
- pub os_version: Option,
- pub kernel_version: Option,
- pub arch: String,
- pub total_memory_bytes: u64,
- pub used_memory_bytes: u64,
- pub available_memory_bytes: u64,
- pub total_swap_bytes: u64,
- pub used_swap_bytes: u64,
- pub cpu_count: usize,
- pub physical_core_count: Option,
- pub global_cpu_usage_percent: f32,
- pub uptime_secs: u64,
-}
-
-/// Monitors host system metrics (memory, CPU, etc.).
-pub struct SystemMonitor {
- system: System,
-}
-
-impl SystemMonitor {
- /// Create a new system monitor.
- pub fn new() -> Self {
- let mut system = System::new();
- // Initial refresh to populate baseline data
- system.refresh_memory();
- system.refresh_cpu_usage();
- Self { system }
- }
-
- /// Refresh and return current system information.
- pub fn refresh(&mut self) -> SystemInfo {
- self.system.refresh_memory();
- self.system.refresh_cpu_usage();
-
- SystemInfo {
- hostname: System::host_name(),
- os_name: System::name(),
- os_version: System::os_version(),
- kernel_version: System::kernel_version(),
- arch: System::cpu_arch(),
- total_memory_bytes: self.system.total_memory(),
- used_memory_bytes: self.system.used_memory(),
- available_memory_bytes: self.system.available_memory(),
- total_swap_bytes: self.system.total_swap(),
- used_swap_bytes: self.system.used_swap(),
- cpu_count: self.system.cpus().len(),
- physical_core_count: self.system.physical_core_count(),
- global_cpu_usage_percent: self.system.global_cpu_usage(),
- uptime_secs: System::uptime(),
- }
- }
-
- /// Spawn a background task that periodically updates a system node in the registry.
- ///
- /// The `node_id` should already be registered in the tree (e.g., "dirigent/system/host").
- pub fn start_polling(
- mut self,
- registry: Arc,
- node_id: NodeId,
- interval: Duration,
- ) -> JoinHandle<()> {
- tokio::spawn(async move {
- let mut ticker = tokio::time::interval(interval);
- loop {
- ticker.tick().await;
-
- let info = self.refresh();
-
- let mut props = HashMap::new();
- props.insert("hostname".to_string(), serde_json::json!(info.hostname));
- props.insert("os_name".to_string(), serde_json::json!(info.os_name));
- props.insert("os_version".to_string(), serde_json::json!(info.os_version));
- props.insert("arch".to_string(), serde_json::json!(info.arch));
- props.insert(
- "total_memory_bytes".to_string(),
- serde_json::json!(info.total_memory_bytes),
- );
- props.insert(
- "used_memory_bytes".to_string(),
- serde_json::json!(info.used_memory_bytes),
- );
- props.insert(
- "available_memory_bytes".to_string(),
- serde_json::json!(info.available_memory_bytes),
- );
- props.insert(
- "total_swap_bytes".to_string(),
- serde_json::json!(info.total_swap_bytes),
- );
- props.insert(
- "used_swap_bytes".to_string(),
- serde_json::json!(info.used_swap_bytes),
- );
- props.insert("cpu_count".to_string(), serde_json::json!(info.cpu_count));
- props.insert(
- "physical_core_count".to_string(),
- serde_json::json!(info.physical_core_count),
- );
- props.insert(
- "global_cpu_usage_percent".to_string(),
- serde_json::json!(info.global_cpu_usage_percent),
- );
- props.insert(
- "uptime_secs".to_string(),
- serde_json::json!(info.uptime_secs),
- );
-
- if let Err(e) = registry.update_properties(&node_id, props).await {
- warn!(
- node_id = %node_id,
- error = %e,
- "Failed to update system node properties"
- );
- }
- }
- })
- }
-}
-
-impl Default for SystemMonitor {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_system_monitor_refresh() {
- let mut monitor = SystemMonitor::new();
-
- // Need a brief sleep for CPU usage sampling
- std::thread::sleep(std::time::Duration::from_millis(200));
-
- let info = monitor.refresh();
-
- assert!(info.total_memory_bytes > 0, "Should have total memory");
- assert!(info.cpu_count > 0, "Should have at least 1 CPU");
- assert!(info.uptime_secs > 0, "Should have uptime");
- assert!(info.arch.len() > 0, "Should have arch info");
- }
-
- #[test]
- fn test_system_info_serialization() {
- let info = SystemInfo {
- hostname: Some("testhost".to_string()),
- os_name: Some("macOS".to_string()),
- os_version: Some("14.0".to_string()),
- kernel_version: Some("23.0.0".to_string()),
- arch: "arm64".to_string(),
- total_memory_bytes: 16 * 1024 * 1024 * 1024,
- used_memory_bytes: 8 * 1024 * 1024 * 1024,
- available_memory_bytes: 8 * 1024 * 1024 * 1024,
- total_swap_bytes: 2 * 1024 * 1024 * 1024,
- used_swap_bytes: 512 * 1024 * 1024,
- cpu_count: 10,
- physical_core_count: Some(10),
- global_cpu_usage_percent: 25.5,
- uptime_secs: 86400,
- };
-
- let json = serde_json::to_string(&info).unwrap();
- let deserialized: SystemInfo = serde_json::from_str(&json).unwrap();
-
- assert_eq!(deserialized.hostname, Some("testhost".to_string()));
- assert_eq!(deserialized.total_memory_bytes, 16 * 1024 * 1024 * 1024);
- assert_eq!(deserialized.cpu_count, 10);
- }
-
- #[tokio::test]
- async fn test_system_monitor_polling() {
- use crate::node::{NodeKind, NodeMetadata, NodeState};
-
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- // Register system node
- let node_id = NodeId::new("dirigent/system/host");
- let mut handle = registry
- .register(
- node_id.clone(),
- &root,
- NodeMetadata::new(NodeKind::System, "Host System").with_state(NodeState::Running),
- None,
- )
- .await
- .unwrap();
- // Detach so polling task can update it
- // (handle would deregister on drop otherwise)
- handle.detach();
-
- let monitor = SystemMonitor::new();
- let task = monitor.start_polling(
- Arc::clone(®istry),
- node_id.clone(),
- Duration::from_millis(100),
- );
-
- // Wait for at least one poll cycle
- tokio::time::sleep(Duration::from_millis(250)).await;
-
- let meta = registry.get_node(&node_id).await.unwrap();
- assert!(
- meta.properties.contains_key("total_memory_bytes"),
- "Should have system metrics after polling"
- );
- assert!(
- meta.properties.contains_key("cpu_count"),
- "Should have CPU count"
- );
-
- task.abort();
- }
-}
diff --git a/crates/dirigent_inspector/src/tree.rs b/crates/dirigent_inspector/src/tree.rs
deleted file mode 100644
index 1ac6c80..0000000
--- a/crates/dirigent_inspector/src/tree.rs
+++ /dev/null
@@ -1,544 +0,0 @@
-use crate::error::{InspectorError, Result};
-use crate::node::{Inspectable, NodeId, NodeMetadata, NodeState};
-use std::collections::HashMap;
-use std::sync::Arc;
-
-/// A node within the inspector tree, holding metadata and relationships.
-pub struct TreeNode {
- pub id: NodeId,
- pub metadata: NodeMetadata,
- pub parent: Option,
- pub children: Vec,
- pub inspectable: Option>,
-}
-
-impl TreeNode {
- fn new(
- id: NodeId,
- metadata: NodeMetadata,
- parent: Option,
- inspectable: Option>,
- ) -> Self {
- Self {
- id,
- metadata,
- parent,
- children: Vec::new(),
- inspectable,
- }
- }
-}
-
-/// The inspector tree: a rooted tree of `TreeNode`s with parent-child relationships.
-///
-/// All mutations go through this struct. Thread safety is provided by wrapping
-/// `NodeTree` in `Arc>` at the registry level.
-pub struct NodeTree {
- nodes: HashMap,
- root: NodeId,
-}
-
-impl NodeTree {
- /// Create a new tree with a root node.
- ///
- /// The root node is always present and cannot be removed.
- pub fn new(root_id: NodeId, root_metadata: NodeMetadata) -> Self {
- let mut nodes = HashMap::new();
- nodes.insert(
- root_id.clone(),
- TreeNode::new(root_id.clone(), root_metadata, None, None),
- );
- Self {
- nodes,
- root: root_id,
- }
- }
-
- /// Get the root node ID.
- pub fn root_id(&self) -> &NodeId {
- &self.root
- }
-
- /// Insert a new node as a child of the given parent.
- pub fn insert(
- &mut self,
- id: NodeId,
- parent: &NodeId,
- metadata: NodeMetadata,
- inspectable: Option>,
- ) -> Result<()> {
- if self.nodes.contains_key(&id) {
- return Err(InspectorError::NodeAlreadyExists(id));
- }
- if !self.nodes.contains_key(parent) {
- return Err(InspectorError::ParentNotFound(parent.clone()));
- }
-
- let node = TreeNode::new(id.clone(), metadata, Some(parent.clone()), inspectable);
- self.nodes.insert(id.clone(), node);
-
- // Add as child of parent
- if let Some(parent_node) = self.nodes.get_mut(parent) {
- parent_node.children.push(id);
- }
-
- Ok(())
- }
-
- /// Remove a node and reparent its children to the node's parent.
- ///
- /// The root node cannot be removed.
- pub fn remove(&mut self, id: &NodeId) -> Result<()> {
- if id == &self.root {
- return Err(InspectorError::CannotRemoveRoot);
- }
-
- let node = self
- .nodes
- .get(id)
- .ok_or_else(|| InspectorError::NodeNotFound(id.clone()))?;
-
- let parent_id = node.parent.clone();
- let children: Vec = node.children.clone();
-
- // Reparent children to the removed node's parent
- if let Some(ref parent_id) = parent_id {
- for child_id in &children {
- if let Some(child) = self.nodes.get_mut(child_id) {
- child.parent = Some(parent_id.clone());
- }
- }
- // Update parent's children: remove this node, add reparented children
- if let Some(parent) = self.nodes.get_mut(parent_id) {
- parent.children.retain(|c| c != id);
- parent.children.extend(children);
- }
- }
-
- self.nodes.remove(id);
- Ok(())
- }
-
- /// Remove a node and all its descendants.
- ///
- /// The root node cannot be removed.
- pub fn remove_subtree(&mut self, id: &NodeId) -> Result<()> {
- if id == &self.root {
- return Err(InspectorError::CannotRemoveRoot);
- }
-
- if !self.nodes.contains_key(id) {
- return Err(InspectorError::NodeNotFound(id.clone()));
- }
-
- // Collect all descendant IDs via BFS
- let mut to_remove = Vec::new();
- let mut queue = vec![id.clone()];
- while let Some(current) = queue.pop() {
- if let Some(node) = self.nodes.get(¤t) {
- queue.extend(node.children.clone());
- }
- to_remove.push(current);
- }
-
- // Remove this node from parent's children list
- let parent_id = self.nodes.get(id).and_then(|n| n.parent.clone());
- if let Some(parent_id) = parent_id {
- if let Some(parent) = self.nodes.get_mut(&parent_id) {
- parent.children.retain(|c| c != id);
- }
- }
-
- // Remove all collected nodes
- for node_id in &to_remove {
- self.nodes.remove(node_id);
- }
-
- Ok(())
- }
-
- /// Get a reference to a node by ID.
- pub fn get(&self, id: &NodeId) -> Option<&TreeNode> {
- self.nodes.get(id)
- }
-
- /// Get a mutable reference to a node by ID.
- pub fn get_mut(&mut self, id: &NodeId) -> Option<&mut TreeNode> {
- self.nodes.get_mut(id)
- }
-
- /// Get the direct children of a node.
- pub fn children(&self, id: &NodeId) -> Vec<&TreeNode> {
- self.nodes
- .get(id)
- .map(|node| {
- node.children
- .iter()
- .filter_map(|child_id| self.nodes.get(child_id))
- .collect()
- })
- .unwrap_or_default()
- }
-
- /// Get all ancestors of a node, from immediate parent to root.
- pub fn ancestors(&self, id: &NodeId) -> Vec<&TreeNode> {
- let mut result = Vec::new();
- let mut current = self.nodes.get(id).and_then(|n| n.parent.as_ref());
- while let Some(parent_id) = current {
- if let Some(parent) = self.nodes.get(parent_id) {
- result.push(parent);
- current = parent.parent.as_ref();
- } else {
- break;
- }
- }
- result
- }
-
- /// Get all nodes in the tree.
- pub fn all_nodes(&self) -> Vec<&TreeNode> {
- self.nodes.values().collect()
- }
-
- /// Total number of nodes in the tree.
- pub fn len(&self) -> usize {
- self.nodes.len()
- }
-
- /// Whether the tree is empty (only root).
- pub fn is_empty(&self) -> bool {
- self.nodes.len() <= 1
- }
-
- /// Update a node's state, returning the old state.
- pub fn update_state(&mut self, id: &NodeId, state: NodeState) -> Result {
- let node = self
- .nodes
- .get_mut(id)
- .ok_or_else(|| InspectorError::NodeNotFound(id.clone()))?;
- let old = node.metadata.state.clone();
- node.metadata.state = state;
- node.metadata.last_updated = chrono::Utc::now();
- Ok(old)
- }
-
- /// Update or insert properties on a node. Returns only the keys whose values actually changed.
- ///
- /// A key is considered changed if it is new (not previously present) or if its value
- /// differs from the stored value (compared via `serde_json::Value::PartialEq`).
- /// The `last_updated` timestamp is only bumped when at least one value changed.
- pub fn update_properties(
- &mut self,
- id: &NodeId,
- props: HashMap,
- ) -> Result> {
- let node = self
- .nodes
- .get_mut(id)
- .ok_or_else(|| InspectorError::NodeNotFound(id.clone()))?;
-
- let mut changed_keys = Vec::new();
- for (key, new_value) in props {
- let is_changed = match node.metadata.properties.get(&key) {
- Some(existing) => existing != &new_value,
- None => true, // new key
- };
- if is_changed {
- changed_keys.push(key.clone());
- node.metadata.properties.insert(key, new_value);
- }
- }
-
- if !changed_keys.is_empty() {
- node.metadata.last_updated = chrono::Utc::now();
- }
-
- Ok(changed_keys)
- }
-
- /// Check if a node exists.
- pub fn contains(&self, id: &NodeId) -> bool {
- self.nodes.contains_key(id)
- }
-
- /// Get all descendant IDs of a node (not including the node itself).
- pub fn descendants(&self, id: &NodeId) -> Vec {
- let mut result = Vec::new();
- let mut queue: Vec = self
- .nodes
- .get(id)
- .map(|n| n.children.clone())
- .unwrap_or_default();
-
- while let Some(current) = queue.pop() {
- if let Some(node) = self.nodes.get(¤t) {
- queue.extend(node.children.clone());
- }
- result.push(current);
- }
- result
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::node::{NodeKind, NodeMetadata};
-
- fn root_meta() -> NodeMetadata {
- NodeMetadata::new(NodeKind::Root, "Dirigent")
- }
-
- fn connector_meta(label: &str) -> NodeMetadata {
- NodeMetadata::new(NodeKind::Connector, label).with_state(NodeState::Running)
- }
-
- fn make_tree() -> NodeTree {
- let root = NodeId::new("dirigent");
- let mut tree = NodeTree::new(root.clone(), root_meta());
-
- // Add category nodes
- tree.insert(
- root.child("connectors"),
- &root,
- NodeMetadata::new(NodeKind::Root, "Connectors"),
- None,
- )
- .unwrap();
- tree.insert(
- root.child("services"),
- &root,
- NodeMetadata::new(NodeKind::Root, "Services"),
- None,
- )
- .unwrap();
-
- // Add connectors
- let connectors = root.child("connectors");
- tree.insert(
- connectors.child("opencode-1"),
- &connectors,
- connector_meta("OpenCode #1"),
- None,
- )
- .unwrap();
- tree.insert(
- connectors.child("acp-claude"),
- &connectors,
- connector_meta("ACP Claude"),
- None,
- )
- .unwrap();
-
- // Add a process child under acp-claude
- let acp = connectors.child("acp-claude");
- tree.insert(
- acp.child("stdio-process"),
- &acp,
- NodeMetadata::new(NodeKind::Process, "stdio-transport")
- .with_state(NodeState::Running)
- .with_property("pid", serde_json::json!(42)),
- None,
- )
- .unwrap();
-
- tree
- }
-
- #[test]
- fn test_new_tree_has_root() {
- let root = NodeId::new("dirigent");
- let tree = NodeTree::new(root.clone(), root_meta());
-
- assert_eq!(tree.len(), 1);
- assert!(tree.get(&root).is_some());
- assert_eq!(tree.root_id(), &root);
- }
-
- #[test]
- fn test_insert_and_lookup() {
- let tree = make_tree();
-
- assert_eq!(tree.len(), 6); // root + connectors + services + opencode + acp + stdio
- assert!(tree.contains(&NodeId::new("dirigent/connectors/acp-claude")));
- assert!(tree.contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")));
- }
-
- #[test]
- fn test_insert_duplicate_fails() {
- let mut tree = make_tree();
- let result = tree.insert(
- NodeId::new("dirigent/connectors/opencode-1"),
- &NodeId::new("dirigent/connectors"),
- connector_meta("Duplicate"),
- None,
- );
- assert!(matches!(result, Err(InspectorError::NodeAlreadyExists(_))));
- }
-
- #[test]
- fn test_insert_missing_parent_fails() {
- let mut tree = make_tree();
- let result = tree.insert(
- NodeId::new("dirigent/nonexistent/child"),
- &NodeId::new("dirigent/nonexistent"),
- connector_meta("Orphan"),
- None,
- );
- assert!(matches!(result, Err(InspectorError::ParentNotFound(_))));
- }
-
- #[test]
- fn test_children() {
- let tree = make_tree();
- let connectors = NodeId::new("dirigent/connectors");
- let children = tree.children(&connectors);
-
- assert_eq!(children.len(), 2);
- let child_ids: Vec<&str> = children.iter().map(|c| c.id.as_str()).collect();
- assert!(child_ids.contains(&"dirigent/connectors/opencode-1"));
- assert!(child_ids.contains(&"dirigent/connectors/acp-claude"));
- }
-
- #[test]
- fn test_ancestors() {
- let tree = make_tree();
- let stdio = NodeId::new("dirigent/connectors/acp-claude/stdio-process");
- let ancestors = tree.ancestors(&stdio);
-
- assert_eq!(ancestors.len(), 3);
- assert_eq!(ancestors[0].id.as_str(), "dirigent/connectors/acp-claude");
- assert_eq!(ancestors[1].id.as_str(), "dirigent/connectors");
- assert_eq!(ancestors[2].id.as_str(), "dirigent");
- }
-
- #[test]
- fn test_remove_reparents_children() {
- let mut tree = make_tree();
-
- // Remove acp-claude; its child (stdio-process) should be reparented to "connectors"
- tree.remove(&NodeId::new("dirigent/connectors/acp-claude"))
- .unwrap();
-
- assert!(!tree.contains(&NodeId::new("dirigent/connectors/acp-claude")));
- assert!(tree.contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")));
-
- // stdio-process should now be a child of connectors
- let connectors = tree.get(&NodeId::new("dirigent/connectors")).unwrap();
- assert!(connectors
- .children
- .contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")));
-
- // stdio-process parent should be connectors
- let stdio = tree
- .get(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))
- .unwrap();
- assert_eq!(
- stdio.parent.as_ref().unwrap().as_str(),
- "dirigent/connectors"
- );
- }
-
- #[test]
- fn test_remove_subtree() {
- let mut tree = make_tree();
-
- tree.remove_subtree(&NodeId::new("dirigent/connectors/acp-claude"))
- .unwrap();
-
- assert!(!tree.contains(&NodeId::new("dirigent/connectors/acp-claude")));
- assert!(!tree.contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")));
- assert_eq!(tree.len(), 4); // root + connectors + services + opencode
-
- // connectors should only have opencode-1
- let connectors = tree.get(&NodeId::new("dirigent/connectors")).unwrap();
- assert_eq!(connectors.children.len(), 1);
- }
-
- #[test]
- fn test_cannot_remove_root() {
- let mut tree = make_tree();
- let result = tree.remove(&NodeId::new("dirigent"));
- assert!(matches!(result, Err(InspectorError::CannotRemoveRoot)));
-
- let result = tree.remove_subtree(&NodeId::new("dirigent"));
- assert!(matches!(result, Err(InspectorError::CannotRemoveRoot)));
- }
-
- #[test]
- fn test_update_state() {
- let mut tree = make_tree();
- let id = NodeId::new("dirigent/connectors/opencode-1");
-
- let old = tree
- .update_state(&id, NodeState::Error("timeout".into()))
- .unwrap();
- assert_eq!(old, NodeState::Running);
-
- let node = tree.get(&id).unwrap();
- assert_eq!(node.metadata.state, NodeState::Error("timeout".into()));
- }
-
- #[test]
- fn test_update_properties() {
- let mut tree = make_tree();
- let id = NodeId::new("dirigent/connectors/acp-claude/stdio-process");
-
- let mut props = HashMap::new();
- props.insert("cpu_percent".to_string(), serde_json::json!(45.2));
- props.insert("memory_mb".to_string(), serde_json::json!(128));
-
- let keys = tree.update_properties(&id, props).unwrap();
- assert_eq!(keys.len(), 2);
-
- let node = tree.get(&id).unwrap();
- assert_eq!(node.metadata.properties["pid"], serde_json::json!(42)); // original preserved
- assert_eq!(
- node.metadata.properties["cpu_percent"],
- serde_json::json!(45.2)
- );
- }
-
- #[test]
- fn test_update_properties_no_change() {
- let mut tree = make_tree();
- let id = NodeId::new("dirigent/connectors/acp-claude/stdio-process");
-
- // First update: new key, should be reported as changed
- let mut props = HashMap::new();
- props.insert("cpu_percent".to_string(), serde_json::json!(45.2));
- let keys = tree.update_properties(&id, props).unwrap();
- assert_eq!(keys.len(), 1);
-
- // Second update: same value, should NOT be reported as changed
- let mut props = HashMap::new();
- props.insert("cpu_percent".to_string(), serde_json::json!(45.2));
- let keys = tree.update_properties(&id, props).unwrap();
- assert_eq!(keys.len(), 0, "Same value should not be reported as changed");
-
- // Third update: different value, should be reported
- let mut props = HashMap::new();
- props.insert("cpu_percent".to_string(), serde_json::json!(50.0));
- let keys = tree.update_properties(&id, props).unwrap();
- assert_eq!(keys.len(), 1);
- }
-
- #[test]
- fn test_descendants() {
- let tree = make_tree();
- let root = NodeId::new("dirigent");
- let descendants = tree.descendants(&root);
-
- assert_eq!(descendants.len(), 5); // all except root itself
- }
-
- #[test]
- fn test_is_empty() {
- let root = NodeId::new("dirigent");
- let tree = NodeTree::new(root, root_meta());
- assert!(tree.is_empty());
-
- let tree = make_tree();
- assert!(!tree.is_empty());
- }
-}
diff --git a/crates/dirigent_inspector/tests/integration.rs b/crates/dirigent_inspector/tests/integration.rs
deleted file mode 100644
index cf40c09..0000000
--- a/crates/dirigent_inspector/tests/integration.rs
+++ /dev/null
@@ -1,355 +0,0 @@
-use dirigent_inspector::*;
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::Duration;
-
-/// Full integration test simulating a Dirigent-like setup:
-/// - Root node "dirigent"
-/// - Connector nodes under "dirigent/connectors"
-/// - Process node under a connector
-/// - Service nodes under "dirigent/services"
-/// - System node under "dirigent/system"
-/// - Bidirectional channel communication
-/// - Snapshot capture
-#[tokio::test]
-async fn test_full_inspector_tree() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
- assert_eq!(root.as_str(), "dirigent");
-
- // Subscribe to events
- let mut event_rx = registry.subscribe();
-
- // -- Build the tree structure --
-
- // Category: connectors
- let connectors_handle = registry
- .register(
- NodeId::new("dirigent/connectors"),
- &root,
- NodeMetadata::new(NodeKind::Custom("category".into()), "Connectors")
- .with_state(NodeState::Running),
- None,
- )
- .await
- .unwrap();
-
- // Category: services
- let mut services_handle = registry
- .register(
- NodeId::new("dirigent/services"),
- &root,
- NodeMetadata::new(NodeKind::Custom("category".into()), "Services")
- .with_state(NodeState::Running),
- None,
- )
- .await
- .unwrap();
-
- // Category: system
- let mut system_handle = registry
- .register(
- NodeId::new("dirigent/system"),
- &root,
- NodeMetadata::new(NodeKind::Custom("category".into()), "System")
- .with_state(NodeState::Running),
- None,
- )
- .await
- .unwrap();
-
- // Connector: ACP Claude
- let acp_handle = connectors_handle
- .register_child(
- NodeId::new("dirigent/connectors/acp-claude"),
- NodeMetadata::new(NodeKind::Connector, "ACP Claude")
- .with_state(NodeState::Running)
- .with_property("transport", serde_json::json!("stdio")),
- None,
- )
- .await
- .unwrap();
-
- // Process: stdio transport child
- let current_pid = std::process::id();
- let proc_handle = acp_handle
- .register_child(
- NodeId::new("dirigent/connectors/acp-claude/stdio-process"),
- NodeMetadata::new(NodeKind::Process, "stdio-transport")
- .with_state(NodeState::Running)
- .with_property("pid", serde_json::json!(current_pid)),
- None,
- )
- .await
- .unwrap();
-
- // Connector: OpenCode
- let _opencode_handle = connectors_handle
- .register_child(
- NodeId::new("dirigent/connectors/opencode-1"),
- NodeMetadata::new(NodeKind::Connector, "OpenCode #1")
- .with_state(NodeState::Running)
- .with_property("base_url", serde_json::json!("http://localhost:12225")),
- None,
- )
- .await
- .unwrap();
-
- // Service: Archivist
- let _archivist_handle = services_handle
- .register_child(
- NodeId::new("dirigent/services/archivist"),
- NodeMetadata::new(NodeKind::Service, "Archivist EventHandler")
- .with_state(NodeState::Idle),
- None,
- )
- .await
- .unwrap();
-
- // System: Host
- let _host_handle = system_handle
- .register_child(
- NodeId::new("dirigent/system/host"),
- NodeMetadata::new(NodeKind::System, "Host Machine").with_state(NodeState::Running),
- None,
- )
- .await
- .unwrap();
-
- // -- Verify tree structure --
- // dirigent, connectors, services, system, acp-claude, stdio-process, opencode-1, archivist, host = 9
- assert_eq!(registry.node_count().await, 9);
-
- // Check children
- let root_children = registry.get_children(&root).await;
- assert_eq!(root_children.len(), 3); // connectors, services, system
-
- let connector_children = registry
- .get_children(&NodeId::new("dirigent/connectors"))
- .await;
- assert_eq!(connector_children.len(), 2); // acp-claude, opencode-1
-
- let acp_children = registry
- .get_children(&NodeId::new("dirigent/connectors/acp-claude"))
- .await;
- assert_eq!(acp_children.len(), 1); // stdio-process
-
- // -- State transitions --
- proc_handle
- .set_state(NodeState::Busy("processing message".into()))
- .await
- .unwrap();
-
- let proc_meta = registry
- .get_node(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))
- .await
- .unwrap();
- assert_eq!(
- proc_meta.state,
- NodeState::Busy("processing message".into())
- );
-
- // -- Property updates --
- let mut props = HashMap::new();
- props.insert("cpu_percent".to_string(), serde_json::json!(23.5));
- props.insert("memory_mb".to_string(), serde_json::json!(256));
- proc_handle.set_properties(props).await.unwrap();
-
- let proc_meta = registry
- .get_node(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))
- .await
- .unwrap();
- assert_eq!(proc_meta.properties["cpu_percent"], serde_json::json!(23.5));
- assert_eq!(proc_meta.properties["pid"], serde_json::json!(current_pid)); // original preserved
-
- // -- Snapshot --
- let snapshot = registry.snapshot().await;
- assert_eq!(snapshot.node_count(), 9);
-
- // Verify snapshot structure
- let snap_root = snapshot.root().unwrap();
- assert_eq!(snap_root.id.as_str(), "dirigent");
- assert_eq!(snap_root.children.len(), 3);
-
- let snap_proc = snapshot
- .find(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))
- .unwrap();
- assert_eq!(
- snap_proc.parent,
- Some(NodeId::new("dirigent/connectors/acp-claude"))
- );
- assert_eq!(
- snap_proc.metadata.state,
- NodeState::Busy("processing message".into())
- );
-
- // Snapshot serialization roundtrip
- let json = serde_json::to_string(&snapshot).unwrap();
- let deserialized: TreeSnapshot = serde_json::from_str(&json).unwrap();
- assert_eq!(deserialized.node_count(), 9);
-
- // -- Events --
- // Drain all events that were emitted during setup
- let mut event_count = 0;
- while let Ok(event) = event_rx.try_recv() {
- event_count += 1;
- // Just verify they're valid events
- match event {
- InspectorEvent::NodeRegistered { .. }
- | InspectorEvent::StateChanged { .. }
- | InspectorEvent::PropertiesUpdated { .. }
- | InspectorEvent::NodeRemoved { .. } => {}
- }
- }
- assert!(event_count > 0, "Should have received events");
-
- // -- Cleanup: detach category handles so they survive --
- services_handle.detach();
- system_handle.detach();
-}
-
-/// Test process monitor with the current process.
-#[tokio::test]
-async fn test_process_monitor_integration() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- let current_pid = std::process::id();
- let node_id = NodeId::new("dirigent/test-process");
-
- let mut handle = registry
- .register(
- node_id.clone(),
- &root,
- NodeMetadata::new(NodeKind::Process, "Test Process").with_state(NodeState::Running),
- None,
- )
- .await
- .unwrap();
- handle.detach();
-
- // Create monitor and track current process
- let mut monitor = ProcessMonitor::new();
- monitor.track(current_pid, node_id.clone());
-
- // Start polling
- let task = monitor.start_polling(Arc::clone(®istry), Duration::from_millis(100));
-
- // Wait for data to be populated
- tokio::time::sleep(Duration::from_millis(350)).await;
-
- let meta = registry.get_node(&node_id).await.unwrap();
- assert!(
- meta.properties.contains_key("pid"),
- "Should have PID property"
- );
- assert!(
- meta.properties.contains_key("memory_bytes"),
- "Should have memory property"
- );
- assert_eq!(meta.properties["pid"], serde_json::json!(current_pid));
-
- task.abort();
-}
-
-/// Test bidirectional channel communication with a simulated node loop.
-#[tokio::test]
-async fn test_channel_integration() {
- let (sender, mut receiver) = inspector_channel(10);
-
- // Simulate a node loop that handles commands
- let node_loop = tokio::spawn(async move {
- let mut handled = 0;
- while let Some((cmd, resp_tx)) = receiver.recv().await {
- let response = match cmd.kind {
- CommandKind::Introspect => CommandResponse::ok(
- &cmd.id,
- serde_json::json!({
- "queue_depth": receiver.pending_count(),
- "sessions_active": 3
- }),
- ),
- CommandKind::Execute(ref name) if name == "restart" => {
- CommandResponse::ok(&cmd.id, serde_json::json!("restarting..."))
- }
- _ => CommandResponse::err(&cmd.id, "unknown command"),
- };
- let _ = resp_tx.send(response);
- handled += 1;
- if handled >= 2 {
- break;
- }
- }
- });
-
- // Send introspect command
- let resp = sender
- .send(NodeCommand {
- id: "cmd-1".to_string(),
- kind: CommandKind::Introspect,
- payload: serde_json::Value::Null,
- })
- .await
- .unwrap();
- assert!(resp.success);
- assert_eq!(resp.data["sessions_active"], 3);
-
- // Send execute command
- let resp = sender
- .send(NodeCommand {
- id: "cmd-2".to_string(),
- kind: CommandKind::Execute("restart".to_string()),
- payload: serde_json::Value::Null,
- })
- .await
- .unwrap();
- assert!(resp.success);
-
- node_loop.await.unwrap();
-}
-
-/// Test that dropping a handle auto-deregisters, and subtree removal works.
-#[tokio::test]
-async fn test_lifecycle_management() {
- let registry = Arc::new(InspectorRegistry::new());
- let root = registry.root_id().await;
-
- // Build a subtree
- let parent = registry
- .register(
- NodeId::new("dirigent/parent"),
- &root,
- NodeMetadata::new(NodeKind::Connector, "Parent"),
- None,
- )
- .await
- .unwrap();
-
- let _child1 = parent
- .register_child(
- NodeId::new("dirigent/parent/child1"),
- NodeMetadata::new(NodeKind::Process, "Child 1"),
- None,
- )
- .await
- .unwrap();
-
- let _child2 = parent
- .register_child(
- NodeId::new("dirigent/parent/child2"),
- NodeMetadata::new(NodeKind::AsyncTask, "Child 2"),
- None,
- )
- .await
- .unwrap();
-
- assert_eq!(registry.node_count().await, 4); // root + parent + 2 children
-
- // Remove entire subtree
- registry
- .deregister_subtree(&NodeId::new("dirigent/parent"))
- .await
- .unwrap();
-
- assert_eq!(registry.node_count().await, 1); // only root remains
-}
diff --git a/crates/dirigent_process/Cargo.toml b/crates/dirigent_process/Cargo.toml
deleted file mode 100644
index cfa4a31..0000000
--- a/crates/dirigent_process/Cargo.toml
+++ /dev/null
@@ -1,32 +0,0 @@
-[package]
-name = "dirigent_process"
-version = "0.1.0"
-edition = "2021"
-description = "Cross-platform process lifecycle management for Dirigent"
-
-[lib]
-path = "src/lib.rs"
-
-[features]
-default = []
-tokio = ["dep:tokio"]
-
-[dependencies]
-tracing = "0.1"
-tokio = { version = "1", features = ["process", "time"], optional = true }
-
-[target.'cfg(windows)'.dependencies]
-windows-sys = { version = "0.59", features = [
- "Win32_System_JobObjects",
- "Win32_System_Threading",
- "Win32_Foundation",
- "Win32_System_Console",
- "Win32_Security",
-] }
-
-[target.'cfg(unix)'.dependencies]
-nix = { version = "0.29", features = ["signal", "process"] }
-libc = "0.2"
-
-[dev-dependencies]
-tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/dirigent_process/src/lib.rs b/crates/dirigent_process/src/lib.rs
deleted file mode 100644
index 66a9f28..0000000
--- a/crates/dirigent_process/src/lib.rs
+++ /dev/null
@@ -1,30 +0,0 @@
-pub mod traits;
-mod shutdown;
-
-#[cfg(windows)]
-mod windows;
-#[cfg(unix)]
-mod unix;
-#[cfg(target_os = "linux")]
-mod linux;
-
-pub use traits::{ProcessGroupManager, ProcessLifecycle};
-pub use shutdown::graceful_shutdown_sync;
-#[cfg(feature = "tokio")]
-pub use shutdown::graceful_shutdown_async;
-
-use std::sync::Arc;
-
-/// Create the platform-appropriate ProcessGroupManager.
-///
-/// Call `init()` on the returned manager before use.
-pub fn create_manager() -> Arc {
- #[cfg(windows)]
- { Arc::new(windows::WindowsProcessGroupManager::new()) }
-
- #[cfg(target_os = "linux")]
- { Arc::new(linux::LinuxProcessGroupManager::new()) }
-
- #[cfg(all(unix, not(target_os = "linux")))]
- { Arc::new(unix::UnixProcessGroupManager::new()) }
-}
diff --git a/crates/dirigent_process/src/linux.rs b/crates/dirigent_process/src/linux.rs
deleted file mode 100644
index 50fa07e..0000000
--- a/crates/dirigent_process/src/linux.rs
+++ /dev/null
@@ -1,91 +0,0 @@
-#![cfg(target_os = "linux")]
-
-use crate::traits::{ProcessGroupManager, ProcessLifecycle};
-use nix::sys::signal::{killpg, Signal};
-use nix::unistd::Pid;
-use std::io;
-use std::os::unix::process::CommandExt;
-use tracing::{debug, info, warn};
-
-/// Linux process group manager with kernel-level orphan prevention.
-///
-/// Uses `PR_SET_CHILD_SUBREAPER` so orphaned grandchildren are reparented
-/// to this process, and `PR_SET_PDEATHSIG` so children auto-die when
-/// the parent crashes.
-pub struct LinuxProcessGroupManager;
-
-impl LinuxProcessGroupManager {
- pub fn new() -> Self { Self }
-}
-
-impl ProcessGroupManager for LinuxProcessGroupManager {
- fn init(&self) -> Result<(), io::Error> {
- unsafe {
- if libc::prctl(libc::PR_SET_CHILD_SUBREAPER, 1, 0, 0, 0) != 0 {
- let err = io::Error::last_os_error();
- warn!(error = %err, "Failed to set PR_SET_CHILD_SUBREAPER");
- return Err(err);
- }
- }
- info!("Linux process group manager initialized (child subreaper enabled)");
- Ok(())
- }
-
- fn create_lifecycle(&self) -> Box {
- Box::new(LinuxProcessLifecycle)
- }
-}
-
-pub struct LinuxProcessLifecycle;
-
-impl ProcessLifecycle for LinuxProcessLifecycle {
- fn configure_command(&self, cmd: &mut std::process::Command) {
- unsafe {
- cmd.pre_exec(|| {
- if libc::setpgid(0, 0) != 0 {
- return Err(io::Error::last_os_error());
- }
- if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL, 0, 0, 0) != 0 {
- return Err(io::Error::last_os_error());
- }
- Ok(())
- });
- }
- }
-
- #[cfg(feature = "tokio")]
- fn configure_async_command(&self, cmd: &mut tokio::process::Command) {
- unsafe {
- cmd.pre_exec(|| {
- if libc::setpgid(0, 0) != 0 {
- return Err(io::Error::last_os_error());
- }
- if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL, 0, 0, 0) != 0 {
- return Err(io::Error::last_os_error());
- }
- Ok(())
- });
- }
- }
-
- fn register_child(&self, pid: u32) -> Result<(), io::Error> {
- debug!(pid, pgid = pid, "Linux child registered (process group + PR_SET_PDEATHSIG)");
- Ok(())
- }
-
- fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error> {
- let pgid = Pid::from_raw(pid as i32);
- killpg(pgid, Signal::SIGTERM)
- .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
- debug!(pid, "Sent SIGTERM to process group");
- Ok(())
- }
-
- fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error> {
- let pgid = Pid::from_raw(pid as i32);
- killpg(pgid, Signal::SIGKILL)
- .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
- debug!(pid, "Sent SIGKILL to process group");
- Ok(())
- }
-}
diff --git a/crates/dirigent_process/src/shutdown.rs b/crates/dirigent_process/src/shutdown.rs
deleted file mode 100644
index dba4483..0000000
--- a/crates/dirigent_process/src/shutdown.rs
+++ /dev/null
@@ -1,66 +0,0 @@
-use crate::traits::ProcessLifecycle;
-use std::time::Duration;
-
-/// Graceful shutdown: send signal → wait → force kill (sync, blocking).
-///
-/// Returns `true` if the process exited within the timeout, `false` if force-killed.
-pub fn graceful_shutdown_sync(
- lifecycle: &dyn ProcessLifecycle,
- child: &mut std::process::Child,
- timeout: Duration,
-) -> bool {
- let pid = child.id();
- if pid == 0 {
- return true;
- }
-
- if lifecycle.send_shutdown_signal(pid).is_err() {
- return true;
- }
-
- let start = std::time::Instant::now();
- let poll_interval = Duration::from_millis(50);
-
- while start.elapsed() < timeout {
- match child.try_wait() {
- Ok(Some(_)) => return true,
- Ok(None) => std::thread::sleep(poll_interval),
- Err(_) => return true,
- }
- }
-
- tracing::debug!(pid, "Graceful shutdown timed out, force killing");
- let _ = lifecycle.send_kill_signal(pid);
- let _ = child.wait();
- false
-}
-
-/// Graceful shutdown: send signal → wait → force kill (async, non-blocking).
-///
-/// Returns `true` if the process exited within the timeout, `false` if force-killed.
-#[cfg(feature = "tokio")]
-pub async fn graceful_shutdown_async(
- lifecycle: &dyn ProcessLifecycle,
- child: &mut tokio::process::Child,
- timeout: Duration,
-) -> bool {
- let pid = match child.id() {
- Some(0) | None => return true,
- Some(pid) => pid,
- };
-
- if lifecycle.send_shutdown_signal(pid).is_err() {
- return true;
- }
-
- match tokio::time::timeout(timeout, child.wait()).await {
- Ok(Ok(_)) => true,
- Ok(Err(_)) => true,
- Err(_) => {
- tracing::debug!(pid, "Graceful shutdown timed out, force killing");
- let _ = lifecycle.send_kill_signal(pid);
- let _ = child.wait().await;
- false
- }
- }
-}
diff --git a/crates/dirigent_process/src/traits.rs b/crates/dirigent_process/src/traits.rs
deleted file mode 100644
index b81c865..0000000
--- a/crates/dirigent_process/src/traits.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use std::io;
-
-/// Global process group manager — one per application lifetime.
-///
-/// On Windows, owns a Job Object with KILL_ON_JOB_CLOSE.
-/// On Linux, configures the process as a child subreaper.
-/// On macOS, no-op (process groups handle cleanup).
-pub trait ProcessGroupManager: Send + Sync {
- /// Initialize platform-specific parent process configuration.
- fn init(&self) -> Result<(), io::Error>;
-
- /// Create a lifecycle handle for managing a child process.
- fn create_lifecycle(&self) -> Box;
-}
-
-/// Per-child process lifecycle manager.
-///
-/// All methods are synchronous — OS signal/handle calls are instant.
-/// For timeout-based shutdown, use the free functions in the `shutdown` module.
-pub trait ProcessLifecycle: Send + Sync {
- /// Configure a std::process::Command before spawning.
- /// Sets platform-specific flags (process group, creation flags, pre_exec hooks).
- fn configure_command(&self, cmd: &mut std::process::Command);
-
- /// Configure a tokio::process::Command before spawning.
- #[cfg(feature = "tokio")]
- fn configure_async_command(&self, cmd: &mut tokio::process::Command);
-
- /// Register a spawned child with the lifecycle manager.
- /// Must be called immediately after spawn with the child's PID.
- fn register_child(&self, pid: u32) -> Result<(), io::Error>;
-
- /// Send a graceful shutdown signal to the process (and its tree).
- /// Windows: CTRL_BREAK_EVENT. Unix: SIGTERM to process group.
- fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error>;
-
- /// Forcefully kill the process (and its tree).
- /// Windows: TerminateProcess. Unix: SIGKILL to process group.
- fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error>;
-}
diff --git a/crates/dirigent_process/src/unix.rs b/crates/dirigent_process/src/unix.rs
deleted file mode 100644
index a63b470..0000000
--- a/crates/dirigent_process/src/unix.rs
+++ /dev/null
@@ -1,78 +0,0 @@
-#![cfg(unix)]
-
-use crate::traits::{ProcessGroupManager, ProcessLifecycle};
-use nix::sys::signal::{killpg, Signal};
-use nix::unistd::Pid;
-use std::io;
-use std::os::unix::process::CommandExt;
-use tracing::{debug, info};
-
-/// macOS / generic Unix process group manager.
-///
-/// Uses process groups for tree management. No kernel-level orphan
-/// prevention (macOS lacks `PR_SET_PDEATHSIG`). Relies on launchd
-/// supervision for crash recovery.
-pub struct UnixProcessGroupManager;
-
-impl UnixProcessGroupManager {
- pub fn new() -> Self { Self }
-}
-
-impl ProcessGroupManager for UnixProcessGroupManager {
- fn init(&self) -> Result<(), io::Error> {
- info!("Unix process group manager initialized");
- Ok(())
- }
-
- fn create_lifecycle(&self) -> Box {
- Box::new(UnixProcessLifecycle)
- }
-}
-
-pub struct UnixProcessLifecycle;
-
-impl ProcessLifecycle for UnixProcessLifecycle {
- fn configure_command(&self, cmd: &mut std::process::Command) {
- unsafe {
- cmd.pre_exec(|| {
- if libc::setpgid(0, 0) != 0 {
- return Err(io::Error::last_os_error());
- }
- Ok(())
- });
- }
- }
-
- #[cfg(feature = "tokio")]
- fn configure_async_command(&self, cmd: &mut tokio::process::Command) {
- unsafe {
- cmd.pre_exec(|| {
- if libc::setpgid(0, 0) != 0 {
- return Err(io::Error::last_os_error());
- }
- Ok(())
- });
- }
- }
-
- fn register_child(&self, pid: u32) -> Result<(), io::Error> {
- debug!(pid, pgid = pid, "Child registered in its own process group");
- Ok(())
- }
-
- fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error> {
- let pgid = Pid::from_raw(pid as i32);
- killpg(pgid, Signal::SIGTERM)
- .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
- debug!(pid, "Sent SIGTERM to process group");
- Ok(())
- }
-
- fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error> {
- let pgid = Pid::from_raw(pid as i32);
- killpg(pgid, Signal::SIGKILL)
- .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
- debug!(pid, "Sent SIGKILL to process group");
- Ok(())
- }
-}
diff --git a/crates/dirigent_process/src/windows.rs b/crates/dirigent_process/src/windows.rs
deleted file mode 100644
index 90e1dad..0000000
--- a/crates/dirigent_process/src/windows.rs
+++ /dev/null
@@ -1,199 +0,0 @@
-#![cfg(windows)]
-
-use crate::traits::{ProcessGroupManager, ProcessLifecycle};
-use std::io;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Mutex;
-use tracing::{debug, info, warn};
-use windows_sys::Win32::Foundation::{CloseHandle, FALSE, HANDLE};
-use windows_sys::Win32::System::JobObjects::{
- AssignProcessToJobObject, CreateJobObjectW, JobObjectExtendedLimitInformation,
- SetInformationJobObject, JOBOBJECT_EXTENDED_LIMIT_INFORMATION, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
-};
-use windows_sys::Win32::System::Threading::{
- OpenProcess, TerminateProcess, PROCESS_ALL_ACCESS,
-};
-use windows_sys::Win32::System::Console::{
- GenerateConsoleCtrlEvent, CTRL_BREAK_EVENT,
-};
-
-/// Windows process group manager using Job Objects.
-///
-/// Creates a Job Object with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` — when
-/// this manager is dropped (or the process crashes), the OS automatically
-/// kills all assigned child processes including grandchildren.
-pub struct WindowsProcessGroupManager {
- /// Wrapped in a Mutex so we can mutate through a shared reference after init.
- job_handle: Mutex,
- initialized: AtomicBool,
-}
-
-// Safety: HANDLE (*mut c_void) is not Send/Sync by default, but we only
-// mutate it during init() (guarded by AtomicBool + Mutex) and read it
-// (via copy) in create_lifecycle() and drop. No concurrent mutation occurs.
-unsafe impl Send for WindowsProcessGroupManager {}
-unsafe impl Sync for WindowsProcessGroupManager {}
-
-impl WindowsProcessGroupManager {
- pub fn new() -> Self {
- Self {
- job_handle: Mutex::new(std::ptr::null_mut()),
- initialized: AtomicBool::new(false),
- }
- }
-
- fn handle(&self) -> HANDLE {
- *self.job_handle.lock().unwrap()
- }
-}
-
-impl Default for WindowsProcessGroupManager {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl ProcessGroupManager for WindowsProcessGroupManager {
- fn init(&self) -> Result<(), io::Error> {
- if self.initialized.swap(true, Ordering::SeqCst) {
- return Ok(());
- }
-
- unsafe {
- let handle = CreateJobObjectW(std::ptr::null(), std::ptr::null());
- if handle.is_null() {
- self.initialized.store(false, Ordering::SeqCst);
- return Err(io::Error::last_os_error());
- }
-
- let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = std::mem::zeroed();
- info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
-
- let result = SetInformationJobObject(
- handle,
- JobObjectExtendedLimitInformation,
- &info as *const _ as *const _,
- std::mem::size_of::() as u32,
- );
-
- if result == FALSE {
- CloseHandle(handle);
- self.initialized.store(false, Ordering::SeqCst);
- return Err(io::Error::last_os_error());
- }
-
- *self.job_handle.lock().unwrap() = handle;
-
- info!("Windows Job Object created with KILL_ON_JOB_CLOSE");
- Ok(())
- }
- }
-
- fn create_lifecycle(&self) -> Box {
- Box::new(WindowsProcessLifecycle {
- job_handle: self.handle(),
- })
- }
-}
-
-impl Drop for WindowsProcessGroupManager {
- fn drop(&mut self) {
- let handle = self.handle();
- if !handle.is_null() {
- unsafe { CloseHandle(handle); }
- debug!("Windows Job Object closed");
- }
- }
-}
-
-/// Per-child lifecycle manager for Windows.
-///
-/// Assigns children to the parent's Job Object and uses
-/// `CTRL_BREAK_EVENT` / `TerminateProcess` for shutdown.
-pub struct WindowsProcessLifecycle {
- job_handle: HANDLE,
-}
-
-// Safety: same reasoning as WindowsProcessGroupManager — HANDLE is used
-// read-only after construction (only passed to OS APIs).
-unsafe impl Send for WindowsProcessLifecycle {}
-unsafe impl Sync for WindowsProcessLifecycle {}
-
-impl ProcessLifecycle for WindowsProcessLifecycle {
- fn configure_command(&self, cmd: &mut std::process::Command) {
- use std::os::windows::process::CommandExt;
- const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
- const CREATE_NO_WINDOW: u32 = 0x0800_0000;
- cmd.creation_flags(CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW);
- }
-
- #[cfg(feature = "tokio")]
- fn configure_async_command(&self, cmd: &mut tokio::process::Command) {
- use std::os::windows::process::CommandExt;
- const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
- const CREATE_NO_WINDOW: u32 = 0x0800_0000;
- cmd.creation_flags(CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW);
- }
-
- fn register_child(&self, pid: u32) -> Result<(), io::Error> {
- if self.job_handle.is_null() {
- warn!(pid, "Job Object not initialized, skipping child registration");
- return Ok(());
- }
-
- unsafe {
- let process_handle = OpenProcess(PROCESS_ALL_ACCESS, FALSE, pid);
- if process_handle.is_null() {
- return Err(io::Error::last_os_error());
- }
-
- let result = AssignProcessToJobObject(self.job_handle, process_handle);
- CloseHandle(process_handle);
-
- if result == FALSE {
- let err = io::Error::last_os_error();
- warn!(pid, error = %err, "Failed to assign process to Job Object (may already be in a job)");
- return Err(err);
- }
-
- debug!(pid, "Process assigned to Job Object");
- Ok(())
- }
- }
-
- fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error> {
- unsafe {
- if GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid) == FALSE {
- return Err(io::Error::last_os_error());
- }
- }
- debug!(pid, "Sent CTRL_BREAK_EVENT");
- Ok(())
- }
-
- fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error> {
- unsafe {
- let handle = OpenProcess(PROCESS_ALL_ACCESS, FALSE, pid);
- if handle.is_null() {
- let err = io::Error::last_os_error();
- // ERROR_INVALID_PARAMETER (87) means process already exited
- if err.raw_os_error() == Some(87) {
- return Ok(());
- }
- return Err(err);
- }
- let result = TerminateProcess(handle, 1);
- CloseHandle(handle);
- if result == FALSE {
- let err = io::Error::last_os_error();
- // ERROR_ACCESS_DENIED (5) — process may have already exited
- if err.raw_os_error() == Some(5) {
- return Ok(());
- }
- return Err(err);
- }
- }
- debug!(pid, "Sent TerminateProcess");
- Ok(())
- }
-}
diff --git a/crates/dirigent_process/tests/lifecycle.rs b/crates/dirigent_process/tests/lifecycle.rs
deleted file mode 100644
index 39a5264..0000000
--- a/crates/dirigent_process/tests/lifecycle.rs
+++ /dev/null
@@ -1,149 +0,0 @@
-use dirigent_process::{create_manager, graceful_shutdown_sync};
-use std::process::Command;
-use std::time::Duration;
-
-/// Build a long-running command that does not require a TTY on any platform.
-///
-/// On Windows, `timeout /t N /nobreak` fails when stdin is a pipe (no console),
-/// so we use `ping -n N 127.0.0.1` which sleeps approximately N-1 seconds with
-/// no TTY requirement.
-///
-/// On Unix, `sleep N` is the idiomatic choice.
-#[cfg(windows)]
-fn long_sleep_cmd(seconds: u32) -> Command {
- let mut cmd = Command::new("ping");
- cmd.args(["-n", &seconds.to_string(), "127.0.0.1"]);
- cmd
-}
-
-#[cfg(unix)]
-fn long_sleep_cmd(seconds: u32) -> Command {
- let mut cmd = Command::new("sleep");
- cmd.arg(seconds.to_string());
- cmd
-}
-
-#[cfg(all(windows, feature = "tokio"))]
-fn long_sleep_async_cmd(seconds: u32) -> tokio::process::Command {
- let mut cmd = tokio::process::Command::new("ping");
- cmd.args(["-n", &seconds.to_string(), "127.0.0.1"]);
- cmd
-}
-
-#[cfg(all(unix, feature = "tokio"))]
-fn long_sleep_async_cmd(seconds: u32) -> tokio::process::Command {
- let mut cmd = tokio::process::Command::new("sleep");
- cmd.arg(seconds.to_string());
- cmd
-}
-
-#[test]
-fn test_manager_init() {
- let mgr = create_manager();
- mgr.init().expect("init should succeed");
- // Double init should also succeed (idempotent)
- mgr.init().expect("double init should succeed");
-}
-
-#[test]
-fn test_create_lifecycle() {
- let mgr = create_manager();
- mgr.init().expect("init failed");
- let _lifecycle = mgr.create_lifecycle();
-}
-
-#[test]
-fn test_configure_and_spawn() {
- let mgr = create_manager();
- mgr.init().expect("init failed");
- let lifecycle = mgr.create_lifecycle();
-
- let mut cmd = long_sleep_cmd(30);
- lifecycle.configure_command(&mut cmd);
-
- let mut child = cmd.spawn().expect("spawn failed");
- let pid = child.id();
- assert!(pid > 0);
-
- // Register should succeed
- lifecycle.register_child(pid).expect("register failed");
-
- // Process should still be running
- assert!(child.try_wait().expect("try_wait failed").is_none());
-
- // Clean up
- let _ = child.kill();
- let _ = child.wait();
-}
-
-#[test]
-fn test_graceful_shutdown_sync() {
- let mgr = create_manager();
- mgr.init().expect("init failed");
- let lifecycle = mgr.create_lifecycle();
-
- let mut cmd = long_sleep_cmd(60);
- lifecycle.configure_command(&mut cmd);
- let mut child = cmd.spawn().expect("spawn failed");
- let pid = child.id();
- lifecycle.register_child(pid).expect("register failed");
-
- // Graceful shutdown with 3s timeout — process won't exit voluntarily,
- // so it should be force-killed after timeout
- let exited_gracefully = graceful_shutdown_sync(
- lifecycle.as_ref(),
- &mut child,
- Duration::from_secs(3),
- );
-
- // Process should be dead now
- assert!(child.try_wait().expect("try_wait failed").is_some());
- // It was force-killed (ping/sleep don't handle SIGTERM/CTRL_BREAK)
- assert!(!exited_gracefully);
-}
-
-#[test]
-fn test_send_kill_signal() {
- let mgr = create_manager();
- mgr.init().expect("init failed");
- let lifecycle = mgr.create_lifecycle();
-
- let mut cmd = long_sleep_cmd(60);
- lifecycle.configure_command(&mut cmd);
- let mut child = cmd.spawn().expect("spawn failed");
- let pid = child.id();
- lifecycle.register_child(pid).expect("register failed");
-
- // Direct kill signal
- lifecycle.send_kill_signal(pid).expect("kill failed");
-
- // Wait for process to die
- let status = child.wait().expect("wait failed");
- assert!(!status.success());
-}
-
-#[cfg(feature = "tokio")]
-#[tokio::test]
-async fn test_async_graceful_shutdown() {
- use dirigent_process::graceful_shutdown_async;
-
- let mgr = create_manager();
- mgr.init().expect("init failed");
- let lifecycle = mgr.create_lifecycle();
-
- let mut cmd = long_sleep_async_cmd(60);
- lifecycle.configure_async_command(&mut cmd);
- let mut child = cmd.spawn().expect("spawn failed");
- let pid = child.id().expect("no pid");
- lifecycle.register_child(pid).expect("register failed");
-
- let exited_gracefully = graceful_shutdown_async(
- lifecycle.as_ref(),
- &mut child,
- Duration::from_secs(3),
- )
- .await;
-
- assert!(child.try_wait().expect("try_wait failed").is_some());
- assert!(!exited_gracefully);
-}