🥇 export from upstream (a1fa8e3a)

This commit is contained in:
2026-05-09 21:59:28 +02:00
parent bf5a79d931
commit 5829546671
33 changed files with 323 additions and 3719 deletions
-4
View File
@@ -31,8 +31,6 @@ dirigent_acp_api = { path = "../dirigent_acp_api", optional = true }
# Workspace dependencies
dirigent_config = { path = "../dirigent_config", optional = true }
dirigent_auth = { path = "../dirigent_auth" }
dirigent_process = { path = "../dirigent_process", features = ["tokio"], optional = true }
dirigent_inspector = { path = "../dirigent_inspector", optional = true }
dirigent_protocol = { path = "../dirigent_protocol", features = ["adapters"], optional = true }
dirigent_tools = { path = "../dirigent_tools", optional = true }
# SSE client for ACP transport
@@ -84,7 +82,6 @@ server = [
"dep:blake3",
"dep:dirigent_acp_api",
"dep:dirigent_config",
"dep:dirigent_inspector",
"dep:dirigent_protocol",
"dep:dirigent_tools",
"dep:eventsource-client",
@@ -99,5 +96,4 @@ server = [
"dep:tower-http",
"dep:tracing",
"dep:tracing-subscriber",
"dep:dirigent_process",
]
@@ -83,29 +83,29 @@ macro_rules! debug_log {
/// Called after `upsert_session` and on session metadata changes.
#[cfg(feature = "server")]
async fn inspector_upsert_session(
inspector: &Arc<dirigent_inspector::InspectorRegistry>,
inspector: &Arc<dyn crate::traits::ConnectorInspector>,
connector_id: &str,
session: &SessionInfo,
) {
let sess_node_id = dirigent_inspector::NodeId::new(format!(
let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/sessions/{}",
connector_id, session.id
));
let parent_id =
dirigent_inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id));
dirigent_protocol::inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id));
let node_state = match session.status {
SessionStatus::Active => dirigent_inspector::NodeState::Running,
SessionStatus::Processing => dirigent_inspector::NodeState::Busy("Generating".to_string()),
SessionStatus::Idle => dirigent_inspector::NodeState::Idle,
SessionStatus::Ended => dirigent_inspector::NodeState::Stopped,
SessionStatus::Active => dirigent_protocol::inspector::NodeState::Running,
SessionStatus::Processing => dirigent_protocol::inspector::NodeState::Busy("Generating".to_string()),
SessionStatus::Idle => dirigent_protocol::inspector::NodeState::Idle,
SessionStatus::Ended => dirigent_protocol::inspector::NodeState::Stopped,
};
let label = session.title.as_deref().unwrap_or(&session.id);
// Try to register; if already exists, update instead
let meta =
dirigent_inspector::NodeMetadata::new(dirigent_inspector::NodeKind::AsyncTask, label)
dirigent_protocol::inspector::NodeMetadata::new(dirigent_protocol::inspector::NodeKind::AsyncTask, label)
.with_state(node_state.clone())
.with_property("session_id", serde_json::json!(&session.id))
.with_property("status", serde_json::json!(format!("{:?}", session.status)));
@@ -129,11 +129,10 @@ async fn inspector_upsert_session(
};
match inspector
.register(sess_node_id.clone(), &parent_id, meta, None)
.register_node(sess_node_id.clone(), &parent_id, meta)
.await
{
Ok(mut handle) => {
handle.detach();
Ok(()) => {
trace!(connector_id = %connector_id, session_id = %session.id, "Registered session with inspector");
}
Err(_) => {
@@ -165,12 +164,12 @@ async fn inspector_upsert_session(
/// Update only the inspector state for a session (lightweight, no property changes).
#[cfg(feature = "server")]
async fn inspector_update_session_state(
inspector: &Arc<dirigent_inspector::InspectorRegistry>,
inspector: &Arc<dyn crate::traits::ConnectorInspector>,
connector_id: &str,
session_id: &str,
state: dirigent_inspector::NodeState,
state: dirigent_protocol::inspector::NodeState,
) {
let sess_node_id = dirigent_inspector::NodeId::new(format!(
let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/sessions/{}",
connector_id, session_id
));
@@ -180,13 +179,13 @@ async fn inspector_update_session_state(
/// Deregister all session nodes for a connector from the inspector.
#[cfg(feature = "server")]
async fn inspector_deregister_all_sessions(
inspector: &Arc<dirigent_inspector::InspectorRegistry>,
inspector: &Arc<dyn crate::traits::ConnectorInspector>,
connector_id: &str,
internal_state: &InternalState,
) {
let sessions = internal_state.list_sessions().await;
for session in sessions {
let sess_node_id = dirigent_inspector::NodeId::new(format!(
let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/sessions/{}",
connector_id, session.id
));
@@ -277,7 +276,7 @@ pub struct AcpConnector {
/// Optional inspector registry for PID tracking of stdio processes
#[cfg(feature = "server")]
inspector: Option<Arc<dirigent_inspector::InspectorRegistry>>,
inspector: Option<Arc<dyn crate::traits::ConnectorInspector>>,
/// Optional process group manager for lifecycle management of stdio processes.
///
@@ -286,7 +285,7 @@ pub struct AcpConnector {
/// tracked in the platform job object / process group, and are shut down
/// gracefully on close.
#[cfg(feature = "server")]
process_manager: Option<Arc<dyn dirigent_process::ProcessGroupManager>>,
process_manager: Option<Arc<dyn crate::traits::ProcessGroupManager>>,
}
impl AcpConnector {
@@ -388,7 +387,7 @@ impl AcpConnector {
#[cfg(feature = "server")]
pub fn with_inspector(
mut self,
inspector: Option<Arc<dirigent_inspector::InspectorRegistry>>,
inspector: Option<Arc<dyn crate::traits::ConnectorInspector>>,
) -> Self {
self.inspector = inspector;
self
@@ -402,7 +401,7 @@ impl AcpConnector {
#[cfg(feature = "server")]
pub fn with_process_manager(
mut self,
process_manager: Option<Arc<dyn dirigent_process::ProcessGroupManager>>,
process_manager: Option<Arc<dyn crate::traits::ProcessGroupManager>>,
) -> Self {
self.process_manager = process_manager;
self
@@ -504,8 +503,8 @@ impl AcpConnector {
pending_agent_requests: Arc<Mutex<HashSet<String>>>,
session_states: Arc<Mutex<HashMap<String, SessionState>>>,
mut cmd_rx: mpsc::Receiver<ConnectorCommand>,
#[cfg(feature = "server")] inspector: Option<Arc<dirigent_inspector::InspectorRegistry>>,
#[cfg(feature = "server")] process_manager: Option<Arc<dyn dirigent_process::ProcessGroupManager>>,
#[cfg(feature = "server")] inspector: Option<Arc<dyn crate::traits::ConnectorInspector>>,
#[cfg(feature = "server")] process_manager: Option<Arc<dyn crate::traits::ProcessGroupManager>>,
) {
debug_log!("🚀 ACP connector {} task started", id);
info!(connector_id = %id, "ACP connector task started");
@@ -690,26 +689,25 @@ impl AcpConnector {
#[cfg(feature = "server")]
if let Some(ref inspector) = inspector {
if let Some(pid) = transport.pid().await {
let process_node_id = dirigent_inspector::NodeId::new(format!(
let process_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/process",
id
));
let parent_node_id = dirigent_inspector::NodeId::new(format!(
let parent_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}",
id
));
let meta = dirigent_inspector::NodeMetadata::new(
dirigent_inspector::NodeKind::Process,
let meta = dirigent_protocol::inspector::NodeMetadata::new(
dirigent_protocol::inspector::NodeKind::Process,
"stdio-process",
)
.with_state(dirigent_inspector::NodeState::Running)
.with_state(dirigent_protocol::inspector::NodeState::Running)
.with_property("pid", serde_json::json!(pid))
.with_property("transport", serde_json::json!("stdio"));
if let Ok(mut handle) = inspector
.register(process_node_id, &parent_node_id, meta, None)
if let Ok(()) = inspector
.register_node(process_node_id, &parent_node_id, meta)
.await
{
handle.detach();
info!(connector_id = %id, pid = pid, "Registered stdio process with inspector");
}
}
@@ -1560,7 +1558,7 @@ impl AcpConnector {
inspector,
&id,
&session_id,
dirigent_inspector::NodeState::Busy("Generating".to_string()),
dirigent_protocol::inspector::NodeState::Busy("Generating".to_string()),
).await;
}
@@ -2433,7 +2431,7 @@ impl AcpConnector {
inspector,
&id,
session_id,
dirigent_inspector::NodeState::Idle,
dirigent_protocol::inspector::NodeState::Idle,
).await;
}
}
@@ -2453,7 +2451,7 @@ impl AcpConnector {
/// Create transport based on configuration
async fn create_transport(
config: &AcpConfig,
#[cfg(feature = "server")] process_manager: Option<&Arc<dyn dirigent_process::ProcessGroupManager>>,
#[cfg(feature = "server")] process_manager: Option<&Arc<dyn crate::traits::ProcessGroupManager>>,
) -> AcpResult<Box<dyn AcpTransport>> {
match &config.transport {
TransportKind::Stdio {
@@ -179,7 +179,7 @@ pub struct StdioTransport {
/// force-killing the process. Without it, the original hard-kill behavior
/// is preserved.
#[cfg(feature = "server")]
process_lifecycle: Option<Box<dyn dirigent_process::ProcessLifecycle>>,
process_lifecycle: Option<Box<dyn crate::traits::ProcessLifecycle>>,
}
impl StdioTransport {
@@ -307,7 +307,7 @@ impl StdioTransport {
///
/// Must be called before `connect()`.
#[cfg(feature = "server")]
pub fn set_process_lifecycle(&mut self, lifecycle: Box<dyn dirigent_process::ProcessLifecycle>) {
pub fn set_process_lifecycle(&mut self, lifecycle: Box<dyn crate::traits::ProcessLifecycle>) {
self.process_lifecycle = Some(lifecycle);
}
@@ -837,7 +837,7 @@ impl AcpTransport for StdioTransport {
if let Some(ref lifecycle) = self.process_lifecycle {
if child.id().is_some() {
// Graceful shutdown: SIGTERM/CTRL_BREAK → wait → force kill
dirigent_process::graceful_shutdown_async(
crate::traits::graceful_shutdown_async(
lifecycle.as_ref(),
&mut child,
std::time::Duration::from_secs(5),
@@ -292,32 +292,32 @@ impl Default for GatewayConfig {
/// Register a Gateway session node in the inspector registry.
#[cfg(feature = "server")]
async fn inspector_register_gateway_session(
inspector: &Arc<dirigent_inspector::InspectorRegistry>,
inspector: &Arc<dyn crate::traits::ConnectorInspector>,
connector_id: &str,
session: &GatewaySession,
) {
let sess_node_id = dirigent_inspector::NodeId::new(format!(
let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/sessions/{}",
connector_id, session.id
));
let parent_id =
dirigent_inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id));
dirigent_protocol::inspector::NodeId::new(format!("dirigent/connectors/{}", connector_id));
let meta = dirigent_inspector::NodeMetadata::new(
dirigent_inspector::NodeKind::AsyncTask,
let meta = dirigent_protocol::inspector::NodeMetadata::new(
dirigent_protocol::inspector::NodeKind::AsyncTask,
&session.title,
)
.with_state(dirigent_inspector::NodeState::Running)
.with_state(dirigent_protocol::inspector::NodeState::Running)
.with_property("session_id", serde_json::json!(&session.id))
.with_property("status", serde_json::json!("Active"))
.with_property("message_count", serde_json::json!(session.messages.len()));
match inspector
.register(sess_node_id, &parent_id, meta, None)
.register_node(sess_node_id, &parent_id, meta)
.await
{
Ok(mut handle) => {
handle.detach();
Ok(()) => {
// Registered successfully
}
Err(_) => {
// Already registered — that's fine
@@ -328,12 +328,12 @@ async fn inspector_register_gateway_session(
/// Deregister all Gateway session nodes from the inspector.
#[cfg(feature = "server")]
async fn inspector_deregister_gateway_sessions(
inspector: &Arc<dirigent_inspector::InspectorRegistry>,
inspector: &Arc<dyn crate::traits::ConnectorInspector>,
connector_id: &str,
sessions: &HashMap<String, GatewaySession>,
) {
for session_id in sessions.keys() {
let sess_node_id = dirigent_inspector::NodeId::new(format!(
let sess_node_id = dirigent_protocol::inspector::NodeId::new(format!(
"dirigent/connectors/{}/sessions/{}",
connector_id, session_id
));
@@ -387,7 +387,7 @@ pub struct GatewayConnector {
/// Optional inspector registry for session tracking
#[cfg(feature = "server")]
inspector: Option<Arc<dirigent_inspector::InspectorRegistry>>,
inspector: Option<Arc<dyn crate::traits::ConnectorInspector>>,
}
/// Helper that publishes an event to both the per-connector broadcast
@@ -471,7 +471,7 @@ impl GatewayConnector {
/// Set the inspector registry for session tracking.
#[cfg(feature = "server")]
pub fn set_inspector(&mut self, inspector: Option<Arc<dirigent_inspector::InspectorRegistry>>) {
pub fn set_inspector(&mut self, inspector: Option<Arc<dyn crate::traits::ConnectorInspector>>) {
self.inspector = inspector;
}
@@ -553,7 +553,7 @@ impl GatewayConnector {
mut cmd_rx: mpsc::Receiver<ConnectorCommand>,
connector_list_callback: Option<ConnectorListCallback>,
session_transfer_callback: Option<SessionTransferCallback>,
#[cfg(feature = "server")] inspector: Option<Arc<dirigent_inspector::InspectorRegistry>>,
#[cfg(feature = "server")] inspector: Option<Arc<dyn crate::traits::ConnectorInspector>>,
) {
info!(connector_id = %id, "Gateway connector task started");
+2 -2
View File
@@ -26,14 +26,14 @@ pub trait ConnectorLifecycleHooks: Send + Sync {
async fn on_connector_removed(&self, _connector_id: &str) {}
#[cfg(feature = "server")]
fn inspector(&self) -> Option<std::sync::Arc<dirigent_inspector::InspectorRegistry>> {
fn inspector(&self) -> Option<std::sync::Arc<dyn crate::traits::ConnectorInspector>> {
None
}
#[cfg(feature = "server")]
fn process_manager(
&self,
) -> Option<std::sync::Arc<dyn dirigent_process::ProcessGroupManager>> {
) -> Option<std::sync::Arc<dyn crate::traits::ProcessGroupManager>> {
None
}
}
+4
View File
@@ -74,6 +74,10 @@ pub mod connectors;
#[cfg(feature = "server")]
pub mod vendors;
// Abstract traits for connector-injected services (server-only)
#[cfg(feature = "server")]
pub mod traits;
// ACP module - Agent-Client Protocol implementation (server-only)
#[cfg(feature = "server")]
pub mod acp;
@@ -0,0 +1,38 @@
use dirigent_protocol::inspector::{NodeId, NodeMetadata, NodeState};
use std::collections::HashMap;
/// Abstract interface for registering and updating nodes in the inspector tree.
///
/// Connectors use this trait to expose their internal process hierarchy
/// (child processes, services, async tasks) without depending on the
/// concrete `dirigent_inspector` crate.
///
/// The trait is `Send + Sync` and object-safe (via `async_trait`), so it can be
/// shared as `Arc<dyn ConnectorInspector>`.
///
/// Every method reports failure through a boxed, type-erased error; the trait
/// itself does not let callers distinguish *why* an operation failed.
#[async_trait::async_trait]
pub trait ConnectorInspector: Send + Sync {
/// Register a new node under `parent` with the given metadata.
///
/// Unlike a handle-returning registration, success yields `()`: the node's
/// lifetime is not tied to any returned value.
///
/// NOTE(review): presumably fails when `id` already exists or `parent` is
/// unknown — the error type does not say which; confirm with the implementor.
async fn register_node(
&self,
id: NodeId,
parent: &NodeId,
metadata: NodeMetadata,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
/// Remove `id` and every node below it from the tree.
async fn deregister_subtree(
&self,
id: &NodeId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
/// Update the runtime state of an existing node.
async fn update_state(
&self,
id: &NodeId,
state: NodeState,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
/// Merge additional properties into an existing node's metadata.
///
/// `props` maps property names to JSON values.
async fn update_properties(
&self,
id: &NodeId,
props: HashMap<String, serde_json::Value>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
+5
View File
@@ -0,0 +1,5 @@
mod inspector;
mod process;
pub use inspector::ConnectorInspector;
pub use process::{graceful_shutdown_async, ProcessGroupManager, ProcessLifecycle};
@@ -0,0 +1,62 @@
use std::io;
/// Factory for creating platform-specific process lifecycle handlers.
///
/// Each connector receives a `ProcessGroupManager` at construction time and
/// calls `create_lifecycle()` to obtain a handler scoped to a single child
/// process (or process group).
///
/// The trait is `Send + Sync` so a single manager can be shared as
/// `Arc<dyn ProcessGroupManager>`.
pub trait ProcessGroupManager: Send + Sync {
/// Create a fresh lifecycle handler for a new child process.
///
/// The handler is returned boxed and owned by the caller.
fn create_lifecycle(&self) -> Box<dyn ProcessLifecycle>;
}
/// Platform-specific process lifecycle operations.
///
/// Implementations handle the OS-level details of job objects (Windows),
/// process groups (Unix), and signal delivery so that connector code
/// remains platform-agnostic.
///
/// All methods take `&self`, so implementations that track state need
/// interior mutability.
pub trait ProcessLifecycle: Send + Sync {
/// Configure a `tokio::process::Command` before spawning
/// (e.g., assign to a job object or set `setsid`).
fn configure_async_command(&self, cmd: &mut tokio::process::Command);
/// Register a spawned child by PID for later signal delivery.
///
/// Returns the underlying I/O error if registration fails.
fn register_child(&self, pid: u32) -> Result<(), io::Error>;
/// Send a graceful shutdown signal (SIGTERM on Unix, CTRL_BREAK on Windows).
///
/// This is the first step taken by `graceful_shutdown_async`.
fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error>;
/// Force-kill the process (SIGKILL on Unix, TerminateProcess on Windows).
///
/// Used by `graceful_shutdown_async` once the graceful timeout has elapsed.
fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error>;
}
/// Attempt a graceful shutdown of `child`, falling back to a forced kill
/// after `timeout` elapses.
///
/// Steps:
/// 1. If the child has no PID (already reaped) or reports PID 0, there is
///    nothing to signal and the function returns `true` immediately.
/// 2. The graceful shutdown signal is sent through `lifecycle`. If it cannot
///    be delivered, the child is checked with `try_wait`: an already-exited
///    child counts as a clean exit, otherwise the child is force-killed
///    rather than being left running.
/// 3. Otherwise the child is awaited for up to `timeout`; on expiry it is
///    force-killed and reaped.
///
/// Returns `true` if the process exited within the timeout (or was already
/// gone), `false` if a forced kill was required.
pub async fn graceful_shutdown_async(
    lifecycle: &dyn ProcessLifecycle,
    child: &mut tokio::process::Child,
    timeout: std::time::Duration,
) -> bool {
    let pid = match child.id() {
        Some(0) | None => return true,
        Some(pid) => pid,
    };

    if lifecycle.send_shutdown_signal(pid).is_err() {
        // A failed signal does not prove the process is gone (it may be a
        // permissions or platform error), so verify before reporting success.
        if matches!(child.try_wait(), Ok(Some(_))) {
            return true;
        }
        tracing::debug!(pid, "Shutdown signal could not be delivered, force killing");
        force_kill_and_reap(lifecycle, child, pid).await;
        return false;
    }

    match tokio::time::timeout(timeout, child.wait()).await {
        // The wait completed (successfully or with an I/O error) before the
        // deadline; either way there is nothing left to kill.
        Ok(_) => true,
        Err(_) => {
            tracing::debug!(pid, "Graceful shutdown timed out, force killing");
            force_kill_and_reap(lifecycle, child, pid).await;
            false
        }
    }
}

/// Force-kill `child` and wait for it to be reaped.
///
/// The platform lifecycle handler is tried first; if it reports an error,
/// tokio's own `start_kill` is used so the subsequent `wait` is not left
/// blocking on a process that never received a kill request.
async fn force_kill_and_reap(
    lifecycle: &dyn ProcessLifecycle,
    child: &mut tokio::process::Child,
    pid: u32,
) {
    if lifecycle.send_kill_signal(pid).is_err() {
        let _ = child.start_kill();
    }
    let _ = child.wait().await;
}
-36
View File
@@ -1,36 +0,0 @@
[package]
name = "dirigent_inspector"
version = "0.1.0"
edition = "2021"
[lib]
path = "src/lib.rs"
[dependencies]
# Async traits
async-trait = "0.1"
# Date/time handling
chrono = { version = "0.4", features = ["serde"] }
# Protocol types (canonical node types)
dirigent_protocol = { path = "../dirigent_protocol" }
# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Cross-platform process/system metrics
sysinfo = "0.33"
# Error handling
thiserror = "2.0"
# Async runtime
tokio = { version = "1.42", features = ["sync", "time", "rt", "macros"] }
# Logging
tracing = "0.1"
[dev-dependencies]
tokio = { version = "1.42", features = ["full"] }
-349
View File
@@ -1,349 +0,0 @@
use crate::error::{InspectorError, Result};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
/// A command that can be sent to a node.
///
/// Commands travel through the inspector channel paired with a one-shot
/// responder (see `CommandPayload`); the node answers with a
/// `CommandResponse`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeCommand {
/// Unique command ID for correlation with responses.
pub id: String,
/// What kind of command this is.
pub kind: CommandKind,
/// Arbitrary payload data.
///
/// `serde_json::Value::Null` is used when a command carries no payload.
pub payload: serde_json::Value,
}
/// The type of command being sent to a node.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum CommandKind {
/// Request the node to report its internal state (introspective).
Introspect,
/// Execute a named operation.
///
/// The `String` is the operation name (for example `"restart"`).
Execute(String),
/// Custom extension command.
///
/// The `String` identifies the extension; its interpretation is left to the node.
Custom(String),
}
/// Response from a node to a command.
///
/// Use `CommandResponse::ok` / `CommandResponse::err` to build instances.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CommandResponse {
/// The command ID this is responding to.
pub command_id: String,
/// Whether the command was handled successfully.
pub success: bool,
/// Response data.
///
/// For responses built with `CommandResponse::err` this is a JSON object
/// of the form `{ "error": <message> }`.
pub data: serde_json::Value,
}
impl CommandResponse {
/// Create a success response.
pub fn ok(command_id: impl Into<String>, data: serde_json::Value) -> Self {
Self {
command_id: command_id.into(),
success: true,
data,
}
}
/// Create an error response.
pub fn err(command_id: impl Into<String>, message: impl Into<String>) -> Self {
Self {
command_id: command_id.into(),
success: false,
data: serde_json::json!({ "error": message.into() }),
}
}
}
type CommandPayload = (NodeCommand, oneshot::Sender<CommandResponse>);
/// Build a connected sender/receiver pair for talking to a node.
///
/// The returned tuple is `(sender, receiver)`:
/// - the **sender** goes to callers (UI, API) that issue commands;
/// - the **receiver** goes to the node's loop, which reads commands and
///   answers them.
///
/// Both halves share one atomic counter used for `pending_count()`.
///
/// `capacity` is the size of the bounded command queue.
///
/// # Panics
///
/// The underlying `tokio::sync::mpsc::channel` panics if `capacity` is 0.
pub fn channel(capacity: usize) -> (InspectorChannelSender, InspectorChannelReceiver) {
    let (tx, rx) = mpsc::channel(capacity);
    let counter = Arc::new(AtomicUsize::new(0));

    let sender = InspectorChannelSender {
        tx,
        pending: Arc::clone(&counter),
    };
    let receiver = InspectorChannelReceiver {
        rx,
        pending: counter,
    };

    (sender, receiver)
}
/// Caller-side of the inspector channel: send commands, check queue depth.
///
/// Cloneable; all clones feed the same receiver and share one pending counter.
pub struct InspectorChannelSender {
// Bounded queue of (command, one-shot responder) pairs.
tx: mpsc::Sender<CommandPayload>,
// Approximate queue depth, shared with the receiver half.
pending: Arc<AtomicUsize>,
}
impl InspectorChannelSender {
/// Send a command and wait for the response.
pub async fn send(&self, cmd: NodeCommand) -> Result<CommandResponse> {
let (resp_tx, resp_rx) = oneshot::channel();
self.pending.fetch_add(1, Ordering::Relaxed);
self.tx
.send((cmd, resp_tx))
.await
.map_err(|_| InspectorError::ChannelClosed)?;
resp_rx.await.map_err(|_| InspectorError::ChannelClosed)
}
/// Try to send a command without waiting, returning a receiver for the response.
///
/// This is useful when you want to fire off a command and collect the
/// response later, or in a `select!` branch.
pub fn try_send(&self, cmd: NodeCommand) -> Result<oneshot::Receiver<CommandResponse>> {
let (resp_tx, resp_rx) = oneshot::channel();
self.pending.fetch_add(1, Ordering::Relaxed);
self.tx.try_send((cmd, resp_tx)).map_err(|e| match e {
mpsc::error::TrySendError::Full(_) => {
self.pending.fetch_sub(1, Ordering::Relaxed);
InspectorError::ChannelFull
}
mpsc::error::TrySendError::Closed(_) => {
self.pending.fetch_sub(1, Ordering::Relaxed);
InspectorError::ChannelClosed
}
})?;
Ok(resp_rx)
}
/// Number of commands currently in the queue (approximate).
pub fn pending_count(&self) -> usize {
self.pending.load(Ordering::Relaxed)
}
}
impl Clone for InspectorChannelSender {
    /// Produce another sender feeding the same queue and sharing the same
    /// pending counter.
    fn clone(&self) -> Self {
        let Self { tx, pending } = self;
        Self {
            tx: tx.clone(),
            pending: Arc::clone(pending),
        }
    }
}
/// Node-side of the inspector channel: receive commands, send responses.
pub struct InspectorChannelReceiver {
// Receiving end of the bounded (command, one-shot responder) queue.
rx: mpsc::Receiver<CommandPayload>,
// Approximate queue depth, shared with every sender clone.
pending: Arc<AtomicUsize>,
}
impl InspectorChannelReceiver {
    /// Wait for the next command; yields `None` once every sender is dropped.
    ///
    /// The `oneshot::Sender` in the returned pair must be used to deliver the
    /// response for that command.
    pub async fn recv(&mut self) -> Option<(NodeCommand, oneshot::Sender<CommandResponse>)> {
        let payload = self.rx.recv().await?;
        // One command left the queue.
        self.pending.fetch_sub(1, Ordering::Relaxed);
        Some(payload)
    }

    /// Non-blocking receive; yields `None` when no command could be taken
    /// from the channel.
    pub fn try_recv(&mut self) -> Option<(NodeCommand, oneshot::Sender<CommandResponse>)> {
        let payload = self.rx.try_recv().ok()?;
        // One command left the queue.
        self.pending.fetch_sub(1, Ordering::Relaxed);
        Some(payload)
    }

    /// Approximate number of commands still waiting in the queue.
    pub fn pending_count(&self) -> usize {
        self.pending.load(Ordering::Relaxed)
    }
}
// Unit tests for the inspector command channel: round-trips, queue-depth
// accounting, full/closed error paths, and serde round-trips of the messages.
#[cfg(test)]
mod tests {
use super::*;
// Build an `Introspect` command with a null payload.
fn introspect_cmd(id: &str) -> NodeCommand {
NodeCommand {
id: id.to_string(),
kind: CommandKind::Introspect,
payload: serde_json::Value::Null,
}
}
// Build an `Execute(name)` command with the given payload.
fn exec_cmd(id: &str, name: &str, payload: serde_json::Value) -> NodeCommand {
NodeCommand {
id: id.to_string(),
kind: CommandKind::Execute(name.to_string()),
payload,
}
}
// A command sent with `send` reaches the node and its response comes back.
#[tokio::test]
async fn test_send_recv_response() {
let (sender, mut receiver) = channel(10);
// Simulate a node loop in a background task
let node_task = tokio::spawn(async move {
if let Some((cmd, resp_tx)) = receiver.recv().await {
assert_eq!(cmd.id, "cmd-1");
assert!(matches!(cmd.kind, CommandKind::Introspect));
let _ = resp_tx.send(CommandResponse::ok(
&cmd.id,
serde_json::json!({ "queue_len": 5 }),
));
}
});
let response = sender.send(introspect_cmd("cmd-1")).await.unwrap();
assert!(response.success);
assert_eq!(response.command_id, "cmd-1");
assert_eq!(response.data["queue_len"], 5);
node_task.await.unwrap();
}
// `try_send` queues immediately and hands back a receiver for the response.
#[tokio::test]
async fn test_try_send() {
let (sender, mut receiver) = channel(10);
let resp_rx = sender
.try_send(exec_cmd("cmd-2", "restart", serde_json::json!({})))
.unwrap();
// Node responds
let (cmd, resp_tx) = receiver.recv().await.unwrap();
assert_eq!(cmd.id, "cmd-2");
let _ = resp_tx.send(CommandResponse::ok(&cmd.id, serde_json::json!("restarted")));
let response = resp_rx.await.unwrap();
assert!(response.success);
}
// Both halves observe the same queue depth, which drops as commands are received.
#[tokio::test]
async fn test_pending_count() {
let (sender, mut receiver) = channel(10);
assert_eq!(sender.pending_count(), 0);
// Send 3 commands without receiving
let _r1 = sender.try_send(introspect_cmd("a")).unwrap();
let _r2 = sender.try_send(introspect_cmd("b")).unwrap();
let _r3 = sender.try_send(introspect_cmd("c")).unwrap();
assert_eq!(sender.pending_count(), 3);
assert_eq!(receiver.pending_count(), 3);
// Receive one
let (cmd, resp_tx) = receiver.recv().await.unwrap();
let _ = resp_tx.send(CommandResponse::ok(&cmd.id, serde_json::Value::Null));
assert_eq!(receiver.pending_count(), 2);
}
// A full queue makes `try_send` fail with `ChannelFull` instead of waiting.
#[tokio::test]
async fn test_try_send_full() {
let (sender, _receiver) = channel(1);
// Fill the channel
let _r1 = sender.try_send(introspect_cmd("a")).unwrap();
// Should fail with ChannelFull
let result = sender.try_send(introspect_cmd("b"));
assert!(matches!(result, Err(InspectorError::ChannelFull)));
}
// Sending after the receiver is gone reports `ChannelClosed`.
#[tokio::test]
async fn test_send_after_receiver_dropped() {
let (sender, receiver) = channel(10);
drop(receiver);
let result = sender.send(introspect_cmd("orphan")).await;
assert!(matches!(result, Err(InspectorError::ChannelClosed)));
}
// Receiving after every sender is gone yields `None`.
#[tokio::test]
async fn test_recv_after_sender_dropped() {
let (sender, mut receiver) = channel(10);
drop(sender);
let result = receiver.recv().await;
assert!(result.is_none());
}
// `try_recv` on an empty queue yields `None` without blocking.
#[tokio::test]
async fn test_try_recv_empty() {
let (_sender, mut receiver) = channel(10);
assert!(receiver.try_recv().is_none());
}
// An error response carries `success == false` and the message under "error".
#[tokio::test]
async fn test_error_response() {
let (sender, mut receiver) = channel(10);
let node_task = tokio::spawn(async move {
if let Some((cmd, resp_tx)) = receiver.recv().await {
let _ = resp_tx.send(CommandResponse::err(&cmd.id, "not supported"));
}
});
let response = sender
.send(exec_cmd("cmd-x", "unknown", serde_json::Value::Null))
.await
.unwrap();
assert!(!response.success);
assert_eq!(response.data["error"], "not supported");
node_task.await.unwrap();
}
// Cloned senders feed the same receiver.
#[tokio::test]
async fn test_multiple_senders() {
let (sender, mut receiver) = channel(10);
let sender2 = sender.clone();
let _r1 = sender.try_send(introspect_cmd("from-1")).unwrap();
let _r2 = sender2.try_send(introspect_cmd("from-2")).unwrap();
let (cmd1, resp_tx1) = receiver.recv().await.unwrap();
let _ = resp_tx1.send(CommandResponse::ok(&cmd1.id, serde_json::Value::Null));
let (cmd2, resp_tx2) = receiver.recv().await.unwrap();
let _ = resp_tx2.send(CommandResponse::ok(&cmd2.id, serde_json::Value::Null));
// Both commands were received. The check is deliberately order-insensitive:
// it asserts delivery from both senders, not the relative ordering.
let ids = vec![cmd1.id, cmd2.id];
assert!(ids.contains(&"from-1".to_string()));
assert!(ids.contains(&"from-2".to_string()));
}
// `NodeCommand` survives a JSON round-trip, including the `Execute` name.
#[test]
fn test_command_serialization() {
let cmd = exec_cmd("cmd-1", "restart", serde_json::json!({"force": true}));
let json = serde_json::to_string(&cmd).unwrap();
let deserialized: NodeCommand = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.id, "cmd-1");
assert!(matches!(deserialized.kind, CommandKind::Execute(ref name) if name == "restart"));
}
// `CommandResponse` survives a JSON round-trip.
#[test]
fn test_response_serialization() {
let resp = CommandResponse::ok("cmd-1", serde_json::json!({"status": "done"}));
let json = serde_json::to_string(&resp).unwrap();
let deserialized: CommandResponse = serde_json::from_str(&json).unwrap();
assert!(deserialized.success);
assert_eq!(deserialized.data["status"], "done");
}
}
-34
View File
@@ -1,34 +0,0 @@
use crate::node::NodeId;
/// Errors that can occur in the inspector system.
///
/// `Display` output comes from the `#[error(...)]` attribute on each variant.
#[derive(Debug, thiserror::Error)]
pub enum InspectorError {
/// No node with the given ID is present.
#[error("node not found: {0}")]
NodeNotFound(NodeId),
/// A node with the given ID is already present.
#[error("node already exists: {0}")]
NodeAlreadyExists(NodeId),
/// The parent named for an operation is not present.
#[error("parent node not found: {0}")]
ParentNotFound(NodeId),
/// The root node may not be removed.
#[error("cannot remove root node")]
CannotRemoveRoot,
/// The other end of a command or response channel has been dropped.
#[error("channel closed")]
ChannelClosed,
/// A non-blocking send found the bounded command queue at capacity.
#[error("channel full")]
ChannelFull,
/// A command did not complete in time.
#[error("command timed out")]
CommandTimeout,
/// No process with the given PID was found.
#[error("process not found: pid {0}")]
ProcessNotFound(u32),
/// Any other failure; the message is displayed verbatim.
#[error("{0}")]
Internal(String),
}
pub type Result<T> = std::result::Result<T, InspectorError>;
-262
View File
@@ -1,262 +0,0 @@
use crate::error::Result;
use crate::node::{Inspectable, NodeId, NodeMetadata, NodeState};
use crate::registry::InspectorRegistry;
use std::collections::HashMap;
use std::sync::Arc;
/// Handle to a registered node in the inspector tree.
///
/// Returned when a node is registered. The producer (connector, service, etc.)
/// uses this handle to update their node's state and properties without needing
/// direct access to the registry.
///
/// When the handle is dropped, the node is automatically deregistered (best-effort).
/// To keep the node alive without the handle, call `detach()` before dropping.
pub struct NodeHandle {
// ID of the node this handle controls.
id: NodeId,
// Registry the node lives in; every operation is forwarded to it.
registry: Arc<InspectorRegistry>,
// When true, `Drop` leaves the node registered. Set by `detach()` and by
// `deregister()` (which removes the node itself).
detached: bool,
}
impl NodeHandle {
    /// Wrap an already-registered node; the handle starts attached, so
    /// dropping it deregisters the node.
    pub(crate) fn new(id: NodeId, registry: Arc<InspectorRegistry>) -> Self {
        let detached = false;
        Self {
            id,
            registry,
            detached,
        }
    }

    /// The ID of the node behind this handle.
    pub fn id(&self) -> &NodeId {
        &self.id
    }

    /// Change the node's lifecycle state.
    pub async fn set_state(&self, state: NodeState) -> Result<()> {
        let registry = &self.registry;
        registry.update_state(&self.id, state).await
    }

    /// Set one property on the node.
    pub async fn set_property(&self, key: &str, value: serde_json::Value) -> Result<()> {
        // A single-entry map lets this share the bulk update path.
        let single = HashMap::from([(key.to_string(), value)]);
        self.registry.update_properties(&self.id, single).await
    }

    /// Set several properties on the node in one call.
    pub async fn set_properties(&self, props: HashMap<String, serde_json::Value>) -> Result<()> {
        let registry = &self.registry;
        registry.update_properties(&self.id, props).await
    }

    /// Register `child_id` as a child of this node.
    ///
    /// Yields a fresh `NodeHandle` controlling the child.
    pub async fn register_child(
        &self,
        child_id: NodeId,
        metadata: NodeMetadata,
        inspectable: Option<Arc<dyn Inspectable>>,
    ) -> Result<NodeHandle> {
        let parent = &self.id;
        self.registry
            .register(child_id, parent, metadata, inspectable)
            .await
    }

    /// Remove the node from the registry now, consuming the handle.
    pub async fn deregister(mut self) -> Result<()> {
        // Mark as detached first so `Drop` does not deregister a second time.
        self.detached = true;
        self.registry.deregister(&self.id).await
    }

    /// Let the node outlive this handle.
    ///
    /// Once detached, dropping the handle no longer deregisters the node.
    /// It can still be removed through `InspectorRegistry::deregister()`.
    pub fn detach(&mut self) {
        self.detached = true;
    }

    /// The registry this handle forwards to.
    pub fn registry(&self) -> &Arc<InspectorRegistry> {
        &self.registry
    }
}
impl Drop for NodeHandle {
fn drop(&mut self) {
if !self.detached {
let id = self.id.clone();
let registry = Arc::clone(&self.registry);
// Best-effort async deregister from a sync Drop context
tokio::spawn(async move {
let _ = registry.deregister(&id).await;
});
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::{NodeKind, NodeMetadata, NodeState};
    use crate::registry::InspectorEvent;
    use std::time::Duration;

    /// Generous upper bound for waiting on the background deregistration task.
    /// The wait normally ends as soon as the event arrives.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Create a default registry and return it together with its root node id.
    async fn registry_with_root() -> (Arc<InspectorRegistry>, NodeId) {
        let registry = Arc::new(InspectorRegistry::new());
        let root = registry.root_id().await;
        (registry, root)
    }

    /// `set_state` on a handle is visible through the registry.
    #[tokio::test]
    async fn test_handle_set_state() {
        let (registry, root) = registry_with_root().await;
        let handle = registry
            .register(
                NodeId::new("dirigent/test"),
                &root,
                NodeMetadata::new(NodeKind::Service, "Test"),
                None,
            )
            .await
            .unwrap();

        handle.set_state(NodeState::Running).await.unwrap();

        let meta = registry
            .get_node(&NodeId::new("dirigent/test"))
            .await
            .unwrap();
        assert_eq!(meta.state, NodeState::Running);
    }

    /// `set_property` on a handle is visible through the registry.
    #[tokio::test]
    async fn test_handle_set_property() {
        let (registry, root) = registry_with_root().await;
        let handle = registry
            .register(
                NodeId::new("dirigent/proc"),
                &root,
                NodeMetadata::new(NodeKind::Process, "Proc"),
                None,
            )
            .await
            .unwrap();

        handle
            .set_property("pid", serde_json::json!(9999))
            .await
            .unwrap();

        let meta = registry
            .get_node(&NodeId::new("dirigent/proc"))
            .await
            .unwrap();
        assert_eq!(meta.properties["pid"], serde_json::json!(9999));
    }

    /// A child registered through a parent handle ends up in the registry.
    #[tokio::test]
    async fn test_handle_register_child() {
        let (registry, root) = registry_with_root().await;
        let parent_handle = registry
            .register(
                NodeId::new("dirigent/parent"),
                &root,
                NodeMetadata::new(NodeKind::Connector, "Parent"),
                None,
            )
            .await
            .unwrap();

        let child_handle = parent_handle
            .register_child(
                NodeId::new("dirigent/parent/child"),
                NodeMetadata::new(NodeKind::Process, "Child"),
                None,
            )
            .await
            .unwrap();

        assert_eq!(child_handle.id().as_str(), "dirigent/parent/child");
        assert!(
            registry
                .contains(&NodeId::new("dirigent/parent/child"))
                .await
        );
    }

    /// Explicit `deregister` removes the node before returning.
    #[tokio::test]
    async fn test_handle_deregister() {
        let (registry, root) = registry_with_root().await;
        let handle = registry
            .register(
                NodeId::new("dirigent/temp"),
                &root,
                NodeMetadata::new(NodeKind::AsyncTask, "Temp"),
                None,
            )
            .await
            .unwrap();
        assert!(registry.contains(&NodeId::new("dirigent/temp")).await);

        handle.deregister().await.unwrap();

        assert!(!registry.contains(&NodeId::new("dirigent/temp")).await);
    }

    /// Dropping a non-detached handle removes the node via a background task.
    #[tokio::test]
    async fn test_handle_drop_deregisters() {
        let (registry, root) = registry_with_root().await;
        let target = NodeId::new("dirigent/ephemeral");
        // Subscribe before registering so the removal event cannot be missed.
        let mut events = registry.subscribe();
        {
            let _handle = registry
                .register(
                    target.clone(),
                    &root,
                    NodeMetadata::new(NodeKind::AsyncTask, "Ephemeral"),
                    None,
                )
                .await
                .unwrap();
            // handle dropped here
        }

        // Wait for the spawned deregister task to announce the removal instead
        // of sleeping for a fixed time and hoping it has already run.
        let wait_for_removal = async {
            loop {
                match events.recv().await {
                    Ok(InspectorEvent::NodeRemoved { id }) if id == target => break,
                    Ok(_) => continue,
                    Err(err) => panic!("event stream failed before NodeRemoved: {:?}", err),
                }
            }
        };
        tokio::time::timeout(EVENT_TIMEOUT, wait_for_removal)
            .await
            .expect("Node should be deregistered after handle drop");

        assert!(
            !registry.contains(&target).await,
            "Node should be deregistered after handle drop"
        );
    }

    /// A detached handle leaves its node in place when dropped.
    #[tokio::test]
    async fn test_handle_detach_survives_drop() {
        let (registry, root) = registry_with_root().await;
        {
            let mut handle = registry
                .register(
                    NodeId::new("dirigent/persistent"),
                    &root,
                    NodeMetadata::new(NodeKind::Service, "Persistent"),
                    None,
                )
                .await
                .unwrap();
            handle.detach();
            // handle dropped here, but detached
        }

        // Proving that nothing happens needs a grace period: give any
        // (erroneously) spawned cleanup task a chance to run first.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(
            registry.contains(&NodeId::new("dirigent/persistent")).await,
            "Detached node should survive handle drop"
        );
    }
}
-23
View File
@@ -1,23 +0,0 @@
pub mod channel;
pub mod error;
pub mod handle;
pub mod node;
pub mod process;
pub mod registry;
pub mod snapshot;
pub mod system;
pub mod tree;
// Re-export commonly used types
pub use channel::{
channel as inspector_channel, CommandKind, CommandResponse, InspectorChannelReceiver,
InspectorChannelSender, NodeCommand,
};
pub use error::{InspectorError, Result};
pub use handle::NodeHandle;
pub use node::{Inspectable, NodeId, NodeKind, NodeMetadata, NodeState};
pub use process::{ProcessInfo, ProcessMonitor, ProcessStatus};
pub use registry::{InspectorEvent, InspectorRegistry};
pub use snapshot::{NodeSnapshot, TreeSnapshot};
pub use system::{SystemInfo, SystemMonitor};
pub use tree::{NodeTree, TreeNode};
-109
View File
@@ -1,109 +0,0 @@
use async_trait::async_trait;
// Re-export canonical types from dirigent_protocol
pub use dirigent_protocol::inspector::{NodeId, NodeKind, NodeMetadata, NodeState};
/// Trait for components that support rich introspection.
///
/// Implementing this trait allows a component to provide detailed status reports
/// when queried. The `inspect()` method runs in the component's own async context
/// (introspective), while `current_state()` is synchronous for quick polling.
///
/// Implementors must be `Send + Sync`; the registry stores them as
/// `Arc<dyn Inspectable>` alongside the node they describe.
#[async_trait]
pub trait Inspectable: Send + Sync {
/// Return a detailed status report as JSON.
///
/// This is an introspective operation: it runs inside the node's context
/// and can access internal state that isn't part of the standard metadata.
/// The shape of the returned JSON is chosen by the implementor.
async fn inspect(&self) -> serde_json::Value;
/// Return the current lifecycle state.
///
/// This should be a cheap, synchronous operation returning the node's
/// current state without blocking.
fn current_state(&self) -> NodeState;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `child` appends one path segment, separated by `/`.
    #[test]
    fn test_node_id_child() {
        let connectors = NodeId::new("dirigent").child("connectors");
        assert_eq!(connectors.as_str(), "dirigent/connectors");

        let connector = connectors.child("acp-claude");
        assert_eq!(connector.as_str(), "dirigent/connectors/acp-claude");
    }

    /// `parent` strips one segment at a time and yields `None` at the top.
    #[test]
    fn test_node_id_parent() {
        let leaf = NodeId::new("dirigent/connectors/acp-claude");

        let middle = leaf.parent().unwrap();
        assert_eq!(middle.as_str(), "dirigent/connectors");

        let top = middle.parent().unwrap();
        assert_eq!(top.as_str(), "dirigent");

        assert!(NodeId::new("dirigent").parent().is_none());
    }

    /// `name` is the last path segment (the whole id for a single segment).
    #[test]
    fn test_node_id_name() {
        let nested = NodeId::new("dirigent/connectors/acp");
        let single = NodeId::new("dirigent");

        assert_eq!(nested.name(), "acp");
        assert_eq!(single.name(), "dirigent");
    }

    /// A `&str` converts into a `NodeId` via `Into`.
    #[test]
    fn test_node_id_from_str() {
        let converted: NodeId = "dirigent/system/host".into();
        assert_eq!(converted.as_str(), "dirigent/system/host");
    }

    /// The builder methods set kind, label, state and properties.
    #[test]
    fn test_node_metadata_builder() {
        let url = serde_json::json!("http://localhost:3000");
        let built = NodeMetadata::new(NodeKind::Connector, "My Connector")
            .with_state(NodeState::Running)
            .with_property("base_url", url.clone());

        assert_eq!(built.kind, NodeKind::Connector);
        assert_eq!(built.label, "My Connector");
        assert_eq!(built.state, NodeState::Running);
        assert_eq!(built.properties.get("base_url").unwrap(), &url);
    }

    /// Metadata survives a JSON round trip.
    #[test]
    fn test_node_metadata_serialization() {
        let original = NodeMetadata::new(NodeKind::Process, "stdio-transport")
            .with_state(NodeState::Busy("processing message".into()))
            .with_property("pid", serde_json::json!(12345));

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: NodeMetadata = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.kind, NodeKind::Process);
        assert_eq!(decoded.label, "stdio-transport");
        assert_eq!(
            decoded.state,
            NodeState::Busy("processing message".into())
        );
    }

    /// `Display` for states: bare name, or name plus payload in parentheses.
    #[test]
    fn test_node_state_display() {
        let running = NodeState::Running.to_string();
        let failed = NodeState::Error("timeout".into()).to_string();

        assert_eq!(running, "Running");
        assert_eq!(failed, "Error(timeout)");
    }

    /// `Display` for kinds follows the same convention as states.
    #[test]
    fn test_node_kind_display() {
        let root = NodeKind::Root.to_string();
        let custom = NodeKind::Custom("agent".into()).to_string();

        assert_eq!(root, "Root");
        assert_eq!(custom, "Custom(agent)");
    }
}
-381
View File
@@ -1,381 +0,0 @@
use crate::node::NodeId;
use crate::registry::InspectorRegistry;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::{Pid, ProcessesToUpdate, System};
use tokio::task::JoinHandle;
use tracing::{debug, warn};
/// Information about a monitored OS process.
///
/// A plain, serializable copy of the values `sysinfo` reported for one process
/// at the time `ProcessMonitor::refresh` / `ProcessMonitor::get` ran.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessInfo {
/// OS process id the sample belongs to.
pub pid: u32,
/// Process name (lossily converted to UTF-8).
pub name: String,
/// Command line arguments (each lossily converted to UTF-8).
pub command: Vec<String>,
/// Path of the executable, when `sysinfo` could determine it.
pub exe: Option<PathBuf>,
/// Current working directory, when `sysinfo` could determine it.
pub cwd: Option<PathBuf>,
/// Simplified scheduler status.
pub status: ProcessStatus,
/// CPU usage as returned by `sysinfo`'s `Process::cpu_usage()`.
pub cpu_usage_percent: f32,
/// Value of `Process::memory()`.
pub memory_bytes: u64,
/// Value of `Process::virtual_memory()`.
pub virtual_memory_bytes: u64,
/// Value of `Process::start_time()` — presumably seconds since the Unix
/// epoch; confirm against the sysinfo docs.
pub start_time_secs: u64,
/// Value of `Process::run_time()`.
pub run_time_secs: u64,
/// `DiskUsage::read_bytes` — NOTE(review): likely bytes read since the
/// previous refresh rather than a lifetime total; confirm.
pub disk_read_bytes: u64,
/// `DiskUsage::written_bytes` — same caveat as `disk_read_bytes`.
pub disk_written_bytes: u64,
}
/// Simplified cross-platform process status.
///
/// A reduced view of `sysinfo::ProcessStatus`; see the `From` implementation
/// for how the finer-grained sysinfo variants are folded into these.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum ProcessStatus {
/// The process is running.
Running,
/// The process is sleeping or idle.
Sleeping,
/// The process is stopped or being traced.
Stopped,
/// The process is a zombie.
Zombie,
/// The process is reported as dead.
Dead,
/// Any status without a dedicated variant here.
Unknown,
}
impl From<sysinfo::ProcessStatus> for ProcessStatus {
    /// Fold sysinfo's fine-grained status into the simplified set.
    ///
    /// Variants without a dedicated arm (including any added by future
    /// sysinfo releases) map to [`ProcessStatus::Unknown`].
    fn from(s: sysinfo::ProcessStatus) -> Self {
        match s {
            sysinfo::ProcessStatus::Run => ProcessStatus::Running,
            // Uninterruptible disk sleep is a live process blocked on I/O,
            // so it is reported as sleeping rather than as dead.
            sysinfo::ProcessStatus::Sleep
            | sysinfo::ProcessStatus::Idle
            | sysinfo::ProcessStatus::UninterruptibleDiskSleep => ProcessStatus::Sleeping,
            sysinfo::ProcessStatus::Stop | sysinfo::ProcessStatus::Tracing => {
                ProcessStatus::Stopped
            }
            sysinfo::ProcessStatus::Zombie => ProcessStatus::Zombie,
            sysinfo::ProcessStatus::Dead => ProcessStatus::Dead,
            _ => ProcessStatus::Unknown,
        }
    }
}
/// Direction of the last observed I/O activity.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum IoDirection {
/// Data was read.
Read,
/// Data was written.
Write,
}
/// Information about the most recent I/O activity.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IoActivity {
/// Whether the activity was a read or a write.
pub direction: IoDirection,
/// UTC time associated with the activity.
pub timestamp: chrono::DateTime<chrono::Utc>,
}
/// Monitors a set of OS processes by PID, providing metrics via `sysinfo`.
///
/// Each tracked PID is associated with a `NodeId` in the inspector tree.
/// The monitor can be polled manually or run as a background task that
/// periodically refreshes and updates the registry.
pub struct ProcessMonitor {
// sysinfo handle; only the processes asked for are refreshed.
system: System,
// Tracked PID -> inspector node that represents the process.
tracked: HashMap<u32, NodeId>,
}
impl ProcessMonitor {
    /// Create a new process monitor.
    ///
    /// The monitor starts with no tracked processes and an empty `sysinfo`
    /// state; nothing is queried from the OS until a refresh is requested.
    pub fn new() -> Self {
        Self {
            system: System::new(),
            tracked: HashMap::new(),
        }
    }

    /// Start tracking a process by PID, associated with the given node ID.
    ///
    /// Tracking an already-tracked PID replaces its node association.
    pub fn track(&mut self, pid: u32, node_id: NodeId) {
        debug!(pid, node_id = %node_id, "Tracking process");
        self.tracked.insert(pid, node_id);
    }

    /// Stop tracking a process. Unknown PIDs are ignored.
    pub fn untrack(&mut self, pid: u32) {
        debug!(pid, "Untracking process");
        self.tracked.remove(&pid);
    }

    /// Get the set of tracked PIDs and their node IDs.
    pub fn tracked_pids(&self) -> &HashMap<u32, NodeId> {
        &self.tracked
    }

    /// Refresh data for all tracked processes and return their info.
    ///
    /// Tracked PIDs that `sysinfo` cannot find after the refresh are simply
    /// absent from the returned map.
    pub fn refresh(&mut self) -> HashMap<u32, ProcessInfo> {
        // Refresh only the tracked processes rather than the whole process table.
        let pids: Vec<Pid> = self.tracked.keys().map(|&p| Pid::from_u32(p)).collect();
        self.system
            .refresh_processes(ProcessesToUpdate::Some(&pids), true);

        self.tracked
            .keys()
            .filter_map(|&pid| {
                self.system
                    .process(Pid::from_u32(pid))
                    .map(|process| (pid, Self::describe(pid, process)))
            })
            .collect()
    }

    /// Get info for a single process.
    ///
    /// The PID does not have to be tracked. Returns `None` when `sysinfo`
    /// cannot find the process.
    pub fn get(&mut self, pid: u32) -> Option<ProcessInfo> {
        let sysinfo_pid = Pid::from_u32(pid);
        self.system
            .refresh_processes(ProcessesToUpdate::Some(&[sysinfo_pid]), true);
        self.system
            .process(sysinfo_pid)
            .map(|process| Self::describe(pid, process))
    }

    /// Check if a process is alive (outrospective: queries the OS directly).
    pub fn is_alive(&mut self, pid: u32) -> bool {
        let sysinfo_pid = Pid::from_u32(pid);
        self.system
            .refresh_processes(ProcessesToUpdate::Some(&[sysinfo_pid]), true);
        self.system.process(sysinfo_pid).is_some()
    }

    /// Spawn a background task that periodically refreshes tracked processes
    /// and updates their nodes in the registry.
    ///
    /// The monitor is moved into the task, so the tracked set can no longer be
    /// changed from outside once polling has started. The task loops forever;
    /// stop it by aborting the returned `JoinHandle`.
    ///
    /// On every tick each live process gets its properties pushed to the
    /// registry. A process that has disappeared has its node set to
    /// `NodeState::Stopped` once and is then dropped from the tracked set, so
    /// it is not re-reported on every tick and a later process that reuses the
    /// PID is not mistaken for it.
    pub fn start_polling(
        mut self,
        registry: Arc<InspectorRegistry>,
        interval: Duration,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            loop {
                ticker.tick().await;
                let infos = self.refresh();

                for (pid, info) in &infos {
                    if let Some(node_id) = self.tracked.get(pid) {
                        let props = Self::properties_of(info);
                        if let Err(e) = registry.update_properties(node_id, props).await {
                            warn!(
                                pid,
                                node_id = %node_id,
                                error = %e,
                                "Failed to update process node properties"
                            );
                        }
                    }
                }

                // Anything tracked but missing from the refresh result is gone.
                let dead_pids: Vec<u32> = self
                    .tracked
                    .keys()
                    .filter(|&&pid| !infos.contains_key(&pid))
                    .copied()
                    .collect();
                for pid in dead_pids {
                    if let Some(node_id) = self.tracked.remove(&pid) {
                        debug!(pid, node_id = %node_id, "Process no longer alive");
                        // Best effort: the node may already have been removed.
                        let _ = registry
                            .update_state(&node_id, crate::node::NodeState::Stopped)
                            .await;
                    }
                }
            }
        })
    }

    /// Copy the values `sysinfo` holds for `process` into a `ProcessInfo`.
    fn describe(pid: u32, process: &sysinfo::Process) -> ProcessInfo {
        let disk_usage = process.disk_usage();
        ProcessInfo {
            pid,
            name: process.name().to_string_lossy().to_string(),
            command: process
                .cmd()
                .iter()
                .map(|s| s.to_string_lossy().to_string())
                .collect(),
            exe: process.exe().map(|p| p.to_path_buf()),
            cwd: process.cwd().map(|p| p.to_path_buf()),
            status: ProcessStatus::from(process.status()),
            cpu_usage_percent: process.cpu_usage(),
            memory_bytes: process.memory(),
            virtual_memory_bytes: process.virtual_memory(),
            start_time_secs: process.start_time(),
            run_time_secs: process.run_time(),
            disk_read_bytes: disk_usage.read_bytes,
            disk_written_bytes: disk_usage.written_bytes,
        }
    }

    /// Build the property map published on a process node for one sample.
    fn properties_of(info: &ProcessInfo) -> HashMap<String, serde_json::Value> {
        let mut props = HashMap::new();
        props.insert("pid".to_string(), serde_json::json!(info.pid));
        props.insert("name".to_string(), serde_json::json!(info.name));
        props.insert("command".to_string(), serde_json::json!(info.command));
        props.insert(
            "status".to_string(),
            serde_json::to_value(&info.status).unwrap_or_default(),
        );
        props.insert(
            "cpu_usage_percent".to_string(),
            serde_json::json!(info.cpu_usage_percent),
        );
        props.insert(
            "memory_bytes".to_string(),
            serde_json::json!(info.memory_bytes),
        );
        props.insert(
            "virtual_memory_bytes".to_string(),
            serde_json::json!(info.virtual_memory_bytes),
        );
        props.insert(
            "run_time_secs".to_string(),
            serde_json::json!(info.run_time_secs),
        );
        props.insert(
            "disk_read_bytes".to_string(),
            serde_json::json!(info.disk_read_bytes),
        );
        props.insert(
            "disk_written_bytes".to_string(),
            serde_json::json!(info.disk_written_bytes),
        );
        props
    }
}
/// `ProcessMonitor::default()` is equivalent to `ProcessMonitor::new()`.
impl Default for ProcessMonitor {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Spot-check the sysinfo -> simplified status mapping.
    #[test]
    fn test_process_status_from_sysinfo() {
        let cases = vec![
            (sysinfo::ProcessStatus::Run, ProcessStatus::Running),
            (sysinfo::ProcessStatus::Sleep, ProcessStatus::Sleeping),
            (sysinfo::ProcessStatus::Zombie, ProcessStatus::Zombie),
            (sysinfo::ProcessStatus::Stop, ProcessStatus::Stopped),
        ];
        for (raw, expected) in cases {
            assert_eq!(ProcessStatus::from(raw), expected);
        }
    }

    /// `track` adds a PID to the tracked set and `untrack` removes it again.
    #[test]
    fn test_process_monitor_track_untrack() {
        let pid = 1234;
        let mut monitor = ProcessMonitor::new();

        monitor.track(pid, NodeId::new("dirigent/test/proc").clone());
        assert!(monitor.tracked_pids().contains_key(&pid));

        monitor.untrack(pid);
        assert!(!monitor.tracked_pids().contains_key(&pid));
    }

    /// Refreshing while tracking the test process itself yields real data.
    #[test]
    fn test_process_monitor_refresh_current_process() {
        let own_pid = std::process::id();
        let mut monitor = ProcessMonitor::new();
        monitor.track(own_pid, NodeId::new("dirigent/test/self"));

        let samples = monitor.refresh();
        assert!(
            samples.contains_key(&own_pid),
            "Current process should be found"
        );

        let sample = &samples[&own_pid];
        assert_eq!(sample.pid, own_pid);
        assert!(!sample.name.is_empty());
        assert!(sample.memory_bytes > 0);
    }

    /// `get` works for a PID that was never tracked.
    #[test]
    fn test_process_monitor_get_current_process() {
        let own_pid = std::process::id();
        let mut monitor = ProcessMonitor::new();

        let found = monitor.get(own_pid);
        assert!(found.is_some(), "Should find current process");
        assert_eq!(found.unwrap().pid, own_pid);
    }

    /// `is_alive` distinguishes an existing process from a non-existent PID.
    #[test]
    fn test_process_monitor_is_alive() {
        let mut monitor = ProcessMonitor::new();

        // The test process itself is certainly running.
        let own_pid = std::process::id();
        assert!(monitor.is_alive(own_pid));

        // The largest possible PID is not expected to be in use.
        let unused_pid = u32::MAX;
        assert!(!monitor.is_alive(unused_pid));
    }

    /// `ProcessInfo` survives a JSON round trip.
    #[test]
    fn test_process_info_serialization() {
        let original = ProcessInfo {
            pid: 1234,
            name: "test".to_string(),
            command: vec!["test".to_string(), "--flag".to_string()],
            exe: Some(PathBuf::from("/usr/bin/test")),
            cwd: Some(PathBuf::from("/home/user")),
            status: ProcessStatus::Running,
            cpu_usage_percent: 12.5,
            memory_bytes: 1024 * 1024,
            virtual_memory_bytes: 2048 * 1024,
            start_time_secs: 1700000000,
            run_time_secs: 3600,
            disk_read_bytes: 1024,
            disk_written_bytes: 2048,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ProcessInfo = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.pid, 1234);
        assert_eq!(decoded.status, ProcessStatus::Running);
        assert_eq!(decoded.memory_bytes, 1024 * 1024);
    }
}
-459
View File
@@ -1,459 +0,0 @@
use crate::error::Result;
use crate::node::{Inspectable, NodeId, NodeKind, NodeMetadata, NodeState};
use crate::tree::NodeTree;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
/// Events emitted by the registry when the tree changes.
///
/// Delivered on the registry's broadcast channel; obtain a receiver with
/// `InspectorRegistry::subscribe()`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum InspectorEvent {
/// A node was inserted under `parent` (sent by `register`).
NodeRegistered {
id: NodeId,
parent: NodeId,
kind: NodeKind,
},
/// A node was removed. `deregister_subtree` sends this once, for the id it
/// was given; no separate event is sent per descendant.
NodeRemoved {
id: NodeId,
},
/// A node's lifecycle state changed. Only sent when `old != new`.
StateChanged {
id: NodeId,
old: NodeState,
new: NodeState,
},
/// Properties on a node were updated; `keys` is the (non-empty) list
/// returned by the tree's `update_properties`.
PropertiesUpdated {
id: NodeId,
keys: Vec<String>,
},
}
/// Central registry for the inspector tree.
///
/// Thread-safe: all operations acquire the internal `RwLock` as needed.
/// Emits `InspectorEvent`s on a broadcast channel for reactive consumers.
pub struct InspectorRegistry {
// The node tree; read-only queries take the read lock, mutations the write lock.
tree: Arc<RwLock<NodeTree>>,
// Sending half of the event channel; receivers are created via `subscribe()`.
event_tx: broadcast::Sender<InspectorEvent>,
}
impl InspectorRegistry {
    /// Build a registry with the default root.
    ///
    /// The root has id `"dirigent"`, kind `NodeKind::Root`, label `"Dirigent"`
    /// and starts in `NodeState::Running`.
    pub fn new() -> Self {
        Self::with_root(
            NodeId::new("dirigent"),
            NodeMetadata::new(NodeKind::Root, "Dirigent").with_state(NodeState::Running),
        )
    }

    /// Build a registry whose tree starts with the given root node.
    pub fn with_root(root_id: NodeId, root_metadata: NodeMetadata) -> Self {
        // Only the sender is kept; receivers are handed out by `subscribe()`.
        let (event_tx, _initial_rx) = broadcast::channel(500);
        let tree = Arc::new(RwLock::new(NodeTree::new(root_id, root_metadata)));
        Self { tree, event_tx }
    }

    /// Id of the tree's root node.
    pub async fn root_id(&self) -> NodeId {
        let guard = self.tree.read().await;
        guard.root_id().clone()
    }

    /// Insert a new node under `parent` and announce it.
    ///
    /// On success a `NodeRegistered` event is sent and a `NodeHandle` bound to
    /// this registry is returned, which the producer can use to update the
    /// node. Errors from the tree insertion are passed through and no event is
    /// sent.
    pub async fn register(
        self: &Arc<Self>,
        id: NodeId,
        parent: &NodeId,
        metadata: NodeMetadata,
        inspectable: Option<Arc<dyn Inspectable>>,
    ) -> Result<crate::handle::NodeHandle> {
        // Prepare the announcement first: `metadata` is consumed by the insert.
        let announcement = InspectorEvent::NodeRegistered {
            id: id.clone(),
            parent: parent.clone(),
            kind: metadata.kind.clone(),
        };

        // The write guard lives only for this statement.
        self.tree
            .write()
            .await
            .insert(id.clone(), parent, metadata, inspectable)?;

        // A send error only means nobody is subscribed.
        let _ = self.event_tx.send(announcement);
        Ok(crate::handle::NodeHandle::new(id, Arc::clone(self)))
    }

    /// Remove a single node (its children are reparented to its parent) and
    /// send `NodeRemoved` for it.
    pub async fn deregister(&self, id: &NodeId) -> Result<()> {
        self.tree.write().await.remove(id)?;
        let removed = InspectorEvent::NodeRemoved { id: id.clone() };
        let _ = self.event_tx.send(removed);
        Ok(())
    }

    /// Remove a node together with all its descendants.
    ///
    /// A single `NodeRemoved` event is sent, carrying `id`.
    pub async fn deregister_subtree(&self, id: &NodeId) -> Result<()> {
        self.tree.write().await.remove_subtree(id)?;
        let removed = InspectorEvent::NodeRemoved { id: id.clone() };
        let _ = self.event_tx.send(removed);
        Ok(())
    }

    /// Set a node's lifecycle state.
    ///
    /// `StateChanged` is sent only when the new state differs from the old one.
    pub async fn update_state(&self, id: &NodeId, state: NodeState) -> Result<()> {
        let previous = self.tree.write().await.update_state(id, state.clone())?;
        if previous == state {
            return Ok(());
        }
        let _ = self.event_tx.send(InspectorEvent::StateChanged {
            id: id.clone(),
            old: previous,
            new: state,
        });
        Ok(())
    }

    /// Update or insert properties on a node.
    ///
    /// `PropertiesUpdated` is sent only when the tree reports at least one key.
    pub async fn update_properties(
        &self,
        id: &NodeId,
        props: HashMap<String, serde_json::Value>,
    ) -> Result<()> {
        let touched = self.tree.write().await.update_properties(id, props)?;
        if touched.is_empty() {
            return Ok(());
        }
        let _ = self.event_tx.send(InspectorEvent::PropertiesUpdated {
            id: id.clone(),
            keys: touched,
        });
        Ok(())
    }

    /// Clone of a node's metadata, or `None` if the node does not exist.
    pub async fn get_node(&self, id: &NodeId) -> Option<NodeMetadata> {
        let guard = self.tree.read().await;
        let node = guard.get(id)?;
        Some(node.metadata.clone())
    }

    /// Ids and metadata clones of all direct children of a node.
    pub async fn get_children(&self, id: &NodeId) -> Vec<(NodeId, NodeMetadata)> {
        let guard = self.tree.read().await;
        let mut children = Vec::new();
        for child in guard.children(id) {
            children.push((child.id.clone(), child.metadata.clone()));
        }
        children
    }

    /// Whether a node with this id exists.
    pub async fn contains(&self, id: &NodeId) -> bool {
        let guard = self.tree.read().await;
        guard.contains(id)
    }

    /// Total number of nodes in the tree.
    pub async fn node_count(&self) -> usize {
        let guard = self.tree.read().await;
        guard.len()
    }

    /// Call `Inspectable::inspect()` on a node, if it has an implementation.
    ///
    /// Returns `None` if the node doesn't exist or has no `Inspectable`
    /// attached. The tree lock is released before `inspect()` is awaited.
    pub async fn inspect_node(&self, id: &NodeId) -> Option<serde_json::Value> {
        let probe = {
            let guard = self.tree.read().await;
            guard
                .get(id)
                .and_then(|node| node.inspectable.as_ref().map(Arc::clone))
        }?;
        Some(probe.inspect().await)
    }

    /// New receiver for tree change events.
    pub fn subscribe(&self) -> broadcast::Receiver<InspectorEvent> {
        self.event_tx.subscribe()
    }

    /// Snapshot of the entire tree (see the snapshot module).
    pub async fn snapshot(&self) -> crate::snapshot::TreeSnapshot {
        let guard = self.tree.read().await;
        crate::snapshot::TreeSnapshot::from_tree(&guard)
    }
}
/// `InspectorRegistry::default()` is equivalent to `InspectorRegistry::new()`.
impl Default for InspectorRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a default registry and return it together with its root node id.
    async fn fresh_registry() -> (Arc<InspectorRegistry>, NodeId) {
        let registry = Arc::new(InspectorRegistry::new());
        let root = registry.root_id().await;
        (registry, root)
    }

    /// A new registry contains exactly the default root.
    #[tokio::test]
    async fn test_registry_new_has_root() {
        let (registry, root) = fresh_registry().await;

        assert_eq!(root.as_str(), "dirigent");
        assert!(registry.contains(&root).await);
        assert_eq!(registry.node_count().await, 1);
    }

    /// A registered node can be read back and is counted.
    #[tokio::test]
    async fn test_register_and_get() {
        let (registry, root) = fresh_registry().await;
        let id = NodeId::new("dirigent/connectors");
        let meta = NodeMetadata::new(NodeKind::Connector, "Connectors");

        let handle = registry
            .register(id.clone(), &root, meta, None)
            .await
            .unwrap();
        assert_eq!(handle.id().as_str(), "dirigent/connectors");

        let retrieved = registry.get_node(&id).await.unwrap();
        assert_eq!(retrieved.label, "Connectors");
        assert_eq!(registry.node_count().await, 2);
    }

    /// `register` announces the new node with id, parent and kind.
    #[tokio::test]
    async fn test_register_emits_event() {
        let (registry, root) = fresh_registry().await;
        let mut rx = registry.subscribe();
        let id = NodeId::new("dirigent/test");
        let meta = NodeMetadata::new(NodeKind::Service, "Test");

        let _handle = registry
            .register(id.clone(), &root, meta, None)
            .await
            .unwrap();

        let event = rx.recv().await.unwrap();
        match event {
            InspectorEvent::NodeRegistered {
                id: event_id,
                parent,
                kind,
            } => {
                assert_eq!(event_id.as_str(), "dirigent/test");
                assert_eq!(parent.as_str(), "dirigent");
                assert_eq!(kind, NodeKind::Service);
            }
            _ => panic!("Expected NodeRegistered event"),
        }
    }

    /// `deregister` removes the node from the tree.
    #[tokio::test]
    async fn test_deregister() {
        let (registry, root) = fresh_registry().await;
        let id = NodeId::new("dirigent/temp");
        let meta = NodeMetadata::new(NodeKind::AsyncTask, "Temp");
        let _handle = registry
            .register(id.clone(), &root, meta, None)
            .await
            .unwrap();
        assert!(registry.contains(&id).await);

        registry.deregister(&id).await.unwrap();

        assert!(!registry.contains(&id).await);
    }

    /// A real state change is announced with both old and new state.
    #[tokio::test]
    async fn test_update_state_emits_event() {
        let (registry, root) = fresh_registry().await;
        let id = NodeId::new("dirigent/svc");
        let meta = NodeMetadata::new(NodeKind::Service, "Service");
        let _handle = registry
            .register(id.clone(), &root, meta, None)
            .await
            .unwrap();
        // Subscribe after registering so only the state change is observed.
        let mut rx = registry.subscribe();

        registry
            .update_state(&id, NodeState::Error("crash".into()))
            .await
            .unwrap();

        let event = rx.recv().await.unwrap();
        match event {
            InspectorEvent::StateChanged { id: eid, old, new } => {
                assert_eq!(eid.as_str(), "dirigent/svc");
                assert_eq!(old, NodeState::Initializing);
                assert_eq!(new, NodeState::Error("crash".into()));
            }
            _ => panic!("Expected StateChanged event"),
        }
    }

    /// Setting the state a node already has does not produce an event.
    #[tokio::test]
    async fn test_no_event_on_same_state() {
        let (registry, root) = fresh_registry().await;
        let id = NodeId::new("dirigent/svc");
        let meta = NodeMetadata::new(NodeKind::Service, "Service").with_state(NodeState::Running);
        let _handle = registry
            .register(id.clone(), &root, meta, None)
            .await
            .unwrap();
        let mut rx = registry.subscribe();

        // Update to the same state
        registry
            .update_state(&id, NodeState::Running)
            .await
            .unwrap();

        // `update_state` sends its event (if any) before it returns, so a
        // non-blocking poll is conclusive and no wall-clock wait is needed.
        assert!(
            matches!(
                rx.try_recv(),
                Err(tokio::sync::broadcast::error::TryRecvError::Empty)
            ),
            "Should not emit event for same state"
        );
    }

    /// Properties written through the registry can be read back.
    #[tokio::test]
    async fn test_update_properties() {
        let (registry, root) = fresh_registry().await;
        let id = NodeId::new("dirigent/proc");
        let meta = NodeMetadata::new(NodeKind::Process, "Process");
        let _handle = registry
            .register(id.clone(), &root, meta, None)
            .await
            .unwrap();

        let mut props = HashMap::new();
        props.insert("pid".to_string(), serde_json::json!(1234));
        props.insert("cpu".to_string(), serde_json::json!(50.0));
        registry.update_properties(&id, props).await.unwrap();

        let node = registry.get_node(&id).await.unwrap();
        assert_eq!(node.properties["pid"], serde_json::json!(1234));
        assert_eq!(node.properties["cpu"], serde_json::json!(50.0));
    }

    /// `get_children` returns every direct child of the given node.
    #[tokio::test]
    async fn test_get_children() {
        let (registry, root) = fresh_registry().await;
        let _h1 = registry
            .register(
                NodeId::new("dirigent/a"),
                &root,
                NodeMetadata::new(NodeKind::Service, "A"),
                None,
            )
            .await
            .unwrap();
        let _h2 = registry
            .register(
                NodeId::new("dirigent/b"),
                &root,
                NodeMetadata::new(NodeKind::Service, "B"),
                None,
            )
            .await
            .unwrap();

        let children = registry.get_children(&root).await;
        assert_eq!(children.len(), 2);
        let labels: Vec<&str> = children.iter().map(|(_, m)| m.label.as_str()).collect();
        assert!(labels.contains(&"A"));
        assert!(labels.contains(&"B"));
    }

    /// `inspect_node` forwards to the attached `Inspectable`, if there is one.
    #[tokio::test]
    async fn test_inspect_node() {
        use async_trait::async_trait;

        struct MockInspectable;

        #[async_trait]
        impl Inspectable for MockInspectable {
            async fn inspect(&self) -> serde_json::Value {
                serde_json::json!({ "internal_queue_len": 42 })
            }

            fn current_state(&self) -> NodeState {
                NodeState::Running
            }
        }

        let (registry, root) = fresh_registry().await;
        let id = NodeId::new("dirigent/inspectable");
        let meta = NodeMetadata::new(NodeKind::Service, "Inspectable Service");
        let _handle = registry
            .register(id.clone(), &root, meta, Some(Arc::new(MockInspectable)))
            .await
            .unwrap();

        let result = registry.inspect_node(&id).await;
        assert_eq!(
            result.unwrap(),
            serde_json::json!({ "internal_queue_len": 42 })
        );

        // Non-inspectable node returns None
        let id2 = NodeId::new("dirigent/plain");
        let _h2 = registry
            .register(
                id2.clone(),
                &root,
                NodeMetadata::new(NodeKind::Service, "Plain"),
                None,
            )
            .await
            .unwrap();
        assert!(registry.inspect_node(&id2).await.is_none());
    }
}
-154
View File
@@ -1,154 +0,0 @@
use crate::node::{NodeId, NodeMetadata};
use crate::tree::NodeTree;
use serde::{Deserialize, Serialize};
/// A serializable point-in-time capture of the entire inspector tree.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TreeSnapshot {
/// UTC time at which the snapshot was assembled.
pub timestamp: chrono::DateTime<chrono::Utc>,
/// One entry per node, in the order the tree's `all_nodes()` returned them.
pub nodes: Vec<NodeSnapshot>,
}
/// Snapshot of a single node.
///
/// Holds clones of the node's data, so it stays valid after the tree changes.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeSnapshot {
/// Id of the node.
pub id: NodeId,
/// Id of the parent node; `None` marks the root (see `TreeSnapshot::root`).
pub parent: Option<NodeId>,
/// Ids of the node's direct children.
pub children: Vec<NodeId>,
/// Copy of the node's metadata at snapshot time.
pub metadata: NodeMetadata,
}
impl TreeSnapshot {
    /// Capture every node of `tree`, then stamp the result with the current
    /// UTC time.
    pub(crate) fn from_tree(tree: &NodeTree) -> Self {
        let mut nodes = Vec::new();
        for node in tree.all_nodes() {
            nodes.push(NodeSnapshot {
                id: node.id.clone(),
                parent: node.parent.clone(),
                children: node.children.clone(),
                metadata: node.metadata.clone(),
            });
        }
        let timestamp = chrono::Utc::now();
        Self { timestamp, nodes }
    }

    /// The snapshot as a JSON `Value`.
    ///
    /// A serialization failure yields the default `Value` instead of an error.
    pub fn to_json(&self) -> serde_json::Value {
        match serde_json::to_value(self) {
            Ok(value) => value,
            Err(_) => serde_json::Value::default(),
        }
    }

    /// The snapshot as pretty-printed JSON text.
    ///
    /// A serialization failure yields an empty string instead of an error.
    pub fn to_json_pretty(&self) -> String {
        match serde_json::to_string_pretty(self) {
            Ok(text) => text,
            Err(_) => String::new(),
        }
    }

    /// How many nodes the snapshot contains.
    pub fn node_count(&self) -> usize {
        let Self { nodes, .. } = self;
        nodes.len()
    }

    /// Look a node up by id (linear scan over the captured nodes).
    pub fn find(&self, id: &NodeId) -> Option<&NodeSnapshot> {
        self.nodes.iter().find(|candidate| candidate.id == *id)
    }

    /// The root node: the first captured node that has no parent.
    pub fn root(&self) -> Option<&NodeSnapshot> {
        self.nodes
            .iter()
            .find(|candidate| matches!(candidate.parent, None))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::{NodeKind, NodeMetadata, NodeState};
    use crate::tree::NodeTree;

    /// Three-node fixture: root -> `dirigent/svc` -> `dirigent/svc/task`.
    fn make_tree() -> NodeTree {
        let root_id = NodeId::new("dirigent");
        let service_id = NodeId::new("dirigent/svc");

        let root_meta = NodeMetadata::new(NodeKind::Root, "Dirigent").with_state(NodeState::Running);
        let mut fixture = NodeTree::new(root_id.clone(), root_meta);

        let service_meta =
            NodeMetadata::new(NodeKind::Service, "Service A").with_state(NodeState::Running);
        fixture
            .insert(NodeId::new("dirigent/svc"), &root_id, service_meta, None)
            .unwrap();

        let task_meta = NodeMetadata::new(NodeKind::AsyncTask, "Task 1");
        fixture
            .insert(
                NodeId::new("dirigent/svc/task"),
                &service_id,
                task_meta,
                None,
            )
            .unwrap();

        fixture
    }

    /// Shorthand: snapshot of the fixture tree.
    fn fixture_snapshot() -> TreeSnapshot {
        let tree = make_tree();
        TreeSnapshot::from_tree(&tree)
    }

    /// Every node of the tree shows up in the snapshot.
    #[test]
    fn test_snapshot_from_tree() {
        let snap = fixture_snapshot();

        assert_eq!(snap.node_count(), 3);
        for path in vec!["dirigent", "dirigent/svc", "dirigent/svc/task"] {
            assert!(snap.find(&NodeId::new(path)).is_some());
        }
    }

    /// `root` returns the parentless node.
    #[test]
    fn test_snapshot_root() {
        let snap = fixture_snapshot();

        let top = snap.root().unwrap();
        assert_eq!(top.id.as_str(), "dirigent");
        assert!(top.parent.is_none());
    }

    /// A snapshot survives a JSON round trip.
    #[test]
    fn test_snapshot_serialization_roundtrip() {
        let snap = fixture_snapshot();

        let encoded = serde_json::to_string(&snap).unwrap();
        let decoded: TreeSnapshot = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.node_count(), snap.node_count());
        let service = decoded.find(&NodeId::new("dirigent/svc")).unwrap();
        assert_eq!(service.metadata.label, "Service A");
    }

    /// The pretty JSON text mentions node ids and labels.
    #[test]
    fn test_snapshot_to_json_pretty() {
        let text = fixture_snapshot().to_json_pretty();

        assert!(text.contains("dirigent"));
        assert!(text.contains("Service A"));
    }

    /// The JSON value exposes `timestamp` and an array of `nodes`.
    #[test]
    fn test_snapshot_to_json_value() {
        let value = fixture_snapshot().to_json();

        assert!(value.get("timestamp").is_some());
        assert!(value.get("nodes").unwrap().is_array());
    }
}
-231
View File
@@ -1,231 +0,0 @@
use crate::node::NodeId;
use crate::registry::InspectorRegistry;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::System;
use tokio::task::JoinHandle;
use tracing::warn;
/// Host system information.
///
/// A plain, serializable copy of what `sysinfo` reported when
/// `SystemMonitor::refresh` ran.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SystemInfo {
/// Host name, if `sysinfo` could determine it.
pub hostname: Option<String>,
/// Operating system name, if available.
pub os_name: Option<String>,
/// Operating system version, if available.
pub os_version: Option<String>,
/// Kernel version, if available.
pub kernel_version: Option<String>,
/// CPU architecture string from `System::cpu_arch()`.
pub arch: String,
/// Value of `System::total_memory()`.
pub total_memory_bytes: u64,
/// Value of `System::used_memory()`.
pub used_memory_bytes: u64,
/// Value of `System::available_memory()`.
pub available_memory_bytes: u64,
/// Value of `System::total_swap()`.
pub total_swap_bytes: u64,
/// Value of `System::used_swap()`.
pub used_swap_bytes: u64,
/// Number of entries in `System::cpus()` — presumably logical CPUs; confirm.
pub cpu_count: usize,
/// Physical core count, if `sysinfo` could determine it.
pub physical_core_count: Option<usize>,
/// Value of `System::global_cpu_usage()`.
pub global_cpu_usage_percent: f32,
/// Value of `System::uptime()`.
pub uptime_secs: u64,
}
/// Monitors host system metrics (memory, CPU, etc.).
pub struct SystemMonitor {
// sysinfo handle; memory and CPU usage are refreshed on every `refresh()`.
system: System,
}
impl SystemMonitor {
/// Create a new system monitor.
pub fn new() -> Self {
let mut system = System::new();
// Initial refresh to populate baseline data
system.refresh_memory();
system.refresh_cpu_usage();
Self { system }
}
/// Refresh and return current system information.
pub fn refresh(&mut self) -> SystemInfo {
self.system.refresh_memory();
self.system.refresh_cpu_usage();
SystemInfo {
hostname: System::host_name(),
os_name: System::name(),
os_version: System::os_version(),
kernel_version: System::kernel_version(),
arch: System::cpu_arch(),
total_memory_bytes: self.system.total_memory(),
used_memory_bytes: self.system.used_memory(),
available_memory_bytes: self.system.available_memory(),
total_swap_bytes: self.system.total_swap(),
used_swap_bytes: self.system.used_swap(),
cpu_count: self.system.cpus().len(),
physical_core_count: self.system.physical_core_count(),
global_cpu_usage_percent: self.system.global_cpu_usage(),
uptime_secs: System::uptime(),
}
}
/// Spawn a background task that periodically updates a system node in the registry.
///
/// The `node_id` should already be registered in the tree (e.g., "dirigent/system/host").
pub fn start_polling(
mut self,
registry: Arc<InspectorRegistry>,
node_id: NodeId,
interval: Duration,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
loop {
ticker.tick().await;
let info = self.refresh();
let mut props = HashMap::new();
props.insert("hostname".to_string(), serde_json::json!(info.hostname));
props.insert("os_name".to_string(), serde_json::json!(info.os_name));
props.insert("os_version".to_string(), serde_json::json!(info.os_version));
props.insert("arch".to_string(), serde_json::json!(info.arch));
props.insert(
"total_memory_bytes".to_string(),
serde_json::json!(info.total_memory_bytes),
);
props.insert(
"used_memory_bytes".to_string(),
serde_json::json!(info.used_memory_bytes),
);
props.insert(
"available_memory_bytes".to_string(),
serde_json::json!(info.available_memory_bytes),
);
props.insert(
"total_swap_bytes".to_string(),
serde_json::json!(info.total_swap_bytes),
);
props.insert(
"used_swap_bytes".to_string(),
serde_json::json!(info.used_swap_bytes),
);
props.insert("cpu_count".to_string(), serde_json::json!(info.cpu_count));
props.insert(
"physical_core_count".to_string(),
serde_json::json!(info.physical_core_count),
);
props.insert(
"global_cpu_usage_percent".to_string(),
serde_json::json!(info.global_cpu_usage_percent),
);
props.insert(
"uptime_secs".to_string(),
serde_json::json!(info.uptime_secs),
);
if let Err(e) = registry.update_properties(&node_id, props).await {
warn!(
node_id = %node_id,
error = %e,
"Failed to update system node properties"
);
}
}
})
}
}
impl Default for SystemMonitor {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A live refresh on the host running the tests yields plausible values.
    #[test]
    fn test_system_monitor_refresh() {
        let mut monitor = SystemMonitor::new();
        // Need a brief sleep for CPU usage sampling
        std::thread::sleep(std::time::Duration::from_millis(200));
        let info = monitor.refresh();
        assert!(info.total_memory_bytes > 0, "Should have total memory");
        assert!(info.cpu_count > 0, "Should have at least 1 CPU");
        assert!(info.uptime_secs > 0, "Should have uptime");
        assert!(!info.arch.is_empty(), "Should have arch info");
    }

    /// `SystemInfo` survives a JSON round trip.
    #[test]
    fn test_system_info_serialization() {
        let info = SystemInfo {
            hostname: Some("testhost".to_string()),
            os_name: Some("macOS".to_string()),
            os_version: Some("14.0".to_string()),
            kernel_version: Some("23.0.0".to_string()),
            arch: "arm64".to_string(),
            total_memory_bytes: 16 * 1024 * 1024 * 1024,
            used_memory_bytes: 8 * 1024 * 1024 * 1024,
            available_memory_bytes: 8 * 1024 * 1024 * 1024,
            total_swap_bytes: 2 * 1024 * 1024 * 1024,
            used_swap_bytes: 512 * 1024 * 1024,
            cpu_count: 10,
            physical_core_count: Some(10),
            global_cpu_usage_percent: 25.5,
            uptime_secs: 86400,
        };
        let json = serde_json::to_string(&info).unwrap();
        let deserialized: SystemInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.hostname, Some("testhost".to_string()));
        assert_eq!(deserialized.total_memory_bytes, 16 * 1024 * 1024 * 1024);
        assert_eq!(deserialized.cpu_count, 10);
    }

    /// The polling task writes metrics onto an already-registered node.
    #[tokio::test]
    async fn test_system_monitor_polling() {
        use crate::node::{NodeKind, NodeMetadata, NodeState};
        let registry = Arc::new(InspectorRegistry::new());
        let root = registry.root_id().await;
        // Register system node
        let node_id = NodeId::new("dirigent/system/host");
        let mut handle = registry
            .register(
                node_id.clone(),
                &root,
                NodeMetadata::new(NodeKind::System, "Host System").with_state(NodeState::Running),
                None,
            )
            .await
            .unwrap();
        // Detach so polling task can update it
        // (handle would deregister on drop otherwise)
        handle.detach();
        let monitor = SystemMonitor::new();
        let task = monitor.start_polling(
            Arc::clone(&registry),
            node_id.clone(),
            Duration::from_millis(100),
        );
        // Wait for at least one poll cycle
        tokio::time::sleep(Duration::from_millis(250)).await;
        let meta = registry.get_node(&node_id).await.unwrap();
        assert!(
            meta.properties.contains_key("total_memory_bytes"),
            "Should have system metrics after polling"
        );
        assert!(
            meta.properties.contains_key("cpu_count"),
            "Should have CPU count"
        );
        task.abort();
    }
}
-544
View File
@@ -1,544 +0,0 @@
use crate::error::{InspectorError, Result};
use crate::node::{Inspectable, NodeId, NodeMetadata, NodeState};
use std::collections::HashMap;
use std::sync::Arc;
/// One entry of the inspector tree: its metadata plus links to its
/// neighbours in the hierarchy.
pub struct TreeNode {
    /// Identifier of this node (also its key in the tree's node map).
    pub id: NodeId,
    /// Descriptive metadata attached to the node.
    pub metadata: NodeMetadata,
    /// Parent identifier; the root is created with `None`.
    pub parent: Option<NodeId>,
    /// Identifiers of the direct children.
    pub children: Vec<NodeId>,
    /// Optional `Inspectable` object associated with the node.
    pub inspectable: Option<Arc<dyn Inspectable>>,
}

impl TreeNode {
    /// Builds a node that starts out with no children.
    fn new(
        id: NodeId,
        metadata: NodeMetadata,
        parent: Option<NodeId>,
        inspectable: Option<Arc<dyn Inspectable>>,
    ) -> Self {
        let children = Vec::new();
        TreeNode {
            id,
            metadata,
            parent,
            children,
            inspectable,
        }
    }
}
/// The inspector tree: a rooted tree of `TreeNode`s with parent-child relationships.
///
/// All mutations go through this struct. Thread safety is provided by wrapping
/// `NodeTree` in `Arc<RwLock<NodeTree>>` at the registry level.
pub struct NodeTree {
    /// Every node in the tree, keyed by its ID (the root included).
    nodes: HashMap<NodeId, TreeNode>,
    /// ID of the root node; `remove` and `remove_subtree` refuse to delete it.
    root: NodeId,
}

impl NodeTree {
    /// Create a new tree with a root node.
    ///
    /// The root node is always present and cannot be removed.
    pub fn new(root_id: NodeId, root_metadata: NodeMetadata) -> Self {
        let mut nodes = HashMap::new();
        nodes.insert(
            root_id.clone(),
            TreeNode::new(root_id.clone(), root_metadata, None, None),
        );
        Self {
            nodes,
            root: root_id,
        }
    }

    /// Get the root node ID.
    pub fn root_id(&self) -> &NodeId {
        &self.root
    }

    /// Insert a new node as a child of the given parent.
    ///
    /// # Errors
    ///
    /// * `InspectorError::NodeAlreadyExists` if `id` is already in the tree
    ///   (checked first).
    /// * `InspectorError::ParentNotFound` if `parent` is not in the tree.
    pub fn insert(
        &mut self,
        id: NodeId,
        parent: &NodeId,
        metadata: NodeMetadata,
        inspectable: Option<Arc<dyn Inspectable>>,
    ) -> Result<()> {
        if self.nodes.contains_key(&id) {
            return Err(InspectorError::NodeAlreadyExists(id));
        }
        // One lookup both validates the parent and records the new child.
        let parent_node = self
            .nodes
            .get_mut(parent)
            .ok_or_else(|| InspectorError::ParentNotFound(parent.clone()))?;
        parent_node.children.push(id.clone());
        let node = TreeNode::new(id.clone(), metadata, Some(parent.clone()), inspectable);
        self.nodes.insert(id, node);
        Ok(())
    }

    /// Remove a node and reparent its children to the node's parent.
    ///
    /// The root node cannot be removed. The reparented children are appended
    /// to the end of the parent's child list.
    ///
    /// # Errors
    ///
    /// * `InspectorError::CannotRemoveRoot` if `id` is the root.
    /// * `InspectorError::NodeNotFound` if `id` is not in the tree.
    pub fn remove(&mut self, id: &NodeId) -> Result<()> {
        if id == &self.root {
            return Err(InspectorError::CannotRemoveRoot);
        }
        // Taking the node out of the map first gives us ownership of its
        // links, so nothing has to be cloned to satisfy the borrow checker.
        let removed = self
            .nodes
            .remove(id)
            .ok_or_else(|| InspectorError::NodeNotFound(id.clone()))?;
        // A node without a parent has nowhere to hand its children to; they
        // are left untouched in that case.
        if let Some(parent_id) = removed.parent {
            for child_id in &removed.children {
                if let Some(child) = self.nodes.get_mut(child_id) {
                    child.parent = Some(parent_id.clone());
                }
            }
            // Update parent's children: remove this node, add reparented children
            if let Some(parent) = self.nodes.get_mut(&parent_id) {
                parent.children.retain(|c| c != id);
                parent.children.extend(removed.children);
            }
        }
        Ok(())
    }

    /// Remove a node and all its descendants.
    ///
    /// The root node cannot be removed.
    ///
    /// # Errors
    ///
    /// * `InspectorError::CannotRemoveRoot` if `id` is the root.
    /// * `InspectorError::NodeNotFound` if `id` is not in the tree.
    pub fn remove_subtree(&mut self, id: &NodeId) -> Result<()> {
        if id == &self.root {
            return Err(InspectorError::CannotRemoveRoot);
        }
        let parent_id = self
            .nodes
            .get(id)
            .ok_or_else(|| InspectorError::NodeNotFound(id.clone()))?
            .parent
            .clone();
        // Collect the whole subtree before mutating anything.
        let mut to_remove = self.descendants(id);
        to_remove.push(id.clone());
        // Remove this node from parent's children list
        if let Some(parent_id) = parent_id {
            if let Some(parent) = self.nodes.get_mut(&parent_id) {
                parent.children.retain(|c| c != id);
            }
        }
        // Remove all collected nodes
        for node_id in &to_remove {
            self.nodes.remove(node_id);
        }
        Ok(())
    }

    /// Get a reference to a node by ID.
    pub fn get(&self, id: &NodeId) -> Option<&TreeNode> {
        self.nodes.get(id)
    }

    /// Get a mutable reference to a node by ID.
    pub fn get_mut(&mut self, id: &NodeId) -> Option<&mut TreeNode> {
        self.nodes.get_mut(id)
    }

    /// Get the direct children of a node.
    ///
    /// Returns an empty vector when `id` is unknown; child IDs that have no
    /// entry in the tree are skipped.
    pub fn children(&self, id: &NodeId) -> Vec<&TreeNode> {
        self.nodes
            .get(id)
            .map(|node| {
                node.children
                    .iter()
                    .filter_map(|child_id| self.nodes.get(child_id))
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Get all ancestors of a node, from immediate parent to root.
    ///
    /// The walk stops early if a parent ID has no entry in the tree.
    pub fn ancestors(&self, id: &NodeId) -> Vec<&TreeNode> {
        let mut result = Vec::new();
        let mut current = self.nodes.get(id).and_then(|n| n.parent.as_ref());
        while let Some(parent_id) = current {
            if let Some(parent) = self.nodes.get(parent_id) {
                result.push(parent);
                current = parent.parent.as_ref();
            } else {
                break;
            }
        }
        result
    }

    /// Get all nodes in the tree.
    ///
    /// The order is the iteration order of the underlying `HashMap`.
    pub fn all_nodes(&self) -> Vec<&TreeNode> {
        self.nodes.values().collect()
    }

    /// Total number of nodes in the tree.
    pub fn len(&self) -> usize {
        self.nodes.len()
    }

    /// Whether the tree is empty (only root).
    pub fn is_empty(&self) -> bool {
        self.nodes.len() <= 1
    }

    /// Update a node's state, returning the old state.
    ///
    /// `last_updated` is bumped on every call, even when the new state equals
    /// the old one.
    ///
    /// # Errors
    ///
    /// `InspectorError::NodeNotFound` if `id` is not in the tree.
    pub fn update_state(&mut self, id: &NodeId, state: NodeState) -> Result<NodeState> {
        let node = self
            .nodes
            .get_mut(id)
            .ok_or_else(|| InspectorError::NodeNotFound(id.clone()))?;
        // Swap the state in place instead of cloning the old value.
        let old = std::mem::replace(&mut node.metadata.state, state);
        node.metadata.last_updated = chrono::Utc::now();
        Ok(old)
    }

    /// Update or insert properties on a node. Returns only the keys whose values actually changed.
    ///
    /// A key is considered changed if it is new (not previously present) or if its value
    /// differs from the stored value (compared via `serde_json::Value::PartialEq`).
    /// The `last_updated` timestamp is only bumped when at least one value changed.
    ///
    /// # Errors
    ///
    /// `InspectorError::NodeNotFound` if `id` is not in the tree.
    pub fn update_properties(
        &mut self,
        id: &NodeId,
        props: HashMap<String, serde_json::Value>,
    ) -> Result<Vec<String>> {
        let node = self
            .nodes
            .get_mut(id)
            .ok_or_else(|| InspectorError::NodeNotFound(id.clone()))?;
        let mut changed_keys = Vec::new();
        for (key, new_value) in props {
            // A missing key compares unequal, so new keys count as changed.
            if node.metadata.properties.get(&key) != Some(&new_value) {
                changed_keys.push(key.clone());
                node.metadata.properties.insert(key, new_value);
            }
        }
        if !changed_keys.is_empty() {
            node.metadata.last_updated = chrono::Utc::now();
        }
        Ok(changed_keys)
    }

    /// Check if a node exists.
    pub fn contains(&self, id: &NodeId) -> bool {
        self.nodes.contains_key(id)
    }

    /// Get all descendant IDs of a node (not including the node itself).
    ///
    /// The traversal is depth-first (an explicit stack); an unknown `id`
    /// yields an empty vector.
    pub fn descendants(&self, id: &NodeId) -> Vec<NodeId> {
        let mut result = Vec::new();
        let mut stack: Vec<NodeId> = self
            .nodes
            .get(id)
            .map(|n| n.children.clone())
            .unwrap_or_default();
        while let Some(current) = stack.pop() {
            if let Some(node) = self.nodes.get(&current) {
                stack.extend(node.children.iter().cloned());
            }
            result.push(current);
        }
        result
    }
}
// Unit tests for `NodeTree`. Most of them start from the six-node fixture
// built by `make_tree`.
#[cfg(test)]
mod tests {
use super::*;
use crate::node::{NodeKind, NodeMetadata};
// Metadata used for the root node in every test.
fn root_meta() -> NodeMetadata {
NodeMetadata::new(NodeKind::Root, "Dirigent")
}
// Metadata for a connector node in the `Running` state.
fn connector_meta(label: &str) -> NodeMetadata {
NodeMetadata::new(NodeKind::Connector, label).with_state(NodeState::Running)
}
// Builds the shared fixture:
//   dirigent
//     connectors
//       opencode-1
//       acp-claude
//         stdio-process   (property pid = 42)
//     services
fn make_tree() -> NodeTree {
let root = NodeId::new("dirigent");
let mut tree = NodeTree::new(root.clone(), root_meta());
// Add category nodes
tree.insert(
root.child("connectors"),
&root,
NodeMetadata::new(NodeKind::Root, "Connectors"),
None,
)
.unwrap();
tree.insert(
root.child("services"),
&root,
NodeMetadata::new(NodeKind::Root, "Services"),
None,
)
.unwrap();
// Add connectors
let connectors = root.child("connectors");
tree.insert(
connectors.child("opencode-1"),
&connectors,
connector_meta("OpenCode #1"),
None,
)
.unwrap();
tree.insert(
connectors.child("acp-claude"),
&connectors,
connector_meta("ACP Claude"),
None,
)
.unwrap();
// Add a process child under acp-claude
let acp = connectors.child("acp-claude");
tree.insert(
acp.child("stdio-process"),
&acp,
NodeMetadata::new(NodeKind::Process, "stdio-transport")
.with_state(NodeState::Running)
.with_property("pid", serde_json::json!(42)),
None,
)
.unwrap();
tree
}
// A freshly created tree contains exactly the root.
#[test]
fn test_new_tree_has_root() {
let root = NodeId::new("dirigent");
let tree = NodeTree::new(root.clone(), root_meta());
assert_eq!(tree.len(), 1);
assert!(tree.get(&root).is_some());
assert_eq!(tree.root_id(), &root);
}
// Inserted nodes are counted and can be found by their full path ID.
#[test]
fn test_insert_and_lookup() {
let tree = make_tree();
assert_eq!(tree.len(), 6); // root + connectors + services + opencode + acp + stdio
assert!(tree.contains(&NodeId::new("dirigent/connectors/acp-claude")));
assert!(tree.contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")));
}
// Re-inserting an existing ID is rejected.
#[test]
fn test_insert_duplicate_fails() {
let mut tree = make_tree();
let result = tree.insert(
NodeId::new("dirigent/connectors/opencode-1"),
&NodeId::new("dirigent/connectors"),
connector_meta("Duplicate"),
None,
);
assert!(matches!(result, Err(InspectorError::NodeAlreadyExists(_))));
}
// Inserting under a parent that is not in the tree is rejected.
#[test]
fn test_insert_missing_parent_fails() {
let mut tree = make_tree();
let result = tree.insert(
NodeId::new("dirigent/nonexistent/child"),
&NodeId::new("dirigent/nonexistent"),
connector_meta("Orphan"),
None,
);
assert!(matches!(result, Err(InspectorError::ParentNotFound(_))));
}
// `children` returns only the direct children (order is not asserted).
#[test]
fn test_children() {
let tree = make_tree();
let connectors = NodeId::new("dirigent/connectors");
let children = tree.children(&connectors);
assert_eq!(children.len(), 2);
let child_ids: Vec<&str> = children.iter().map(|c| c.id.as_str()).collect();
assert!(child_ids.contains(&"dirigent/connectors/opencode-1"));
assert!(child_ids.contains(&"dirigent/connectors/acp-claude"));
}
// `ancestors` is ordered from the immediate parent up to the root.
#[test]
fn test_ancestors() {
let tree = make_tree();
let stdio = NodeId::new("dirigent/connectors/acp-claude/stdio-process");
let ancestors = tree.ancestors(&stdio);
assert_eq!(ancestors.len(), 3);
assert_eq!(ancestors[0].id.as_str(), "dirigent/connectors/acp-claude");
assert_eq!(ancestors[1].id.as_str(), "dirigent/connectors");
assert_eq!(ancestors[2].id.as_str(), "dirigent");
}
// `remove` keeps the removed node's children and hands them to its parent.
// Note the child keeps its original ID even though its parent changed.
#[test]
fn test_remove_reparents_children() {
let mut tree = make_tree();
// Remove acp-claude; its child (stdio-process) should be reparented to "connectors"
tree.remove(&NodeId::new("dirigent/connectors/acp-claude"))
.unwrap();
assert!(!tree.contains(&NodeId::new("dirigent/connectors/acp-claude")));
assert!(tree.contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")));
// stdio-process should now be a child of connectors
let connectors = tree.get(&NodeId::new("dirigent/connectors")).unwrap();
assert!(connectors
.children
.contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")));
// stdio-process parent should be connectors
let stdio = tree
.get(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))
.unwrap();
assert_eq!(
stdio.parent.as_ref().unwrap().as_str(),
"dirigent/connectors"
);
}
// `remove_subtree` deletes the node together with everything below it.
#[test]
fn test_remove_subtree() {
let mut tree = make_tree();
tree.remove_subtree(&NodeId::new("dirigent/connectors/acp-claude"))
.unwrap();
assert!(!tree.contains(&NodeId::new("dirigent/connectors/acp-claude")));
assert!(!tree.contains(&NodeId::new("dirigent/connectors/acp-claude/stdio-process")));
assert_eq!(tree.len(), 4); // root + connectors + services + opencode
// connectors should only have opencode-1
let connectors = tree.get(&NodeId::new("dirigent/connectors")).unwrap();
assert_eq!(connectors.children.len(), 1);
}
// Both removal entry points refuse to delete the root.
#[test]
fn test_cannot_remove_root() {
let mut tree = make_tree();
let result = tree.remove(&NodeId::new("dirigent"));
assert!(matches!(result, Err(InspectorError::CannotRemoveRoot)));
let result = tree.remove_subtree(&NodeId::new("dirigent"));
assert!(matches!(result, Err(InspectorError::CannotRemoveRoot)));
}
// `update_state` stores the new state and returns the previous one.
#[test]
fn test_update_state() {
let mut tree = make_tree();
let id = NodeId::new("dirigent/connectors/opencode-1");
let old = tree
.update_state(&id, NodeState::Error("timeout".into()))
.unwrap();
assert_eq!(old, NodeState::Running);
let node = tree.get(&id).unwrap();
assert_eq!(node.metadata.state, NodeState::Error("timeout".into()));
}
// New properties are merged in; properties not mentioned are left alone.
#[test]
fn test_update_properties() {
let mut tree = make_tree();
let id = NodeId::new("dirigent/connectors/acp-claude/stdio-process");
let mut props = HashMap::new();
props.insert("cpu_percent".to_string(), serde_json::json!(45.2));
props.insert("memory_mb".to_string(), serde_json::json!(128));
let keys = tree.update_properties(&id, props).unwrap();
assert_eq!(keys.len(), 2);
let node = tree.get(&id).unwrap();
assert_eq!(node.metadata.properties["pid"], serde_json::json!(42)); // original preserved
assert_eq!(
node.metadata.properties["cpu_percent"],
serde_json::json!(45.2)
);
}
// Only keys whose value actually changed are reported back.
#[test]
fn test_update_properties_no_change() {
let mut tree = make_tree();
let id = NodeId::new("dirigent/connectors/acp-claude/stdio-process");
// First update: new key, should be reported as changed
let mut props = HashMap::new();
props.insert("cpu_percent".to_string(), serde_json::json!(45.2));
let keys = tree.update_properties(&id, props).unwrap();
assert_eq!(keys.len(), 1);
// Second update: same value, should NOT be reported as changed
let mut props = HashMap::new();
props.insert("cpu_percent".to_string(), serde_json::json!(45.2));
let keys = tree.update_properties(&id, props).unwrap();
assert_eq!(keys.len(), 0, "Same value should not be reported as changed");
// Third update: different value, should be reported
let mut props = HashMap::new();
props.insert("cpu_percent".to_string(), serde_json::json!(50.0));
let keys = tree.update_properties(&id, props).unwrap();
assert_eq!(keys.len(), 1);
}
// `descendants` of the root covers every other node in the fixture.
#[test]
fn test_descendants() {
let tree = make_tree();
let root = NodeId::new("dirigent");
let descendants = tree.descendants(&root);
assert_eq!(descendants.len(), 5); // all except root itself
}
// A tree holding only its root counts as empty.
#[test]
fn test_is_empty() {
let root = NodeId::new("dirigent");
let tree = NodeTree::new(root, root_meta());
assert!(tree.is_empty());
let tree = make_tree();
assert!(!tree.is_empty());
}
}
@@ -1,355 +0,0 @@
use dirigent_inspector::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
/// Full integration test simulating a Dirigent-like setup:
/// - Root node "dirigent"
/// - Connector nodes under "dirigent/connectors"
/// - Process node under a connector
/// - Service nodes under "dirigent/services"
/// - System node under "dirigent/system"
/// - Snapshot capture and JSON round trip
/// - Event subscription
///
/// Command channels are exercised separately in `test_channel_integration`.
#[tokio::test]
async fn test_full_inspector_tree() {
let registry = Arc::new(InspectorRegistry::new());
let root = registry.root_id().await;
assert_eq!(root.as_str(), "dirigent");
// Subscribe to events
// (done before any registration so the setup below is observed)
let mut event_rx = registry.subscribe();
// -- Build the tree structure --
// Category: connectors
let connectors_handle = registry
.register(
NodeId::new("dirigent/connectors"),
&root,
NodeMetadata::new(NodeKind::Custom("category".into()), "Connectors")
.with_state(NodeState::Running),
None,
)
.await
.unwrap();
// Category: services
let mut services_handle = registry
.register(
NodeId::new("dirigent/services"),
&root,
NodeMetadata::new(NodeKind::Custom("category".into()), "Services")
.with_state(NodeState::Running),
None,
)
.await
.unwrap();
// Category: system
let mut system_handle = registry
.register(
NodeId::new("dirigent/system"),
&root,
NodeMetadata::new(NodeKind::Custom("category".into()), "System")
.with_state(NodeState::Running),
None,
)
.await
.unwrap();
// Connector: ACP Claude
let acp_handle = connectors_handle
.register_child(
NodeId::new("dirigent/connectors/acp-claude"),
NodeMetadata::new(NodeKind::Connector, "ACP Claude")
.with_state(NodeState::Running)
.with_property("transport", serde_json::json!("stdio")),
None,
)
.await
.unwrap();
// Process: stdio transport child
// (the test's own PID stands in for a real child process)
let current_pid = std::process::id();
let proc_handle = acp_handle
.register_child(
NodeId::new("dirigent/connectors/acp-claude/stdio-process"),
NodeMetadata::new(NodeKind::Process, "stdio-transport")
.with_state(NodeState::Running)
.with_property("pid", serde_json::json!(current_pid)),
None,
)
.await
.unwrap();
// Connector: OpenCode
let _opencode_handle = connectors_handle
.register_child(
NodeId::new("dirigent/connectors/opencode-1"),
NodeMetadata::new(NodeKind::Connector, "OpenCode #1")
.with_state(NodeState::Running)
.with_property("base_url", serde_json::json!("http://localhost:12225")),
None,
)
.await
.unwrap();
// Service: Archivist
let _archivist_handle = services_handle
.register_child(
NodeId::new("dirigent/services/archivist"),
NodeMetadata::new(NodeKind::Service, "Archivist EventHandler")
.with_state(NodeState::Idle),
None,
)
.await
.unwrap();
// System: Host
let _host_handle = system_handle
.register_child(
NodeId::new("dirigent/system/host"),
NodeMetadata::new(NodeKind::System, "Host Machine").with_state(NodeState::Running),
None,
)
.await
.unwrap();
// -- Verify tree structure --
// dirigent, connectors, services, system, acp-claude, stdio-process, opencode-1, archivist, host = 9
assert_eq!(registry.node_count().await, 9);
// Check children
let root_children = registry.get_children(&root).await;
assert_eq!(root_children.len(), 3); // connectors, services, system
let connector_children = registry
.get_children(&NodeId::new("dirigent/connectors"))
.await;
assert_eq!(connector_children.len(), 2); // acp-claude, opencode-1
let acp_children = registry
.get_children(&NodeId::new("dirigent/connectors/acp-claude"))
.await;
assert_eq!(acp_children.len(), 1); // stdio-process
// -- State transitions --
proc_handle
.set_state(NodeState::Busy("processing message".into()))
.await
.unwrap();
let proc_meta = registry
.get_node(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))
.await
.unwrap();
assert_eq!(
proc_meta.state,
NodeState::Busy("processing message".into())
);
// -- Property updates --
let mut props = HashMap::new();
props.insert("cpu_percent".to_string(), serde_json::json!(23.5));
props.insert("memory_mb".to_string(), serde_json::json!(256));
proc_handle.set_properties(props).await.unwrap();
let proc_meta = registry
.get_node(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))
.await
.unwrap();
assert_eq!(proc_meta.properties["cpu_percent"], serde_json::json!(23.5));
assert_eq!(proc_meta.properties["pid"], serde_json::json!(current_pid)); // original preserved
// -- Snapshot --
let snapshot = registry.snapshot().await;
assert_eq!(snapshot.node_count(), 9);
// Verify snapshot structure
let snap_root = snapshot.root().unwrap();
assert_eq!(snap_root.id.as_str(), "dirigent");
assert_eq!(snap_root.children.len(), 3);
let snap_proc = snapshot
.find(&NodeId::new("dirigent/connectors/acp-claude/stdio-process"))
.unwrap();
assert_eq!(
snap_proc.parent,
Some(NodeId::new("dirigent/connectors/acp-claude"))
);
assert_eq!(
snap_proc.metadata.state,
NodeState::Busy("processing message".into())
);
// Snapshot serialization roundtrip
let json = serde_json::to_string(&snapshot).unwrap();
let deserialized: TreeSnapshot = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.node_count(), 9);
// -- Events --
// Drain all events that were emitted during setup
let mut event_count = 0;
while let Ok(event) = event_rx.try_recv() {
event_count += 1;
// Just verify they're valid events
// (the match is exhaustive, so a new variant forces this test to be revisited)
match event {
InspectorEvent::NodeRegistered { .. }
| InspectorEvent::StateChanged { .. }
| InspectorEvent::PropertiesUpdated { .. }
| InspectorEvent::NodeRemoved { .. } => {}
}
}
assert!(event_count > 0, "Should have received events");
// -- Cleanup: detach category handles so they survive --
// NOTE(review): only the services and system handles are detached here;
// `connectors_handle` is not — confirm whether that is intentional.
services_handle.detach();
system_handle.detach();
}
/// Test process monitor with the current process.
///
/// Registers a process node, tracks the test's own PID against it, and checks
/// that the polling task publishes `pid` and `memory_bytes` properties.
#[tokio::test]
async fn test_process_monitor_integration() {
let registry = Arc::new(InspectorRegistry::new());
let root = registry.root_id().await;
let current_pid = std::process::id();
let node_id = NodeId::new("dirigent/test-process");
let mut handle = registry
.register(
node_id.clone(),
&root,
NodeMetadata::new(NodeKind::Process, "Test Process").with_state(NodeState::Running),
None,
)
.await
.unwrap();
// Detach the handle before polling starts.
// NOTE(review): presumably this keeps the node registered independently of the handle — confirm.
handle.detach();
// Create monitor and track current process
let mut monitor = ProcessMonitor::new();
monitor.track(current_pid, node_id.clone());
// Start polling
let task = monitor.start_polling(Arc::clone(&registry), Duration::from_millis(100));
// Wait for data to be populated
// (350 ms against a 100 ms interval leaves room for several polls)
tokio::time::sleep(Duration::from_millis(350)).await;
let meta = registry.get_node(&node_id).await.unwrap();
assert!(
meta.properties.contains_key("pid"),
"Should have PID property"
);
assert!(
meta.properties.contains_key("memory_bytes"),
"Should have memory property"
);
assert_eq!(meta.properties["pid"], serde_json::json!(current_pid));
// Stop the background poller.
task.abort();
}
/// Test bidirectional channel communication with a simulated node loop.
///
/// A spawned task plays the node side: it answers `Introspect` and
/// `Execute("restart")` successfully and anything else with an error, then
/// exits after two commands. The test body plays the inspector side.
#[tokio::test]
async fn test_channel_integration() {
let (sender, mut receiver) = inspector_channel(10);
// Simulate a node loop that handles commands
let node_loop = tokio::spawn(async move {
let mut handled = 0;
while let Some((cmd, resp_tx)) = receiver.recv().await {
let response = match cmd.kind {
CommandKind::Introspect => CommandResponse::ok(
&cmd.id,
serde_json::json!({
"queue_depth": receiver.pending_count(),
"sessions_active": 3
}),
),
CommandKind::Execute(ref name) if name == "restart" => {
CommandResponse::ok(&cmd.id, serde_json::json!("restarting..."))
}
_ => CommandResponse::err(&cmd.id, "unknown command"),
};
// The send result is ignored on purpose.
let _ = resp_tx.send(response);
handled += 1;
// Exactly two commands are sent below; stop so `node_loop` can be joined.
if handled >= 2 {
break;
}
}
});
// Send introspect command
let resp = sender
.send(NodeCommand {
id: "cmd-1".to_string(),
kind: CommandKind::Introspect,
payload: serde_json::Value::Null,
})
.await
.unwrap();
assert!(resp.success);
assert_eq!(resp.data["sessions_active"], 3);
// Send execute command
let resp = sender
.send(NodeCommand {
id: "cmd-2".to_string(),
kind: CommandKind::Execute("restart".to_string()),
payload: serde_json::Value::Null,
})
.await
.unwrap();
assert!(resp.success);
// Joining also surfaces any panic raised inside the node loop.
node_loop.await.unwrap();
}
/// Test that removing a subtree through the registry deletes the parent and
/// all of its children in one call.
///
/// NOTE(review): drop-based auto-deregistration of handles is not asserted
/// here; the handles are still alive when the final count is checked.
#[tokio::test]
async fn test_lifecycle_management() {
let registry = Arc::new(InspectorRegistry::new());
let root = registry.root_id().await;
// Build a subtree
let parent = registry
.register(
NodeId::new("dirigent/parent"),
&root,
NodeMetadata::new(NodeKind::Connector, "Parent"),
None,
)
.await
.unwrap();
let _child1 = parent
.register_child(
NodeId::new("dirigent/parent/child1"),
NodeMetadata::new(NodeKind::Process, "Child 1"),
None,
)
.await
.unwrap();
let _child2 = parent
.register_child(
NodeId::new("dirigent/parent/child2"),
NodeMetadata::new(NodeKind::AsyncTask, "Child 2"),
None,
)
.await
.unwrap();
assert_eq!(registry.node_count().await, 4); // root + parent + 2 children
// Remove entire subtree
registry
.deregister_subtree(&NodeId::new("dirigent/parent"))
.await
.unwrap();
assert_eq!(registry.node_count().await, 1); // only root remains
}
-32
View File
@@ -1,32 +0,0 @@
# Manifest for the `dirigent_process` crate.
[package]
name = "dirigent_process"
version = "0.1.0"
edition = "2021"
description = "Cross-platform process lifecycle management for Dirigent"
[lib]
path = "src/lib.rs"
[features]
# No features are enabled by default.
default = []
# Enables the async API (`graceful_shutdown_async`, `configure_async_command`).
tokio = ["dep:tokio"]
[dependencies]
tracing = "0.1"
# Optional: only pulled in by the `tokio` feature above.
tokio = { version = "1", features = ["process", "time"], optional = true }
# Windows-only bindings (job objects, threading, console control).
[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.59", features = [
"Win32_System_JobObjects",
"Win32_System_Threading",
"Win32_Foundation",
"Win32_System_Console",
"Win32_Security",
] }
# Unix-only: `nix` for signalling process groups, `libc` for `setpgid`/`prctl`.
[target.'cfg(unix)'.dependencies]
nix = { version = "0.29", features = ["signal", "process"] }
libc = "0.2"
# Test-only Tokio features.
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
-30
View File
@@ -1,30 +0,0 @@
pub mod traits;
mod shutdown;
#[cfg(windows)]
mod windows;
#[cfg(unix)]
mod unix;
#[cfg(target_os = "linux")]
mod linux;
pub use traits::{ProcessGroupManager, ProcessLifecycle};
pub use shutdown::graceful_shutdown_sync;
#[cfg(feature = "tokio")]
pub use shutdown::graceful_shutdown_async;
use std::sync::Arc;
/// Build the `ProcessGroupManager` implementation for the compilation target.
///
/// Exactly one of the `cfg`-gated bindings below survives compilation:
/// Windows, Linux, or any other Unix. The manager is returned uninitialised;
/// call `init()` on it before use.
pub fn create_manager() -> Arc<dyn ProcessGroupManager> {
    #[cfg(windows)]
    let manager: Arc<dyn ProcessGroupManager> =
        Arc::new(windows::WindowsProcessGroupManager::new());

    #[cfg(target_os = "linux")]
    let manager: Arc<dyn ProcessGroupManager> = Arc::new(linux::LinuxProcessGroupManager::new());

    #[cfg(all(unix, not(target_os = "linux")))]
    let manager: Arc<dyn ProcessGroupManager> = Arc::new(unix::UnixProcessGroupManager::new());

    manager
}
-91
View File
@@ -1,91 +0,0 @@
#![cfg(target_os = "linux")]
use crate::traits::{ProcessGroupManager, ProcessLifecycle};
use nix::sys::signal::{killpg, Signal};
use nix::unistd::Pid;
use std::io;
use std::os::unix::process::CommandExt;
use tracing::{debug, info, warn};
/// Linux process group manager with kernel-level orphan prevention.
///
/// Uses `PR_SET_CHILD_SUBREAPER` so orphaned grandchildren are reparented
/// to this process, and `PR_SET_PDEATHSIG` so children auto-die when
/// the parent crashes.
#[derive(Debug, Default)]
pub struct LinuxProcessGroupManager;

impl LinuxProcessGroupManager {
    /// Create the manager. No system calls are made until `init()` runs.
    pub fn new() -> Self {
        Self
    }
}

impl ProcessGroupManager for LinuxProcessGroupManager {
    /// Mark the current process as a child subreaper.
    ///
    /// # Errors
    ///
    /// Returns the OS error reported by `prctl` if the flag cannot be set.
    fn init(&self) -> Result<(), io::Error> {
        // SAFETY: PR_SET_CHILD_SUBREAPER only takes integer arguments; no
        // pointers are passed and no Rust-visible memory is touched.
        let rc = unsafe { libc::prctl(libc::PR_SET_CHILD_SUBREAPER, 1, 0, 0, 0) };
        if rc != 0 {
            let err = io::Error::last_os_error();
            warn!(error = %err, "Failed to set PR_SET_CHILD_SUBREAPER");
            return Err(err);
        }
        info!("Linux process group manager initialized (child subreaper enabled)");
        Ok(())
    }

    /// Create a lifecycle handle for one child process.
    fn create_lifecycle(&self) -> Box<dyn ProcessLifecycle> {
        Box::new(LinuxProcessLifecycle)
    }
}
pub struct LinuxProcessLifecycle;
impl ProcessLifecycle for LinuxProcessLifecycle {
fn configure_command(&self, cmd: &mut std::process::Command) {
unsafe {
cmd.pre_exec(|| {
if libc::setpgid(0, 0) != 0 {
return Err(io::Error::last_os_error());
}
if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL, 0, 0, 0) != 0 {
return Err(io::Error::last_os_error());
}
Ok(())
});
}
}
#[cfg(feature = "tokio")]
fn configure_async_command(&self, cmd: &mut tokio::process::Command) {
unsafe {
cmd.pre_exec(|| {
if libc::setpgid(0, 0) != 0 {
return Err(io::Error::last_os_error());
}
if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL, 0, 0, 0) != 0 {
return Err(io::Error::last_os_error());
}
Ok(())
});
}
}
fn register_child(&self, pid: u32) -> Result<(), io::Error> {
debug!(pid, pgid = pid, "Linux child registered (process group + PR_SET_PDEATHSIG)");
Ok(())
}
fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error> {
let pgid = Pid::from_raw(pid as i32);
killpg(pgid, Signal::SIGTERM)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
debug!(pid, "Sent SIGTERM to process group");
Ok(())
}
fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error> {
let pgid = Pid::from_raw(pid as i32);
killpg(pgid, Signal::SIGKILL)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
debug!(pid, "Sent SIGKILL to process group");
Ok(())
}
}
-66
View File
@@ -1,66 +0,0 @@
use crate::traits::ProcessLifecycle;
use std::time::Duration;
/// Force-kill `child` and reap it.
///
/// If the lifecycle's kill signal cannot be delivered, fall back to killing
/// the direct child so the final `wait` cannot block on a process nobody
/// signalled.
fn force_kill_and_reap_sync(
    lifecycle: &dyn ProcessLifecycle,
    child: &mut std::process::Child,
    pid: u32,
) {
    if lifecycle.send_kill_signal(pid).is_err() {
        let _ = child.kill();
    }
    let _ = child.wait();
}

/// Graceful shutdown: send signal → wait → force kill (sync, blocking).
///
/// Returns `true` if the process exited within the timeout, `false` if force-killed.
///
/// Details:
/// * A PID of 0 and errors from `try_wait` are treated as "nothing left to
///   shut down" and return `true`.
/// * If the shutdown signal cannot be delivered and the child is still
///   running, it is force-killed right away (waiting for a signal that was
///   never sent would only burn the timeout) and `false` is returned.
/// * The child is polled every 50 ms, never sleeping past the deadline, and
///   is checked one last time at the deadline before being force-killed.
pub fn graceful_shutdown_sync(
    lifecycle: &dyn ProcessLifecycle,
    child: &mut std::process::Child,
    timeout: Duration,
) -> bool {
    let pid = child.id();
    if pid == 0 {
        return true;
    }

    if let Err(err) = lifecycle.send_shutdown_signal(pid) {
        // The usual reason is that the process is already gone; `try_wait`
        // confirms that (and reaps it) instead of assuming it.
        return match child.try_wait() {
            Ok(None) => {
                tracing::debug!(
                    pid,
                    error = %err,
                    "Shutdown signal failed while process is still running, force killing"
                );
                force_kill_and_reap_sync(lifecycle, child, pid);
                false
            }
            Ok(Some(_)) | Err(_) => true,
        };
    }

    let start = std::time::Instant::now();
    let poll_interval = Duration::from_millis(50);
    loop {
        match child.try_wait() {
            Ok(Some(_)) | Err(_) => return true,
            Ok(None) => {}
        }
        let elapsed = start.elapsed();
        if elapsed >= timeout {
            break;
        }
        // `elapsed < timeout` here, so the subtraction cannot underflow.
        std::thread::sleep(poll_interval.min(timeout - elapsed));
    }

    tracing::debug!(pid, "Graceful shutdown timed out, force killing");
    force_kill_and_reap_sync(lifecycle, child, pid);
    false
}
/// Graceful shutdown: send signal → wait → force kill (async, non-blocking).
///
/// Returns `true` if the process exited within the timeout, `false` if force-killed.
///
/// Details:
/// * A child without a PID (or with PID 0) is treated as already finished and
///   returns `true`, as does an error from waiting on it.
/// * If the shutdown signal cannot be delivered and the child is still
///   running, it is force-killed right away and `false` is returned.
/// * If the lifecycle's kill signal cannot be delivered either, the direct
///   child is killed through its own handle so the final `wait` cannot hang
///   on a process nobody signalled.
#[cfg(feature = "tokio")]
pub async fn graceful_shutdown_async(
    lifecycle: &dyn ProcessLifecycle,
    child: &mut tokio::process::Child,
    timeout: Duration,
) -> bool {
    let pid = match child.id() {
        Some(0) | None => return true,
        Some(pid) => pid,
    };

    if let Err(err) = lifecycle.send_shutdown_signal(pid) {
        // The usual reason is that the process is already gone; `try_wait`
        // confirms that instead of assuming it.
        return match child.try_wait() {
            Ok(None) => {
                tracing::debug!(
                    pid,
                    error = %err,
                    "Shutdown signal failed while process is still running, force killing"
                );
                if lifecycle.send_kill_signal(pid).is_err() {
                    let _ = child.start_kill();
                }
                let _ = child.wait().await;
                false
            }
            Ok(Some(_)) | Err(_) => true,
        };
    }

    match tokio::time::timeout(timeout, child.wait()).await {
        // Exited (or waiting failed) before the deadline.
        Ok(_) => true,
        Err(_) => {
            tracing::debug!(pid, "Graceful shutdown timed out, force killing");
            if lifecycle.send_kill_signal(pid).is_err() {
                let _ = child.start_kill();
            }
            let _ = child.wait().await;
            false
        }
    }
}
-40
View File
@@ -1,40 +0,0 @@
use std::io;
/// Application-wide process group manager; one instance is expected to live
/// for the whole application lifetime.
///
/// Platform notes:
/// - Windows: owns a Job Object configured with KILL_ON_JOB_CLOSE.
/// - Linux: configures the process as a child subreaper.
/// - macOS: no-op (process groups handle cleanup).
pub trait ProcessGroupManager: Send + Sync {
    /// Performs the platform-specific configuration of the parent process.
    fn init(&self) -> io::Result<()>;

    /// Hands out a lifecycle handle used to manage one child process.
    fn create_lifecycle(&self) -> Box<dyn ProcessLifecycle>;
}
/// Lifecycle operations for a single child process.
///
/// Every method is synchronous — the underlying OS signal/handle calls return
/// immediately. Timeout-based shutdown lives in the free functions of the
/// `shutdown` module.
pub trait ProcessLifecycle: Send + Sync {
    /// Prepares a `std::process::Command` prior to spawning by applying the
    /// platform-specific settings (process group, creation flags, pre_exec hooks).
    fn configure_command(&self, cmd: &mut std::process::Command);

    /// Prepares a `tokio::process::Command` prior to spawning.
    #[cfg(feature = "tokio")]
    fn configure_async_command(&self, cmd: &mut tokio::process::Command);

    /// Registers a freshly spawned child with the lifecycle manager.
    /// Call this right after spawning, passing the child's PID.
    fn register_child(&self, pid: u32) -> io::Result<()>;

    /// Asks the process (and its tree) to shut down gracefully.
    /// Windows: CTRL_BREAK_EVENT. Unix: SIGTERM to process group.
    fn send_shutdown_signal(&self, pid: u32) -> io::Result<()>;

    /// Kills the process (and its tree) forcefully.
    /// Windows: TerminateProcess. Unix: SIGKILL to process group.
    fn send_kill_signal(&self, pid: u32) -> io::Result<()>;
}
-78
View File
@@ -1,78 +0,0 @@
#![cfg(unix)]
use crate::traits::{ProcessGroupManager, ProcessLifecycle};
use nix::sys::signal::{killpg, Signal};
use nix::unistd::Pid;
use std::io;
use std::os::unix::process::CommandExt;
use tracing::{debug, info};
/// macOS / generic Unix process group manager.
///
/// Uses process groups for tree management. No kernel-level orphan
/// prevention (macOS lacks `PR_SET_PDEATHSIG`). Relies on launchd
/// supervision for crash recovery.
///
/// The type is a stateless unit struct, so it is trivially `Copy` and can be
/// obtained through either [`UnixProcessGroupManager::new`] or `Default`.
#[derive(Debug, Default, Clone, Copy)]
pub struct UnixProcessGroupManager;

impl UnixProcessGroupManager {
    /// Creates the manager; equivalent to `UnixProcessGroupManager::default()`.
    pub fn new() -> Self {
        Self
    }
}
impl ProcessGroupManager for UnixProcessGroupManager {
    /// Nothing to configure on this platform: logs that the manager is ready
    /// and reports success.
    fn init(&self) -> io::Result<()> {
        info!("Unix process group manager initialized");
        Ok(())
    }

    /// Returns a boxed, stateless [`UnixProcessLifecycle`].
    fn create_lifecycle(&self) -> Box<dyn ProcessLifecycle> {
        let lifecycle: Box<dyn ProcessLifecycle> = Box::new(UnixProcessLifecycle);
        lifecycle
    }
}
pub struct UnixProcessLifecycle;
impl ProcessLifecycle for UnixProcessLifecycle {
fn configure_command(&self, cmd: &mut std::process::Command) {
unsafe {
cmd.pre_exec(|| {
if libc::setpgid(0, 0) != 0 {
return Err(io::Error::last_os_error());
}
Ok(())
});
}
}
#[cfg(feature = "tokio")]
fn configure_async_command(&self, cmd: &mut tokio::process::Command) {
unsafe {
cmd.pre_exec(|| {
if libc::setpgid(0, 0) != 0 {
return Err(io::Error::last_os_error());
}
Ok(())
});
}
}
fn register_child(&self, pid: u32) -> Result<(), io::Error> {
debug!(pid, pgid = pid, "Child registered in its own process group");
Ok(())
}
fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error> {
let pgid = Pid::from_raw(pid as i32);
killpg(pgid, Signal::SIGTERM)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
debug!(pid, "Sent SIGTERM to process group");
Ok(())
}
fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error> {
let pgid = Pid::from_raw(pid as i32);
killpg(pgid, Signal::SIGKILL)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
debug!(pid, "Sent SIGKILL to process group");
Ok(())
}
}
-199
View File
@@ -1,199 +0,0 @@
#![cfg(windows)]
use crate::traits::{ProcessGroupManager, ProcessLifecycle};
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use tracing::{debug, info, warn};
use windows_sys::Win32::Foundation::{CloseHandle, FALSE, HANDLE};
use windows_sys::Win32::System::JobObjects::{
AssignProcessToJobObject, CreateJobObjectW, JobObjectExtendedLimitInformation,
SetInformationJobObject, JOBOBJECT_EXTENDED_LIMIT_INFORMATION, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
};
use windows_sys::Win32::System::Threading::{
OpenProcess, TerminateProcess, PROCESS_ALL_ACCESS,
};
use windows_sys::Win32::System::Console::{
GenerateConsoleCtrlEvent, CTRL_BREAK_EVENT,
};
/// Job-Object-backed process group manager for Windows.
///
/// The Job Object is created with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`, so
/// once this manager is dropped (or the process crashes) the OS automatically
/// kills every assigned child process, grandchildren included.
pub struct WindowsProcessGroupManager {
    /// Job Object handle; null until `init` stores the real handle. Kept in
    /// a Mutex so it can be written through a shared reference.
    job_handle: Mutex<HANDLE>,
    /// Set by `init` so that repeated calls return early.
    initialized: AtomicBool,
}

// Safety: HANDLE (*mut c_void) is not Send/Sync by default. The handle is
// written only inside init() (guarded by the AtomicBool and the Mutex) and is
// otherwise copied out for create_lifecycle() and drop, so no concurrent
// mutation occurs.
unsafe impl Send for WindowsProcessGroupManager {}
unsafe impl Sync for WindowsProcessGroupManager {}

impl WindowsProcessGroupManager {
    /// Builds an uninitialized manager: null job handle, `initialized` false.
    pub fn new() -> Self {
        let job_handle = Mutex::new(std::ptr::null_mut());
        let initialized = AtomicBool::new(false);
        Self {
            job_handle,
            initialized,
        }
    }

    /// Copies the current job handle out of the mutex.
    ///
    /// Panics if the mutex is poisoned.
    fn handle(&self) -> HANDLE {
        let guard = self.job_handle.lock().unwrap();
        *guard
    }
}

impl Default for WindowsProcessGroupManager {
    /// Same as [`WindowsProcessGroupManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl ProcessGroupManager for WindowsProcessGroupManager {
    /// Creates the Job Object and enables `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`.
    ///
    /// Calling `init` again after a successful (or in-progress) call returns
    /// `Ok(())` without creating another Job Object.
    ///
    /// # Errors
    ///
    /// Returns the OS error of whichever Win32 call failed. On failure the
    /// `initialized` flag is reset so that a later call can retry.
    fn init(&self) -> Result<(), io::Error> {
        if self.initialized.swap(true, Ordering::SeqCst) {
            return Ok(());
        }

        // SAFETY: both arguments may be null (default security attributes,
        // unnamed job object).
        let handle = unsafe { CreateJobObjectW(std::ptr::null(), std::ptr::null()) };
        if handle.is_null() {
            // Read the error code before doing anything else on this thread.
            let err = io::Error::last_os_error();
            self.initialized.store(false, Ordering::SeqCst);
            return Err(err);
        }

        // SAFETY: the structure is plain data for which all-zero bytes are a
        // valid value.
        let mut limits: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { std::mem::zeroed() };
        limits.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;

        // SAFETY: `handle` was just returned by CreateJobObjectW, and the
        // pointer/length pair describes the live `limits` value.
        let result = unsafe {
            SetInformationJobObject(
                handle,
                JobObjectExtendedLimitInformation,
                &limits as *const _ as *const _,
                std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
            )
        };
        if result == FALSE {
            // Capture the failure reason BEFORE CloseHandle: any later Win32
            // call may overwrite the thread's last-error value.
            let err = io::Error::last_os_error();
            // SAFETY: `handle` is a valid job handle owned by this function.
            unsafe {
                CloseHandle(handle);
            }
            self.initialized.store(false, Ordering::SeqCst);
            return Err(err);
        }

        *self.job_handle.lock().unwrap() = handle;
        info!("Windows Job Object created with KILL_ON_JOB_CLOSE");
        Ok(())
    }

    /// Creates a lifecycle handle carrying a copy of the current job handle
    /// (null if `init` has not stored one yet).
    fn create_lifecycle(&self) -> Box<dyn ProcessLifecycle> {
        Box::new(WindowsProcessLifecycle {
            job_handle: self.handle(),
        })
    }
}
impl Drop for WindowsProcessGroupManager {
    /// Closes the Job Object handle if one was ever stored.
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so the handle can be read
        // without locking; a poisoned mutex still panics.
        let job = *self.job_handle.get_mut().unwrap();
        if job.is_null() {
            return;
        }
        unsafe {
            CloseHandle(job);
        }
        debug!("Windows Job Object closed");
    }
}
/// Per-child lifecycle manager for Windows.
///
/// Assigns children to the parent's Job Object and uses
/// `CTRL_BREAK_EVENT` / `TerminateProcess` for shutdown.
pub struct WindowsProcessLifecycle {
    /// Copy of the manager's Job Object handle; null when the manager had not
    /// been initialized at the time this lifecycle was created.
    job_handle: HANDLE,
}

// Safety: same reasoning as WindowsProcessGroupManager — HANDLE is used
// read-only after construction (only passed to OS APIs).
unsafe impl Send for WindowsProcessLifecycle {}
unsafe impl Sync for WindowsProcessLifecycle {}

impl WindowsProcessLifecycle {
    const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
    const CREATE_NO_WINDOW: u32 = 0x0800_0000;
    /// Creation flags applied to every child command.
    const CREATION_FLAGS: u32 = Self::CREATE_NEW_PROCESS_GROUP | Self::CREATE_NO_WINDOW;
}

impl ProcessLifecycle for WindowsProcessLifecycle {
    /// Sets `CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW` on the command.
    fn configure_command(&self, cmd: &mut std::process::Command) {
        use std::os::windows::process::CommandExt;
        cmd.creation_flags(Self::CREATION_FLAGS);
    }

    /// Sets `CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW` on the command.
    #[cfg(feature = "tokio")]
    fn configure_async_command(&self, cmd: &mut tokio::process::Command) {
        use std::os::windows::process::CommandExt;
        cmd.creation_flags(Self::CREATION_FLAGS);
    }

    /// Assigns the process identified by `pid` to the Job Object.
    ///
    /// When the job handle is null, a warning is logged and `Ok(())` is
    /// returned without touching the process.
    ///
    /// # Errors
    ///
    /// Returns the OS error when the process cannot be opened or cannot be
    /// assigned to the Job Object.
    fn register_child(&self, pid: u32) -> Result<(), io::Error> {
        if self.job_handle.is_null() {
            warn!(pid, "Job Object not initialized, skipping child registration");
            return Ok(());
        }

        // SAFETY: plain FFI call with integer arguments only.
        let process_handle = unsafe { OpenProcess(PROCESS_ALL_ACCESS, FALSE, pid) };
        if process_handle.is_null() {
            return Err(io::Error::last_os_error());
        }

        // SAFETY: `process_handle` was just opened and is non-null;
        // `job_handle` was checked to be non-null above.
        let assigned = unsafe { AssignProcessToJobObject(self.job_handle, process_handle) };
        // Capture the failure reason BEFORE CloseHandle: any later Win32 call
        // may overwrite the thread's last-error value.
        let assign_error = if assigned == FALSE {
            Some(io::Error::last_os_error())
        } else {
            None
        };
        // SAFETY: `process_handle` is owned by this function and closed once.
        unsafe {
            CloseHandle(process_handle);
        }

        if let Some(err) = assign_error {
            warn!(pid, error = %err, "Failed to assign process to Job Object (may already be in a job)");
            return Err(err);
        }
        debug!(pid, "Process assigned to Job Object");
        Ok(())
    }

    /// Sends `CTRL_BREAK_EVENT` to the process group identified by `pid`.
    fn send_shutdown_signal(&self, pid: u32) -> Result<(), io::Error> {
        // SAFETY: plain FFI call with integer arguments only.
        let delivered = unsafe { GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid) };
        if delivered == FALSE {
            return Err(io::Error::last_os_error());
        }
        debug!(pid, "Sent CTRL_BREAK_EVENT");
        Ok(())
    }

    /// Terminates the process identified by `pid` with exit code 1.
    ///
    /// `ERROR_INVALID_PARAMETER` from `OpenProcess` and `ERROR_ACCESS_DENIED`
    /// from `TerminateProcess` are treated as "already exited" and reported
    /// as success.
    fn send_kill_signal(&self, pid: u32) -> Result<(), io::Error> {
        // SAFETY: plain FFI call with integer arguments only.
        let handle = unsafe { OpenProcess(PROCESS_ALL_ACCESS, FALSE, pid) };
        if handle.is_null() {
            let err = io::Error::last_os_error();
            // ERROR_INVALID_PARAMETER (87) means process already exited
            if err.raw_os_error() == Some(87) {
                return Ok(());
            }
            return Err(err);
        }

        // SAFETY: `handle` was just opened and is non-null.
        let terminated = unsafe { TerminateProcess(handle, 1) };
        // Capture the failure reason BEFORE CloseHandle: any later Win32 call
        // may overwrite the thread's last-error value.
        let terminate_error = if terminated == FALSE {
            Some(io::Error::last_os_error())
        } else {
            None
        };
        // SAFETY: `handle` is owned by this function and closed once.
        unsafe {
            CloseHandle(handle);
        }

        if let Some(err) = terminate_error {
            // ERROR_ACCESS_DENIED (5) — process may have already exited
            if err.raw_os_error() == Some(5) {
                return Ok(());
            }
            return Err(err);
        }
        debug!(pid, "Sent TerminateProcess");
        Ok(())
    }
}
-149
View File
@@ -1,149 +0,0 @@
use dirigent_process::{create_manager, graceful_shutdown_sync};
use std::process::Command;
use std::time::Duration;
/// Returns a command that keeps running for a while and needs no TTY.
///
/// Windows variant: `timeout /t N /nobreak` fails when stdin is a pipe (no
/// console), so `ping -n N 127.0.0.1` is used instead; it sleeps for roughly
/// N-1 seconds and has no TTY requirement.
///
/// (The Unix variant uses the idiomatic `sleep N`.)
#[cfg(windows)]
fn long_sleep_cmd(seconds: u32) -> Command {
    let echo_count = seconds.to_string();
    let mut pinger = Command::new("ping");
    pinger.arg("-n").arg(&echo_count).arg("127.0.0.1");
    pinger
}
/// Unix variant: returns `sleep <seconds>` as a not-yet-spawned command.
#[cfg(unix)]
fn long_sleep_cmd(seconds: u32) -> Command {
    let duration_arg = seconds.to_string();
    let mut sleeper = Command::new("sleep");
    sleeper.arg(duration_arg);
    sleeper
}
/// Windows/tokio variant: returns `ping -n <seconds> 127.0.0.1` as a
/// not-yet-spawned `tokio::process::Command`.
#[cfg(all(windows, feature = "tokio"))]
fn long_sleep_async_cmd(seconds: u32) -> tokio::process::Command {
    let echo_count = seconds.to_string();
    let mut pinger = tokio::process::Command::new("ping");
    pinger.arg("-n").arg(&echo_count).arg("127.0.0.1");
    pinger
}
/// Unix/tokio variant: returns `sleep <seconds>` as a not-yet-spawned
/// `tokio::process::Command`.
#[cfg(all(unix, feature = "tokio"))]
fn long_sleep_async_cmd(seconds: u32) -> tokio::process::Command {
    let duration_arg = seconds.to_string();
    let mut sleeper = tokio::process::Command::new("sleep");
    sleeper.arg(duration_arg);
    sleeper
}
/// `init` must succeed on a fresh manager and again on a second call
/// (idempotence).
#[test]
fn test_manager_init() {
    let manager = create_manager();
    // First message belongs to the initial call, second to the repeated one.
    for failure_msg in ["init should succeed", "double init should succeed"] {
        manager.init().expect(failure_msg);
    }
}
/// An initialized manager can hand out a lifecycle handle.
#[test]
fn test_create_lifecycle() {
    let manager = create_manager();
    manager.init().expect("init failed");
    // Only the construction is checked; the handle is released right away.
    drop(manager.create_lifecycle());
}
/// A configured command spawns, can be registered, and keeps running.
#[test]
fn test_configure_and_spawn() {
    let manager = create_manager();
    manager.init().expect("init failed");
    let lifecycle = manager.create_lifecycle();

    let mut sleeper_cmd = long_sleep_cmd(30);
    lifecycle.configure_command(&mut sleeper_cmd);
    let mut sleeper = sleeper_cmd.spawn().expect("spawn failed");

    let child_pid = sleeper.id();
    assert!(child_pid > 0);

    // Registration is expected to succeed for a live child.
    lifecycle.register_child(child_pid).expect("register failed");

    // No exit status yet: the child must still be alive.
    let exit_status = sleeper.try_wait().expect("try_wait failed");
    assert!(exit_status.is_none());

    // Best-effort cleanup so the sleeper does not outlive the test.
    let _ = sleeper.kill();
    let _ = sleeper.wait();
}
/// `graceful_shutdown_sync` always leaves the child dead and reaped; whether
/// that counts as "graceful" depends on how the platform's helper command
/// reacts to the shutdown signal.
#[test]
fn test_graceful_shutdown_sync() {
    let mgr = create_manager();
    mgr.init().expect("init failed");
    let lifecycle = mgr.create_lifecycle();

    let mut cmd = long_sleep_cmd(60);
    lifecycle.configure_command(&mut cmd);
    let mut child = cmd.spawn().expect("spawn failed");
    let pid = child.id();
    lifecycle.register_child(pid).expect("register failed");

    // Graceful shutdown with a 3s grace period.
    let exited_gracefully = graceful_shutdown_sync(
        lifecycle.as_ref(),
        &mut child,
        Duration::from_secs(3),
    );

    // Process should be dead now, regardless of how it was stopped.
    assert!(child.try_wait().expect("try_wait failed").is_some());

    if cfg!(unix) {
        // `sleep` installs no SIGTERM handler, so the default action
        // terminates it well inside the grace period.
        assert!(
            exited_gracefully,
            "sleep should exit on SIGTERM before the timeout"
        );
    } else {
        // `ping` does not exit on CTRL_BREAK, so the helper has to
        // force-kill it after the timeout.
        assert!(
            !exited_gracefully,
            "ping should have been force-killed after the timeout"
        );
    }
}
/// A direct kill signal terminates the child with a non-success status.
#[test]
fn test_send_kill_signal() {
    let manager = create_manager();
    manager.init().expect("init failed");
    let lifecycle = manager.create_lifecycle();

    let mut sleeper_cmd = long_sleep_cmd(60);
    lifecycle.configure_command(&mut sleeper_cmd);
    let mut sleeper = sleeper_cmd.spawn().expect("spawn failed");
    let child_pid = sleeper.id();
    lifecycle.register_child(child_pid).expect("register failed");

    // Skip the graceful phase entirely and kill right away.
    lifecycle.send_kill_signal(child_pid).expect("kill failed");

    // Reap the child; a killed process must not report success.
    let exit_status = sleeper.wait().expect("wait failed");
    assert!(!exit_status.success());
}
/// Async counterpart of the sync shutdown test: the child always ends up dead
/// and reaped; whether that counts as "graceful" depends on how the
/// platform's helper command reacts to the shutdown signal.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_async_graceful_shutdown() {
    use dirigent_process::graceful_shutdown_async;

    let mgr = create_manager();
    mgr.init().expect("init failed");
    let lifecycle = mgr.create_lifecycle();

    let mut cmd = long_sleep_async_cmd(60);
    lifecycle.configure_async_command(&mut cmd);
    let mut child = cmd.spawn().expect("spawn failed");
    let pid = child.id().expect("no pid");
    lifecycle.register_child(pid).expect("register failed");

    let exited_gracefully = graceful_shutdown_async(
        lifecycle.as_ref(),
        &mut child,
        Duration::from_secs(3),
    )
    .await;

    // Process should be dead now, regardless of how it was stopped.
    assert!(child.try_wait().expect("try_wait failed").is_some());

    if cfg!(unix) {
        // `sleep` installs no SIGTERM handler, so the default action
        // terminates it well inside the grace period.
        assert!(
            exited_gracefully,
            "sleep should exit on SIGTERM before the timeout"
        );
    } else {
        // `ping` does not exit on CTRL_BREAK, so the helper has to
        // force-kill it after the timeout.
        assert!(
            !exited_gracefully,
            "ping should have been force-killed after the timeout"
        );
    }
}