sync from monorepo @ ffee08f2

This commit is contained in:
2026-05-09 19:52:44 +02:00
parent e20542a40e
commit bf5a79d931
177 changed files with 242 additions and 37736 deletions
+1 -18
View File
@@ -19,7 +19,7 @@ agent-client-protocol = { version = "0.6", optional = true }
# Async streams
async-stream = { version = "0.3", optional = true }
# Async trait support
async-trait = { version = "0.1", optional = true }
async-trait = "0.1"
# Web server
axum = { version = "0.8", optional = true }
# Base64 encoding for embedded resources
@@ -29,13 +29,9 @@ blake3 = { version = "1.5", optional = true }
chrono = { version = "0.4", features = ["serde"] }
dirigent_acp_api = { path = "../dirigent_acp_api", optional = true }
# Workspace dependencies
dirigent_archivist = { path = "../dirigent_archivist", optional = true }
dirigent_config = { path = "../dirigent_config", optional = true }
dirigent_auth = { path = "../dirigent_auth" }
dirigent_process = { path = "../dirigent_process", features = ["tokio"], optional = true }
dirigent_taskrunner = { path = "../dirigent_taskrunner", optional = true }
dirigent_matrix = { path = "../dirigent_matrix", optional = true }
dirigent_zed = { path = "../dirigent_zed", optional = true }
dirigent_inspector = { path = "../dirigent_inspector", optional = true }
dirigent_protocol = { path = "../dirigent_protocol", features = ["adapters"], optional = true }
dirigent_tools = { path = "../dirigent_tools", optional = true }
@@ -77,26 +73,16 @@ toml = "0.8"
name = "stream_registry_test"
required-features = ["test-utils"]
[[test]]
name = "replay_test"
required-features = ["test-utils", "server"]
[[test]]
name = "matrix_migration_test"
required-features = ["server"]
[features]
default = []
test-utils = []
server = [
"dep:agent-client-protocol",
"dep:async-stream",
"dep:async-trait",
"dep:axum",
"dep:base64",
"dep:blake3",
"dep:dirigent_acp_api",
"dep:dirigent_archivist",
"dep:dirigent_config",
"dep:dirigent_inspector",
"dep:dirigent_protocol",
@@ -113,8 +99,5 @@ server = [
"dep:tower-http",
"dep:tracing",
"dep:tracing-subscriber",
"dep:dirigent_matrix",
"dep:dirigent_zed",
"dep:dirigent_taskrunner",
"dep:dirigent_process",
]
+8 -19
View File
@@ -57,15 +57,10 @@ pub struct CoreConfig {
/// Archive backend declarations. Phase 3+ replaces `archive_root` with
/// this `[[archives]]`-array config. When both are set, `archives` wins.
///
/// Stored as typed `ArchiveConfig` on server builds and as raw
/// `serde_json::Value` on WASM/non-server builds (archivist types pull in
/// the full coordinator which is server-only).
#[cfg(feature = "server")]
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub archives: Vec<dirigent_archivist::registry::ArchiveConfig>,
/// Archive backend declarations (opaque on non-server builds).
#[cfg(not(feature = "server"))]
/// Stored as opaque `serde_json::Value` — the archivist (in
/// `dirigent_server`) deserialises these into typed `ArchiveConfig`
/// at boot. Core only needs to preserve the raw entries for
/// serialization round-trips.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub archives: Vec<serde_json::Value>,
@@ -118,14 +113,14 @@ pub struct CoreConfig {
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub accounts: HashMap<String, dirigent_auth::Account>,
/// Matrix sharing behavior configuration.
/// Matrix sharing behavior configuration (opaque).
///
/// References an account by name for identity/credentials.
/// If None, Matrix session sharing is disabled.
///
/// Stored as typed `MatrixBehaviorConfig` on server builds and as
/// raw `serde_json::Value` on WASM/non-server builds to avoid pulling
/// in the heavy `matrix-sdk` dependency.
/// Stored as raw `serde_json::Value` so `dirigent_core` does not depend
/// on `dirigent_matrix`. The server layer (`dirigent_server`) deserializes
/// this into `dirigent_matrix::MatrixBehaviorConfig` when needed.
///
/// Example in dirigent.toml:
/// ```toml
@@ -134,12 +129,6 @@ pub struct CoreConfig {
/// default_invite = ["@user:example.com"]
/// store_path = "matrix/bot/store"
/// ```
#[cfg(feature = "server")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub matrix: Option<dirigent_matrix::MatrixBehaviorConfig>,
/// Matrix sharing behavior configuration (opaque on non-server builds).
#[cfg(not(feature = "server"))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub matrix: Option<serde_json::Value>,
+56
View File
@@ -0,0 +1,56 @@
use crate::types::ConnectorKind;
use uuid::Uuid;
/// Result of registering a connector with external services.
#[derive(Debug, Default)]
pub struct ConnectorRegistration {
/// An external UID assigned by a service (e.g., inspector or archivist).
pub external_uid: Option<Uuid>,
}
/// Lifecycle hooks invoked by the core runtime when connectors are created or removed.
///
/// Implementations wire up external services (archivist, inspector, etc.)
/// without the core needing to know about them directly.
#[async_trait::async_trait]
pub trait ConnectorLifecycleHooks: Send + Sync {
async fn on_connector_created(
&self,
connector_id: &str,
kind: ConnectorKind,
title: &str,
owner: &str,
params: &serde_json::Value,
) -> ConnectorRegistration;
async fn on_connector_removed(&self, _connector_id: &str) {}
#[cfg(feature = "server")]
fn inspector(&self) -> Option<std::sync::Arc<dirigent_inspector::InspectorRegistry>> {
None
}
#[cfg(feature = "server")]
fn process_manager(
&self,
) -> Option<std::sync::Arc<dyn dirigent_process::ProcessGroupManager>> {
None
}
}
/// No-op implementation for environments that don't need lifecycle hooks.
pub struct NoOpHooks;
#[async_trait::async_trait]
impl ConnectorLifecycleHooks for NoOpHooks {
async fn on_connector_created(
&self,
_connector_id: &str,
_kind: ConnectorKind,
_title: &str,
_owner: &str,
_params: &serde_json::Value,
) -> ConnectorRegistration {
ConnectorRegistration::default()
}
}
+4 -4
View File
@@ -38,6 +38,10 @@ pub mod types;
// Plugin system types (scaffolding) - always available
pub mod plugins;
// Lifecycle hooks for connector creation/removal - always available (WASM-compatible)
pub mod hooks;
pub use hooks::{ConnectorLifecycleHooks, ConnectorRegistration, NoOpHooks};
// Tool directive and configuration types - always available (WASM-compatible)
pub mod tools;
@@ -58,10 +62,6 @@ pub use error::CoreError;
#[cfg(feature = "server")]
pub use runtime::{CoreHandle, CoreRuntime};
// Re-export Zed agent → ConnectorConfig conversion (used by API)
#[cfg(feature = "server")]
pub use runtime::zed_detection::{refresh_zed_connector_binaries, zed_agent_to_connector_config};
// Configuration module (server-only)
#[cfg(feature = "server")]
pub mod config;
File diff suppressed because it is too large Load Diff
@@ -1,589 +0,0 @@
//! Zed editor agent detection and connector config generation.
//!
//! This module detects Zed editor installations and generates `ConnectorConfig`
//! entries for discovered ACP agents. It is called during runtime initialization
//! to auto-populate connectors from Zed's agent configuration.
use crate::config::{ConnectorConfig, CoreConfig};
use crate::connectors::acp::config::ConnectorAgentType;
use crate::connectors::acp::{AcpConfig, TransportKind};
use crate::types::ConnectorKind;
use dirigent_tools::EmbeddingConfig;
use tracing::{debug, info, warn};
/// Default supported features for known ACP agent types.
///
/// These are conservative defaults based on confirmed agent capabilities.
/// Users can override via the connector config UI.
fn default_features_for_agent(agent_type: &ConnectorAgentType) -> Vec<String> {
match agent_type {
ConnectorAgentType::Claude => vec![
"cancellation".to_string(),
"session_resume".to_string(),
"session_list".to_string(),
],
ConnectorAgentType::Codex => vec![
"session_resume".to_string(),
],
// Gemini: no confirmed features yet (hangs on connect — BUG-7)
ConnectorAgentType::Gemini => vec![],
ConnectorAgentType::Custom => vec![],
}
}
/// Convert a discovered Zed agent into a `ConnectorConfig` for the runtime.
///
/// Only creates connectors for agents with resolved binary paths (typically
/// registry agents whose binaries have been downloaded by Zed).
///
/// Returns `None` if the agent has no binary path.
pub fn zed_agent_to_connector_config(agent: &dirigent_zed::ZedAgent) -> Option<ConnectorConfig> {
let binary_path = agent.binary_path.as_ref()?;
// Map agent names to proper types. Handles both Zed settings keys
// (e.g. "claude-acp") and external_agents directory names (e.g. "claude-agent-acp").
let name_lower = agent.name.to_lowercase();
let (default_title, default_icon, agent_type): (&str, &str, ConnectorAgentType) =
if name_lower.contains("claude") {
("Claude (Zed)", "claude", ConnectorAgentType::Claude)
} else if name_lower.contains("codex") {
("Codex (Zed)", "codex", ConnectorAgentType::Codex)
} else if name_lower.contains("gemini") {
("Gemini (Zed)", "gemini", ConnectorAgentType::Gemini)
} else {
(agent.name.as_str(), "acp", ConnectorAgentType::Custom)
};
// Use registry display name with "(Zed)" suffix when available, falling
// back to the hardcoded title.
let title: String = match agent.display_name.as_deref() {
Some(display) => format!("{display} (Zed)"),
None => default_title.to_string(),
};
// Use the locally cached SVG icon path from the registry when available,
// otherwise fall back to the built-in icon name.
let icon: String = match agent.icon_local_path.as_ref() {
Some(path) => path.to_string_lossy().to_string(),
None => default_icon.to_string(),
};
let features = default_features_for_agent(&agent_type);
// Build a proper AcpConfig with stdio transport pointing to the Zed-managed binary.
let env: Vec<(String, String)> = agent
.env_overrides
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
// Use args from registry metadata (e.g. ["--acp"]) when available.
let args = if agent.args.is_empty() {
vec![]
} else {
agent.args.clone()
};
let acp_config = AcpConfig {
transport: TransportKind::Stdio {
command: binary_path.to_string_lossy().to_string(),
args,
cwd: None,
env,
},
protocol_version: 1,
cwd: ".".to_string(),
retry: Default::default(),
embedding: EmbeddingConfig::default(),
default_ownership: Default::default(),
acp_log_dir: None,
agent_type,
};
let params = match serde_json::to_value(&acp_config) {
Ok(v) => v,
Err(e) => {
warn!(
agent = %agent.name,
error = %e,
"Failed to serialize AcpConfig for Zed agent"
);
return None;
}
};
Some(ConnectorConfig {
id: None,
kind: ConnectorKind::Acp,
owner: None,
title: Some(title),
working_directory: None,
params,
icon_path: Some(icon),
show_type_overlay: false,
supported_features: features,
tool_configuration: None,
plugin_assignments: vec![],
use_in_new_projects: true,
source: None,
zed_agent_name: Some(agent.name.clone()),
})
}
/// Refresh binary paths for Zed-sourced connectors in the config.
///
/// When Zed upgrades agent binaries in the background, the binary path changes
/// (e.g. new version directory). This function re-detects the current binary
/// paths from Zed installations and updates any connector that has a
/// `zed_agent_name` set.
///
/// Returns the number of connectors updated.
pub fn refresh_zed_connector_binaries(config: &mut CoreConfig) -> usize {
let installations = dirigent_zed::detect_installations();
if installations.is_empty() {
debug!("No Zed installations detected, skipping binary refresh");
return 0;
}
// Build a map of agent_name -> latest binary path from all Zed installations.
let mut agent_binaries: std::collections::HashMap<String, String> =
std::collections::HashMap::new();
for installation in &installations {
for agent in &installation.agents {
if let Some(ref binary_path) = agent.binary_path {
agent_binaries
.insert(agent.name.clone(), binary_path.to_string_lossy().to_string());
}
}
}
let mut updated = 0usize;
for connector in &mut config.connectors {
let zed_name = match connector.zed_agent_name.as_deref() {
Some(n) => n,
None => continue,
};
let new_binary = match agent_binaries.get(zed_name) {
Some(b) => b.clone(),
None => continue,
};
// Parse current ACP config to check the existing binary path.
let mut acp_config: AcpConfig = match serde_json::from_value(connector.params.clone()) {
Ok(c) => c,
Err(_) => continue,
};
let current_command = match &acp_config.transport {
TransportKind::Stdio { command, .. } => command.clone(),
_ => continue,
};
if current_command == new_binary {
continue;
}
// Update the transport command to the new binary path.
match &mut acp_config.transport {
TransportKind::Stdio { command, .. } => {
info!(
zed_agent = %zed_name,
old = %current_command,
new = %new_binary,
"Updating Zed connector binary path"
);
*command = new_binary;
}
_ => continue,
}
// Re-serialize back into params.
match serde_json::to_value(&acp_config) {
Ok(v) => {
connector.params = v;
updated += 1;
}
Err(e) => {
warn!(
zed_agent = %zed_name,
error = %e,
"Failed to re-serialize AcpConfig after binary update"
);
}
}
}
if updated > 0 {
info!(count = updated, "Updated Zed connector binary paths");
}
updated
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::CoreConfig;
#[test]
fn test_zed_agent_to_connector_config_no_binary() {
let agent = dirigent_zed::ZedAgent {
name: "claude-acp".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: None,
env_overrides: std::collections::HashMap::new(),
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
assert!(zed_agent_to_connector_config(&agent).is_none());
}
#[test]
fn test_zed_agent_to_connector_config_with_binary() {
let agent = dirigent_zed::ZedAgent {
name: "claude-acp".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/usr/local/bin/claude-acp")),
env_overrides: std::collections::HashMap::new(),
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
assert_eq!(config.kind, ConnectorKind::Acp);
assert_eq!(config.title.as_deref(), Some("Claude (Zed)"));
assert_eq!(config.icon_path.as_deref(), Some("claude"));
assert_eq!(config.source, None);
// Verify the params contain a proper AcpConfig
let acp_config: AcpConfig = serde_json::from_value(config.params).unwrap();
match &acp_config.transport {
TransportKind::Stdio { command, .. } => {
assert_eq!(command, "/usr/local/bin/claude-acp");
}
_ => panic!("Expected stdio transport"),
}
assert_eq!(acp_config.agent_type, ConnectorAgentType::Claude);
assert_eq!(config.supported_features, vec!["cancellation", "session_resume", "session_list"]);
}
#[test]
fn test_zed_agent_to_connector_config_with_env() {
let mut env = std::collections::HashMap::new();
env.insert(
"CLAUDE_CODE_EXECUTABLE".to_string(),
"/usr/bin/claude".to_string(),
);
let agent = dirigent_zed::ZedAgent {
name: "claude-acp".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/path/to/binary")),
env_overrides: env,
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
let acp_config: AcpConfig = serde_json::from_value(config.params).unwrap();
match &acp_config.transport {
TransportKind::Stdio { env, .. } => {
assert!(env
.iter()
.any(|(k, v)| k == "CLAUDE_CODE_EXECUTABLE" && v == "/usr/bin/claude"));
}
_ => panic!("Expected stdio transport"),
}
}
#[test]
fn test_zed_agent_to_connector_config_unknown_agent() {
let agent = dirigent_zed::ZedAgent {
name: "my-custom-agent".to_string(),
agent_type: dirigent_zed::AgentServerType::Custom,
binary_path: Some(std::path::PathBuf::from("/path/to/custom")),
env_overrides: std::collections::HashMap::new(),
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
assert_eq!(config.title.as_deref(), Some("my-custom-agent"));
assert_eq!(config.icon_path.as_deref(), Some("acp"));
let acp_config: AcpConfig = serde_json::from_value(config.params).unwrap();
assert_eq!(acp_config.agent_type, ConnectorAgentType::Custom);
assert!(config.supported_features.is_empty());
}
#[test]
fn test_zed_agent_codex() {
let agent = dirigent_zed::ZedAgent {
name: "codex-acp".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/path/to/codex")),
env_overrides: std::collections::HashMap::new(),
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
assert_eq!(config.title.as_deref(), Some("Codex (Zed)"));
assert_eq!(config.icon_path.as_deref(), Some("codex"));
let acp_config: AcpConfig = serde_json::from_value(config.params).unwrap();
assert_eq!(acp_config.agent_type, ConnectorAgentType::Codex);
assert_eq!(config.supported_features, vec!["session_resume"]);
}
#[test]
fn test_zed_agent_gemini() {
let agent = dirigent_zed::ZedAgent {
name: "gemini".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/path/to/gemini")),
env_overrides: std::collections::HashMap::new(),
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
assert_eq!(config.title.as_deref(), Some("Gemini (Zed)"));
assert_eq!(config.icon_path.as_deref(), Some("gemini"));
let acp_config: AcpConfig = serde_json::from_value(config.params).unwrap();
assert_eq!(acp_config.agent_type, ConnectorAgentType::Gemini);
assert!(config.supported_features.is_empty());
}
#[test]
fn test_dismissed_zed_agent_title_matches_generated_config() {
// Verify that a dismissed title like "Claude (Zed)" matches the title
// generated by zed_agent_to_connector_config for a claude-acp agent
let mut core_config = CoreConfig::default();
core_config
.dismissed_zed_agents
.push("Claude (Zed)".to_string());
let agent = dirigent_zed::ZedAgent {
name: "claude-acp".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/usr/local/bin/claude-acp")),
env_overrides: std::collections::HashMap::new(),
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
let connector_config = zed_agent_to_connector_config(&agent).unwrap();
assert_eq!(connector_config.title.as_deref(), Some("Claude (Zed)"));
// The dismissed list should contain the generated title
assert!(core_config
.dismissed_zed_agents
.contains(connector_config.title.as_ref().unwrap()));
}
#[test]
fn test_dismissed_list_does_not_block_other_agents() {
// Dismissing Claude should not block Codex or Gemini
let mut core_config = CoreConfig::default();
core_config
.dismissed_zed_agents
.push("Claude (Zed)".to_string());
let codex_title = "Codex (Zed)".to_string();
let gemini_title = "Gemini (Zed)".to_string();
assert!(!core_config.dismissed_zed_agents.contains(&codex_title));
assert!(!core_config.dismissed_zed_agents.contains(&gemini_title));
}
#[test]
fn test_dismissed_zed_agents_serde_roundtrip() {
// Verify dismissed_zed_agents survives serialization/deserialization
let mut core_config = CoreConfig::default();
core_config
.dismissed_zed_agents
.push("Claude (Zed)".to_string());
core_config
.dismissed_zed_agents
.push("Gemini (Zed)".to_string());
let json = serde_json::to_string(&core_config).unwrap();
let deserialized: CoreConfig = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.dismissed_zed_agents.len(), 2);
assert!(deserialized
.dismissed_zed_agents
.contains(&"Claude (Zed)".to_string()));
assert!(deserialized
.dismissed_zed_agents
.contains(&"Gemini (Zed)".to_string()));
}
#[test]
fn test_dismissed_zed_agents_empty_not_serialized() {
// With skip_serializing_if = "Vec::is_empty", empty list should not appear in JSON
let core_config = CoreConfig::default();
let json = serde_json::to_string(&core_config).unwrap();
assert!(
!json.contains("dismissed_zed_agents"),
"Empty dismissed_zed_agents should be omitted from serialization"
);
}
#[test]
fn test_dismissed_zed_agents_deserialized_from_missing_field() {
// Old config files without dismissed_zed_agents should still deserialize
// thanks to #[serde(default)]
let json = r#"{"project_dir":".","connectors":[]}"#;
let config: CoreConfig = serde_json::from_str(json).unwrap();
assert!(config.dismissed_zed_agents.is_empty());
}
#[test]
fn test_zed_agent_name_is_set() {
let agent = dirigent_zed::ZedAgent {
name: "claude-acp".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/usr/local/bin/claude-acp")),
env_overrides: std::collections::HashMap::new(),
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
assert_eq!(config.zed_agent_name.as_deref(), Some("claude-acp"));
}
#[test]
fn test_zed_agent_name_preserves_original_name() {
// The zed_agent_name should be the exact Zed agent name, not the display title
let agent = dirigent_zed::ZedAgent {
name: "claude-agent-acp".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/path/to/binary")),
env_overrides: std::collections::HashMap::new(),
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
assert_eq!(config.title.as_deref(), Some("Claude (Zed)"));
assert_eq!(config.zed_agent_name.as_deref(), Some("claude-agent-acp"));
}
#[test]
fn test_zed_agent_name_serde_roundtrip() {
let agent = dirigent_zed::ZedAgent {
name: "codex".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/path/to/codex")),
env_overrides: std::collections::HashMap::new(),
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
let json = serde_json::to_string(&config).unwrap();
assert!(json.contains("\"zed_agent_name\":\"codex\""));
let deserialized: ConnectorConfig = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.zed_agent_name.as_deref(), Some("codex"));
}
#[test]
fn test_non_zed_connector_has_no_zed_agent_name() {
// ConnectorConfig created via template should not have zed_agent_name
let config = ConnectorConfig::default();
assert!(config.zed_agent_name.is_none());
}
#[test]
fn test_enriched_display_name_used_in_title() {
let agent = dirigent_zed::ZedAgent {
name: "claude-acp".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/path/to/claude")),
env_overrides: std::collections::HashMap::new(),
display_name: Some("Claude Agent".to_string()),
description: Some("ACP wrapper for Anthropic's Claude".to_string()),
args: Vec::new(),
icon_local_path: None,
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
// Title should use the registry display name with "(Zed)" suffix.
assert_eq!(config.title.as_deref(), Some("Claude Agent (Zed)"));
}
#[test]
fn test_enriched_args_passed_to_transport() {
let agent = dirigent_zed::ZedAgent {
name: "auggie".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/path/to/auggie")),
env_overrides: std::collections::HashMap::new(),
display_name: Some("Auggie CLI".to_string()),
description: None,
args: vec!["--acp".to_string()],
icon_local_path: None,
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
let acp_config: AcpConfig = serde_json::from_value(config.params).unwrap();
match &acp_config.transport {
TransportKind::Stdio { args, .. } => {
assert_eq!(args, &["--acp"]);
}
_ => panic!("Expected stdio transport"),
}
}
#[test]
fn test_enriched_icon_path_used() {
let agent = dirigent_zed::ZedAgent {
name: "claude-acp".to_string(),
agent_type: dirigent_zed::AgentServerType::Registry,
binary_path: Some(std::path::PathBuf::from("/path/to/claude")),
env_overrides: std::collections::HashMap::new(),
display_name: None,
description: None,
args: Vec::new(),
icon_local_path: Some(std::path::PathBuf::from("/icons/claude-acp.svg")),
icon_url: None,
};
let config = zed_agent_to_connector_config(&agent).unwrap();
assert_eq!(config.icon_path.as_deref(), Some("/icons/claude-acp.svg"));
}
}
+11 -4
View File
@@ -1,10 +1,17 @@
//! Consecutive-failure health drift for streams (K=5 threshold).
//!
//! Mirrors the archivist's drift logic but tracks a single stream's
//! outcomes. Re-exports the shared `HealthStatus` enum from the
//! archivist's backend module to avoid duplication.
//! Tracks a single stream's health based on delivery outcomes.
pub use dirigent_archivist::backend::HealthStatus;
/// Health status of a stream, tracking consecutive delivery failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
/// Stream is delivering events successfully.
Healthy,
/// Stream has experienced failures but is still below the threshold.
Degraded { reason: String },
/// Stream has exceeded the failure threshold and is considered offline.
Unavailable { reason: String },
}
/// Number of consecutive failures before a stream drifts from `Degraded`
/// to `Unavailable`. Matches the archivist's backend drift threshold.
-217
View File
@@ -1,217 +0,0 @@
//! `MatrixFactory`: build a Matrix [`SessionStream`] from a `[[streams]]`
//! config block.
//!
//! This is the first stream-side factory wired for the Phase 4 migration
//! (Task 18). The factory lives in `dirigent_core` rather than
//! `dirigent_matrix` because `StreamFactory` is defined here and
//! `dirigent_core` already depends on `dirigent_matrix` — putting it in
//! `dirigent_matrix` would create a cycle.
//!
//! ## Scope
//!
//! The factory's responsibility is narrow: parse `cfg.params`, resolve
//! the target Matrix room via a running `MatrixService`, and construct a
//! `MatrixSessionShare` configured for the stream path (no legacy
//! forwarder task). Command-proxy wiring (Matrix → Dirigent
//! `ConnectorCommand::SendMessage`) remains in
//! `CoreRuntime::create_matrix_share` for now; a follow-up will extend
//! the factory to cover that path.
//!
//! ## Config shape
//!
//! ```toml
//! [[streams]]
//! name = "matrix-main"
//! type = "matrix"
//!
//! [streams.scope]
//! kind = "session"
//! scroll_id = "01985d00-..."
//!
//! [streams.params]
//! connector_id = "opencode-1" # dirigent connector key
//! session_id = "native-abc123" # native connector session id
//! room_id = "!abc:matrix.org" # pre-existing room to attach to
//! homeserver_url = "https://matrix.org" # informational (service already knows)
//! ```
use std::sync::Arc;
use async_trait::async_trait;
use serde::Deserialize;
use dirigent_protocol::streaming::{SessionStream, StreamScope};
use super::config::StreamConfig;
use super::factory::{StreamBuildError, StreamFactory};
/// Stream-side factory for Matrix. See module docs for the expected
/// TOML shape.
pub struct MatrixFactory {
service: Arc<dirigent_matrix::MatrixService>,
}
impl MatrixFactory {
/// Build a factory bound to a running `MatrixService`. The service
/// is expected to be logged in and sync-started by the time
/// `build()` is called; if it isn't, `build()` returns
/// `StreamBuildError::Transport`.
pub fn new(service: Arc<dirigent_matrix::MatrixService>) -> Self {
Self { service }
}
}
#[derive(Debug, Deserialize)]
struct MatrixStreamParams {
/// Dirigent connector id that owns the session being bridged.
connector_id: String,
/// Native connector session id.
session_id: String,
/// Matrix room id — must be a pre-existing room the bot can access.
/// Room creation is still handled by
/// `CoreRuntime::create_matrix_share` until the factory path is
/// expanded to cover it.
room_id: String,
/// Informational; the logged-in `MatrixService` is the authority on
/// which homeserver to talk to. Accepted so configs can be
/// self-documenting and round-trip through TOML.
#[serde(default)]
#[allow(dead_code)]
homeserver_url: Option<String>,
}
#[async_trait]
impl StreamFactory for MatrixFactory {
fn kind(&self) -> &'static str {
"matrix"
}
async fn build(
&self,
cfg: &StreamConfig,
) -> Result<Arc<dyn SessionStream>, StreamBuildError> {
// Scope must be Session; Matrix shares are intrinsically
// per-session bi-directional bridges.
let scroll_id = match &cfg.scope {
StreamScope::Session { scroll_id } => *scroll_id,
other => {
return Err(StreamBuildError::Config(format!(
"matrix stream requires scope.kind = \"session\", got {:?}",
other
)));
}
};
// Parse type-specific params.
let params: MatrixStreamParams = cfg
.params
.clone()
.try_into()
.map_err(|e: toml::de::Error| {
StreamBuildError::Config(format!(
"matrix stream '{}': invalid params: {}",
cfg.name, e
))
})?;
// Look up the room via the service. We intentionally don't
// create or join rooms here — the room must already exist.
// Creation remains the responsibility of
// `CoreRuntime::create_matrix_share`.
let room = match self.service.room_by_id(&params.room_id).await {
Ok(Some(room)) => room,
Ok(None) => {
return Err(StreamBuildError::Transport(format!(
"matrix stream '{}': room '{}' not found on client \
— ensure the bot has joined it",
cfg.name, params.room_id
)));
}
Err(dirigent_matrix::MatrixError::NotLoggedIn) => {
return Err(StreamBuildError::Transport(
"matrix service is not logged in; cannot build stream"
.to_string(),
));
}
Err(dirigent_matrix::MatrixError::Config(msg)) => {
return Err(StreamBuildError::Config(format!(
"matrix stream '{}': {}",
cfg.name, msg
)));
}
Err(other) => {
return Err(StreamBuildError::Transport(format!(
"matrix stream '{}': {}",
cfg.name, other
)));
}
};
// Construct the share for the stream path (no legacy forwarder
// task). We drop the command receiver on the floor here — the
// Matrix → Dirigent direction is not covered by this factory
// yet; see the follow-up TODO in the module docs.
let (share, _command_rx) = dirigent_matrix::MatrixSessionShare::new_for_stream(
params.connector_id,
params.session_id,
scroll_id,
params.room_id,
room,
);
Ok(Arc::new(share) as Arc<dyn SessionStream>)
}
}
// ─── Tests ───────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn factory_kind_is_matrix() {
// The factory's `kind()` is static and doesn't require a running
// MatrixService to read — covered by a minimal construction
// check in the integration test suite.
fn assert_is_factory<F: StreamFactory>(_: &F) {}
// We can't easily build a MatrixService in a unit test (it needs
// an Account + data dir + SQLite store). The full smoke test
// lives in `packages/dirigent_matrix/tests/factory_test.rs` and
// the cross-crate registry test in
// `packages/dirigent_core/tests/matrix_migration_test.rs`.
//
// This module-local test exists only to assert that the impl
// block type-checks against the `StreamFactory` trait bound.
fn _compile_check(f: &MatrixFactory) {
assert_is_factory(f);
}
}
#[test]
fn matrix_stream_params_deserialise_ok() {
let toml_str = r#"
connector_id = "opencode-1"
session_id = "native-abc"
room_id = "!foo:example.com"
homeserver_url = "https://matrix.org"
"#;
let p: MatrixStreamParams = toml::from_str(toml_str).expect("parse");
assert_eq!(p.connector_id, "opencode-1");
assert_eq!(p.session_id, "native-abc");
assert_eq!(p.room_id, "!foo:example.com");
assert_eq!(p.homeserver_url.as_deref(), Some("https://matrix.org"));
}
#[test]
fn matrix_stream_params_reject_missing_required() {
// Missing room_id should fail.
let toml_str = r#"
connector_id = "opencode-1"
session_id = "native-abc"
"#;
let err: Result<MatrixStreamParams, _> = toml::from_str(toml_str);
assert!(err.is_err());
}
}
+3 -7
View File
@@ -1,23 +1,19 @@
//! SharingBus, StreamRegistry, and replay. See docs/plans/2026-04-21-archivist-phase4-design.md.
//! SharingBus, StreamRegistry, and stream health. See docs/plans/2026-04-21-archivist-phase4-design.md.
//!
//! Replay functionality has moved to `dirigent_server::replay`.
pub mod bus;
pub mod config;
pub mod factory;
pub mod health;
#[cfg(feature = "server")]
pub mod matrix;
#[cfg(any(test, feature = "test-utils"))]
pub mod mock;
pub mod registry;
pub mod replay;
pub use bus::{BusReceiver, SharingBus};
pub use config::{StreamConfig, StreamsConfig};
pub use factory::{StreamBuildError, StreamFactory, StreamFactoryRegistry};
pub use health::HealthStatus;
#[cfg(feature = "server")]
pub use matrix::MatrixFactory;
pub use registry::{StreamId, StreamInfo, StreamRegistration, StreamRegistry};
pub use replay::{ReplayError, ReplayOptions, ReplayReport, ReplaySpeed};
#[cfg(any(test, feature = "test-utils"))]
pub use mock::MockStream;
-226
View File
@@ -1,226 +0,0 @@
//! Replay: reads a session from the archive and dispatches synthetic
//! `BusEvent`s with `EventOrigin::Replay` directly to a target stream,
//! bypassing the `SharingBus`.
//!
//! Consumed by `CoreRuntime::replay_session_to_stream` (task 16). This
//! module intentionally exposes a free function that takes
//! `&Archivist`, `scroll_id`, `Arc<dyn SessionStream>`, and `ReplayOptions`
//! so it can be unit-tested without a full runtime.
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
use dirigent_archivist::coordinator::Archivist;
use dirigent_archivist::error::ArchivistError;
use dirigent_archivist::types::MessageRecord;
use dirigent_protocol::{
Event, Message, MessagePart, MessageRole, MessageStatus,
streaming::{BusEvent, EventOrigin, EventRouting, SessionStream, StreamOutcome},
};
/// Options controlling a replay pass.
#[derive(Debug, Clone)]
pub struct ReplayOptions {
/// When true and the session is an AcpConnection, meta-events are read from
/// the archive (currently only counted — rendering meta events as
/// `BusEvent`s is out of scope for Phase 4).
pub include_meta_events: bool,
/// Pace events in real time (sleep between consecutive timestamps) or emit
/// as fast as the target stream can consume.
pub speed: ReplaySpeed,
}
impl Default for ReplayOptions {
fn default() -> Self {
Self {
include_meta_events: false,
speed: ReplaySpeed::AsFastAsPossible,
}
}
}
/// Controls inter-event pacing during replay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplaySpeed {
/// Sleep the wall-clock delta between consecutive message timestamps.
Realtime,
/// Emit events as fast as the stream can consume.
AsFastAsPossible,
}
/// Outcome of a replay pass.
#[derive(Debug, Default, Clone)]
pub struct ReplayReport {
/// Total events dispatched to the stream (includes failed attempts).
pub events_sent: usize,
/// Events the stream rejected (`StreamOutcome::Failed`).
pub failures: usize,
/// Wall-clock duration of the replay in milliseconds.
pub duration_ms: u64,
}
/// Errors raised by `replay_session_to_stream` itself. Stream-side failures are
/// counted in `ReplayReport::failures` rather than propagated, so one bad event
/// doesn't abort the replay.
#[derive(Debug, thiserror::Error)]
pub enum ReplayError {
/// The archive has no session with the given scroll id.
#[error("session not found: {0}")]
SessionNotFound(Uuid),
/// Archivist returned a non-SessionUnknown error (I/O, decoding, etc).
#[error("archivist: {0}")]
Archivist(String),
}
/// Replay a session's archived messages to a single `SessionStream`.
///
/// Reads metadata + messages from `archivist`, synthesises a `BusEvent` per
/// message with `EventOrigin::Replay { replay_id }`, and dispatches directly
/// to the target stream. The `SharingBus` is not involved; live events remain
/// unaffected.
///
/// The function continues on stream failures and records the count in the
/// returned `ReplayReport`; only unrecoverable archive errors propagate.
pub async fn replay_session_to_stream(
archivist: &Archivist,
scroll_id: Uuid,
stream: Arc<dyn SessionStream>,
opts: ReplayOptions,
) -> Result<ReplayReport, ReplayError> {
let start = std::time::Instant::now();
let replay_id = Uuid::new_v4();
// Load metadata. Translate the archivist's typed `SessionUnknown` into
// the replay-level `SessionNotFound` variant; everything else becomes
// `Archivist(_)` so callers can distinguish "missing" from "broken".
let metadata = archivist
.get_session_metadata(scroll_id, None)
.await
.map_err(|e| match e {
ArchivistError::SessionUnknown(id) => ReplayError::SessionNotFound(id),
other => ReplayError::Archivist(other.to_string()),
})?;
let messages = archivist
.get_messages(scroll_id, None)
.await
.map_err(|e| ReplayError::Archivist(e.to_string()))?;
let connector_uid = Some(metadata.connector_uid);
let native_session_id = metadata.native_session_id.clone();
// We do not persist the orchestrator-side `connector_id` string in session
// metadata; the native session id is the best reversible handle we have.
let connector_id = native_session_id.clone().unwrap_or_default();
let mut events_sent = 0usize;
let mut failures = 0usize;
let mut prev_ts: Option<chrono::DateTime<chrono::Utc>> = None;
for record in messages {
if matches!(opts.speed, ReplaySpeed::Realtime) {
if let Some(prev) = prev_ts {
let delta = record.ts.signed_duration_since(prev);
if let Ok(d) = delta.to_std() {
// Cap per-step sleep at 1h to avoid pathological archives
// where a session sat idle for days.
if d > Duration::from_millis(0) && d < Duration::from_secs(3600) {
tokio::time::sleep(d).await;
}
}
}
prev_ts = Some(record.ts);
}
let message = message_from_record(&record, native_session_id.as_deref());
let event = Event::MessageCompleted {
connector_id: connector_id.clone(),
message,
};
let mut routing = EventRouting::derive(&event, connector_uid, &connector_id);
// `derive()` leaves scroll_id=None (the bus cache normally fills it in).
// During replay we have the authoritative scroll_id up front.
routing.scroll_id = Some(scroll_id);
let bus_event = BusEvent {
routing,
origin: EventOrigin::Replay { replay_id },
event: Arc::new(event),
};
match stream.on_event(&bus_event).await {
StreamOutcome::Ok | StreamOutcome::Skipped => {
events_sent += 1;
}
StreamOutcome::Failed(_err) => {
failures += 1;
events_sent += 1; // count attempted regardless
}
}
}
if opts.include_meta_events {
// Meta-events exist only on AcpConnection sessions; the read is
// cheap and idempotent, so we don't gate on `metadata.kind`. Render-
// as-BusEvent is out of scope for Phase 4 — we just probe the
// archive so missing meta-event storage surfaces as a log line
// here rather than later in the call chain.
let _ = archivist.get_meta_events(scroll_id, None).await;
}
Ok(ReplayReport {
events_sent,
failures,
duration_ms: start.elapsed().as_millis() as u64,
})
}
/// Synthesize a protocol `Message` from an archived `MessageRecord`.
///
/// The session_id we emit is the connector's native session id when known,
/// falling back to the stringified scroll_id so downstream routing at least
/// has a stable handle.
fn message_from_record(record: &MessageRecord, native_session_id: Option<&str>) -> Message {
Message {
id: record.message_id.to_string(),
session_id: native_session_id
.map(str::to_string)
.unwrap_or_else(|| record.session.to_string()),
role: parse_role(&record.role),
created_at: record.ts,
content: content_parts_from_record(record),
status: MessageStatus::Completed,
metadata: None,
}
}
/// Parse the archivist's stringly-typed role into the protocol enum.
///
/// `MessageRole` only has `User` and `Assistant` today; archived "system" /
/// "tool" rows (which the protocol layer does not support) fall back to
/// `User` rather than drop the message entirely. Lossy but preserves content.
fn parse_role(role: &str) -> MessageRole {
match role {
"assistant" => MessageRole::Assistant,
"user" => MessageRole::User,
// Protocol has no System/Tool variant; surface these as user messages
// so their content still reaches the stream.
_ => MessageRole::User,
}
}
/// Prefer the archived structured `content_parts` (round-trips tool calls,
/// code blocks, etc). Fall back to a single `Text` part built from the
/// markdown rendering when parts are missing or fail to parse.
fn content_parts_from_record(record: &MessageRecord) -> Vec<MessagePart> {
if let Some(parts) = &record.content_parts {
if let Ok(parsed) = serde_json::from_value::<Vec<MessagePart>>(parts.clone()) {
return parsed;
}
}
vec![MessagePart::Text {
text: record.content_md.clone(),
}]
}
@@ -1,207 +0,0 @@
//! Integration test: Matrix migration onto StreamRegistry (Phase 4, Task 18).
//!
//! Scope:
//! - `MatrixFactory::kind()` reports `"matrix"`.
//! - A fresh `StreamFactoryRegistry` with the factory registered can look it
//! up and rejects unknown kinds.
//! - Building a Matrix stream from a config with an `archive_wide` scope is
//! rejected with `StreamBuildError::Config`.
//! - Building a Matrix stream against a not-logged-in service is rejected
//! with `StreamBuildError::Transport` (does not panic, does not spin up
//! a real Matrix connection).
//!
//! This does NOT exercise end-to-end Matrix delivery — that requires a
//! live homeserver or a stub client, which is outside Task 18's scope.
//! The share-side `SessionStream` impl is covered separately by
//! `dirigent_matrix` unit tests.
#![cfg(feature = "server")]
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;
use dirigent_auth::{Account, AccountKind, AccountProfile, SecretSource};
use dirigent_core::sharing::{
MatrixFactory, StreamBuildError, StreamConfig, StreamFactory, StreamFactoryRegistry,
};
use dirigent_matrix::{MatrixBehaviorConfig, MatrixService};
use dirigent_protocol::streaming::StreamScope;
// ─── Helpers ────────────────────────────────────────────────────────────────
fn sample_matrix_account() -> Account {
let mut credentials = HashMap::new();
credentials.insert(
"password".to_string(),
SecretSource::Inline {
value: "bot-pass".to_string(),
},
);
let mut properties = HashMap::new();
properties.insert(
"homeserver".to_string(),
serde_json::json!("https://matrix.example.com"),
);
properties.insert(
"device_id".to_string(),
serde_json::json!("DIRIGENT_TEST"),
);
Account {
kind: AccountKind::Matrix,
config_name: "matrix-test".to_string(),
user_id: None,
credentials,
profile: AccountProfile {
username: Some("bot".to_string()),
display_name: Some("Test Bot".to_string()),
..Default::default()
},
properties,
}
}
fn behavior() -> MatrixBehaviorConfig {
MatrixBehaviorConfig {
account: "matrix-test".to_string(),
mode: Default::default(),
default_invite: vec![],
store_path: "matrix/test/store".to_string(),
rooms: vec![],
}
}
/// Build a `MatrixService` without calling `login()`. Any code path that
/// needs a live Client will surface a clean error (not a panic).
fn not_logged_in_service() -> Arc<MatrixService> {
let account = sample_matrix_account();
let tmp = tempfile::tempdir().expect("tempdir");
let data_dir: PathBuf = tmp.path().to_path_buf();
// Leak the TempDir so the path survives the life of the service for
// the duration of the test. The sqlite store is only created when
// login() runs — we never call it in these tests.
std::mem::forget(tmp);
let service = MatrixService::from_account(&account, behavior(), data_dir)
.expect("from_account");
Arc::new(service)
}
// ─── Tests ──────────────────────────────────────────────────────────────────
#[test]
fn matrix_factory_kind_is_matrix() {
let service = not_logged_in_service();
let f = MatrixFactory::new(service);
assert_eq!(f.kind(), "matrix");
}
#[test]
fn registry_returns_registered_matrix_factory() {
let service = not_logged_in_service();
let reg = StreamFactoryRegistry::new().register(MatrixFactory::new(service));
assert!(reg.get("matrix").is_some(), "matrix factory should be found");
assert!(
reg.get("langfuse").is_none(),
"unregistered kinds must return None"
);
}
#[tokio::test]
async fn build_rejects_archive_wide_scope_with_config_error() {
let service = not_logged_in_service();
let factory = MatrixFactory::new(service);
let params_toml = r#"
connector_id = "opencode-1"
session_id = "native-abc"
room_id = "!room:example.com"
"#;
let params: toml::Value = toml::from_str(params_toml).unwrap();
let cfg = StreamConfig {
name: "matrix-wrong-scope".to_string(),
kind: "matrix".to_string(),
scope: StreamScope::ArchiveWide { acknowledged: false },
enabled: true,
params,
};
let err = factory.build(&cfg).await.err().expect("build should fail");
match err {
StreamBuildError::Config(msg) => {
assert!(
msg.contains("session"),
"expected 'session' hint in error, got: {msg}"
);
}
other => panic!("expected Config error, got {other:?}"),
}
}
#[tokio::test]
async fn build_rejects_missing_params_with_config_error() {
let service = not_logged_in_service();
let factory = MatrixFactory::new(service);
// Missing room_id — required field.
let params_toml = r#"
connector_id = "opencode-1"
session_id = "native-abc"
"#;
let params: toml::Value = toml::from_str(params_toml).unwrap();
let cfg = StreamConfig {
name: "matrix-missing-room".to_string(),
kind: "matrix".to_string(),
scope: StreamScope::Session {
scroll_id: Uuid::now_v7(),
},
enabled: true,
params,
};
let err = factory.build(&cfg).await.err().expect("build should fail");
assert!(
matches!(err, StreamBuildError::Config(_)),
"expected Config error, got {err:?}"
);
}
#[tokio::test]
async fn build_reports_transport_error_when_service_not_logged_in() {
let service = not_logged_in_service();
let factory = MatrixFactory::new(service);
let params_toml = r#"
connector_id = "opencode-1"
session_id = "native-abc"
room_id = "!room:example.com"
"#;
let params: toml::Value = toml::from_str(params_toml).unwrap();
let cfg = StreamConfig {
name: "matrix-not-logged-in".to_string(),
kind: "matrix".to_string(),
scope: StreamScope::Session {
scroll_id: Uuid::now_v7(),
},
enabled: true,
params,
};
let err = factory.build(&cfg).await.err().expect("build should fail");
match err {
StreamBuildError::Transport(msg) => {
assert!(
msg.to_lowercase().contains("logged in")
|| msg.to_lowercase().contains("matrix service"),
"expected transport error to mention login state, got: {msg}"
);
}
other => panic!("expected Transport error, got {other:?}"),
}
}
-176
View File
@@ -1,176 +0,0 @@
//! Integration test: replay archived session into a `MockStream`.
//!
//! Builds a single-backend in-memory (tempdir) archivist, registers a
//! session, appends 10 messages with ascending timestamps, then exercises
//! `replay_session_to_stream` end-to-end.
use std::sync::Arc;
use chrono::{Duration as ChronoDuration, Utc};
use uuid::Uuid;
use dirigent_archivist::{
Archivist, MessageRecord, RegisterConnectorRequest, RegisterSessionRequest,
backends::JsonlBackend,
};
use dirigent_core::sharing::{
MockStream,
replay::{ReplayOptions, ReplaySpeed, replay_session_to_stream},
};
use dirigent_protocol::streaming::{EventOrigin, SessionStream, StreamScope};
/// Build an in-memory-ish archivist backed by a tempdir + JsonlBackend.
///
/// Matches the pattern used by `dirigent_archivist/tests/integration_tests.rs`.
/// The tempdir is leaked for the duration of the test process — acceptable
/// because the test binary exits immediately after.
async fn build_in_memory_archivist() -> Arc<Archivist> {
let temp_dir = std::env::temp_dir().join(format!("core_replay_test_{}", Uuid::now_v7()));
let backend = Arc::new(
JsonlBackend::new(temp_dir.clone())
.await
.expect("JsonlBackend construction"),
);
let archivist = Archivist::from_single_backend("main".into(), backend)
.await
.expect("Archivist::from_single_backend");
Arc::new(archivist)
}
/// Register a fresh connector + session and append `n` messages with
/// timestamps one second apart. Returns the scroll_id.
async fn seed_session_with_messages(archivist: &Archivist, n: usize) -> Uuid {
let connector_resp = archivist
.register_connector(
RegisterConnectorRequest {
r#type: "OpenCode".to_string(),
title: "Replay Test Connector".to_string(),
client_native_id: format!("replay-test@{}", Uuid::now_v7()),
custom_uid: None,
metadata: serde_json::json!({}),
fingerprint: None,
},
None,
)
.await
.expect("register_connector");
let session_resp = archivist
.register_session(
RegisterSessionRequest {
connector_uid: connector_resp.connector_uid,
native_session_id: format!("native-{}", Uuid::now_v7()),
title: Some("Replay Test Session".to_string()),
custom_scroll_id: None,
metadata: serde_json::json!({}),
completeness: Default::default(),
parent_scroll_id: None,
is_subagent: false,
continuation: None,
agent_id: None,
subagent_type: None,
spawning_tool_use_id: None,
},
None,
)
.await
.expect("register_session");
let scroll_id = session_resp.scroll_id;
let base_ts = Utc::now();
let messages: Vec<MessageRecord> = (0..n)
.map(|i| {
let role = if i % 2 == 0 { "user" } else { "assistant" };
MessageRecord {
version: 1,
message_id: Uuid::now_v7(),
session: scroll_id,
parent_id: None,
ts: base_ts + ChronoDuration::seconds(i as i64),
role: role.to_string(),
author: None,
content_md: format!("message {i}"),
content_parts: None,
attachments: vec![],
metadata: serde_json::json!({}),
}
})
.collect();
archivist
.append_messages(scroll_id, messages, None)
.await
.expect("append_messages");
scroll_id
}
#[tokio::test]
async fn replay_delivers_archived_messages_to_stream() {
let archivist = build_in_memory_archivist().await;
let scroll_id = seed_session_with_messages(&archivist, 10).await;
let mock = MockStream::new("mock", StreamScope::Session { scroll_id });
let stream: Arc<dyn SessionStream> = mock.clone();
let report = replay_session_to_stream(
archivist.as_ref(),
scroll_id,
stream,
ReplayOptions {
include_meta_events: false,
speed: ReplaySpeed::AsFastAsPossible,
},
)
.await
.expect("replay_session_to_stream");
assert_eq!(report.events_sent, 10, "events_sent");
assert_eq!(report.failures, 0, "failures");
assert_eq!(mock.received_count(), 10, "mock received count");
let received = mock.received.lock().unwrap();
for evt in received.iter() {
assert!(
matches!(evt.origin, EventOrigin::Replay { .. }),
"every replayed event must carry EventOrigin::Replay"
);
assert_eq!(
evt.routing.scroll_id,
Some(scroll_id),
"every replayed event must carry the authoritative scroll_id"
);
}
}
#[tokio::test]
async fn replay_continues_on_stream_failure() {
let archivist = build_in_memory_archivist().await;
let scroll_id = seed_session_with_messages(&archivist, 10).await;
let mock = MockStream::new("mock", StreamScope::Session { scroll_id });
mock.fail_next(3);
let stream: Arc<dyn SessionStream> = mock.clone();
let report = replay_session_to_stream(
archivist.as_ref(),
scroll_id,
stream,
ReplayOptions {
include_meta_events: false,
speed: ReplaySpeed::AsFastAsPossible,
},
)
.await
.expect("replay_session_to_stream");
// events_sent counts attempted (ok + failed); failures counts Failed only.
assert_eq!(report.events_sent, 10, "events_sent counts every attempt");
assert_eq!(report.failures, 3, "first 3 events rejected by mock");
assert_eq!(
mock.received_count(),
7,
"mock buffer contains the 7 successful events"
);
}