//! OpenCode.ai session ingestion. //! //! This module provides functionality to fetch sessions from OpenCode.ai //! and convert them into fixture format. use crate::{ fixture::{ types::{ Fixture, Message, MessageRole, Participant, ParticipantKind, Responders, ResponderStrategy, Session, Streaming, }, validate_fixture, }, MockerError, Result, }; use opencode_client::{MessageWithParts, OpenCodeClient}; use std::collections::HashMap; use std::path::Path; // ============================================================================ // Public API // ============================================================================ /// Run the complete ingestion workflow. /// /// This is the main entry point for ingesting sessions from OpenCode.ai. /// /// # Arguments /// /// * `base_url` - Base URL of the OpenCode API /// * `session_id` - Optional specific session ID to ingest /// * `all` - Whether to ingest all sessions /// * `output` - Output file path for the fixture /// * `merge` - Whether to merge with existing fixture /// /// # Workflow /// /// 1. Fetch session(s) from OpenCode API /// 2. Map to fixture format /// 3. Load existing fixture if merge is enabled /// 4. Merge fixtures if necessary /// 5. Validate the final fixture /// 6. Export to YAML file pub async fn run_ingest( base_url: &str, session_id: Option, all: bool, output: &Path, merge: bool, ) -> Result<()> { // Step 1: Create ingestor and fetch sessions tracing::info!("Creating OpenCode client"); let ingestor = OpenCodeIngestor::new(base_url); let opencode_sessions = if all { tracing::info!("Fetching all sessions from OpenCode"); ingestor.fetch_all_sessions().await? } else if let Some(id) = session_id { tracing::info!("Fetching single session: {}", id); vec![ingestor.fetch_session(&id).await?] } else { return Err(MockerError::Ingest( "Must specify either session_id or all".to_string(), )); }; tracing::info!("Fetched {} session(s)", opencode_sessions.len()); // Step 2: Map sessions to fixture format tracing::info!("Mapping sessions to fixture format"); let new_fixture = map_sessions_to_fixture(opencode_sessions)?; // Step 3: Load existing fixture if merge is enabled let final_fixture = if merge && output.exists() { tracing::info!("Loading existing fixture for merge: {}", output.display()); let existing_fixture = load_or_create_fixture(output).await?; tracing::info!( "Merging {} existing sessions with {} new sessions", existing_fixture.sessions.len(), new_fixture.sessions.len() ); merge_fixtures(existing_fixture, new_fixture)? } else { tracing::info!("Creating new fixture (no merge)"); new_fixture }; tracing::info!( "Final fixture has {} session(s)", final_fixture.sessions.len() ); // Step 4: Validate the fixture tracing::info!("Validating fixture"); validate_fixture(&final_fixture)?; // Step 5: Export to YAML tracing::info!("Exporting fixture to: {}", output.display()); export_fixture(&final_fixture, output).await?; tracing::info!("Ingestion complete!"); Ok(()) } // ============================================================================ // OpenCodeIngestor // ============================================================================ /// OpenCode session ingestor. /// /// Handles fetching sessions and messages from OpenCode.ai API. struct OpenCodeIngestor { client: OpenCodeClient, } impl OpenCodeIngestor { /// Create a new OpenCode ingestor. fn new(base_url: &str) -> Self { Self { client: OpenCodeClient::new(base_url), } } /// Fetch a single session by ID. async fn fetch_session(&self, session_id: &str) -> Result { tracing::debug!("Fetching session: {}", session_id); let session = self .client .get_session(session_id) .await .map_err(|e| MockerError::Ingest(format!("Failed to fetch session: {}", e)))?; tracing::debug!("Fetching messages for session: {}", session_id); let messages = self .client .list_messages(session_id) .await .map_err(|e| MockerError::Ingest(format!("Failed to fetch messages: {}", e)))?; tracing::debug!( "Fetched {} messages for session {}", messages.len(), session_id ); Ok(OpenCodeSession { session, messages }) } /// Fetch all sessions from the API. async fn fetch_all_sessions(&self) -> Result> { tracing::debug!("Fetching all sessions"); let sessions = self .client .list_sessions() .await .map_err(|e| MockerError::Ingest(format!("Failed to list sessions: {}", e)))?; tracing::info!("Found {} sessions to ingest", sessions.len()); let mut results = Vec::new(); for session_info in sessions { match self.fetch_session(&session_info.id).await { Ok(session) => results.push(session), Err(e) => { tracing::warn!("Failed to fetch session {}: {}", session_info.id, e); // Continue with other sessions } } } Ok(results) } } /// OpenCode session with messages. struct OpenCodeSession { session: opencode_client::Session, messages: Vec, } // ============================================================================ // Session Mapping // ============================================================================ /// Map OpenCode sessions to fixture format. fn map_sessions_to_fixture(opencode_sessions: Vec) -> Result { let sessions: Vec = opencode_sessions .into_iter() .map(|oc_session| map_session(oc_session)) .collect::>>()?; // Create default responders and streaming config let responders = create_default_responders(); let streaming = create_default_streaming(); Ok(Fixture { version: "0.1".to_string(), sessions, responders, streaming, }) } /// Map a single OpenCode session to fixture format. fn map_session(oc_session: OpenCodeSession) -> Result { let session_id = oc_session.session.id.clone(); // Create participants (user and assistant) let participants = vec![ Participant { id: "user".to_string(), kind: ParticipantKind::User, display_name: Some("User".to_string()), }, Participant { id: "assistant".to_string(), kind: ParticipantKind::Assistant, display_name: Some("Assistant".to_string()), }, ]; // Map messages let messages: Vec = oc_session .messages .into_iter() .enumerate() .filter_map(|(idx, msg_with_parts)| map_message(&session_id, idx, msg_with_parts)) .collect(); // Convert Unix timestamp (milliseconds) to ISO8601 let created_at = chrono::DateTime::from_timestamp_millis( oc_session.session.time.created as i64 ) .map(|dt| dt.to_rfc3339()) .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()); Ok(Session { id: session_id, title: oc_session.session.title, created_at, participants, messages, behavior: None, }) } /// Map an OpenCode message to fixture format. /// /// Returns None if the message has no text content. fn map_message( session_id: &str, index: usize, msg_with_parts: MessageWithParts, ) -> Option { // Extract role and timestamp from message info let (role, created_ms) = match msg_with_parts.info { opencode_client::Message::User(user_msg) => { (MessageRole::User, user_msg.time.created) } opencode_client::Message::Assistant(assistant_msg) => { (MessageRole::Assistant, assistant_msg.time.created) } }; // Extract text content from parts // We concatenate all text and reasoning parts for simplicity in v0.1 let mut content_parts = Vec::new(); for part in msg_with_parts.parts { match part { opencode_client::Part::Text(text_part) => { content_parts.push(text_part.text); } opencode_client::Part::Reasoning(reasoning_part) => { // Include reasoning in content for v0.1 content_parts.push(format!("[Reasoning]\n{}", reasoning_part.text)); } opencode_client::Part::Tool(tool_part) => { // Include tool information for context let tool_info = match tool_part.state { opencode_client::ToolState::Completed { output, title, .. } => { format!("[Tool: {}]\n{}\nOutput: {}", tool_part.tool, title, output) } opencode_client::ToolState::Error { error, .. } => { format!("[Tool: {} - Error]\n{}", tool_part.tool, error) } opencode_client::ToolState::Running { title, .. } => { format!( "[Tool: {} - Running]\n{}", tool_part.tool, title.unwrap_or_default() ) } opencode_client::ToolState::Pending => { format!("[Tool: {} - Pending]", tool_part.tool) } }; content_parts.push(tool_info); } // Ignore other part types for v0.1 _ => {} } } // If no content, skip this message if content_parts.is_empty() { return None; } let content = content_parts.join("\n\n"); // Convert timestamp let created_at = chrono::DateTime::from_timestamp_millis(created_ms as i64) .map(|dt| dt.to_rfc3339()) .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()); // Generate message ID let message_id = format!("msg-{}", index); // Parent ID for threading (assistant messages follow user messages) let parent_id = if role == MessageRole::Assistant && index > 0 { Some(format!("msg-{}", index - 1)) } else { None }; Some(Message { id: message_id, session_id: session_id.to_string(), role, content, created_at, parent_id, metadata: None, }) } /// Create default responder configuration. /// /// Uses Echo strategy for simplicity. fn create_default_responders() -> Responders { Responders { keyword_map: HashMap::new(), default_strategy: ResponderStrategy::Echo, random: None, } } /// Create default streaming configuration. fn create_default_streaming() -> Streaming { Streaming { enabled: true, tokens_per_chunk: 5, chunk_interval_ms: 50, jitter_ms: Some(10), } } // ============================================================================ // Fixture Merging // ============================================================================ /// Merge two fixtures together. /// /// Combines sessions from both fixtures, handling duplicate session IDs /// by renaming with a numeric suffix. fn merge_fixtures(existing: Fixture, new: Fixture) -> Result { let mut merged = existing.clone(); let mut existing_ids: HashMap = HashMap::new(); // Track existing session IDs for session in &merged.sessions { existing_ids.insert(session.id.clone(), 0); } // Add new sessions, renaming duplicates for mut session in new.sessions { if existing_ids.contains_key(&session.id) { // Find a unique ID by appending a suffix let original_id = session.id.clone(); let mut suffix = 1; let mut new_id = format!("{}-{}", original_id, suffix); while existing_ids.contains_key(&new_id) { suffix += 1; new_id = format!("{}-{}", original_id, suffix); } tracing::info!("Renaming duplicate session {} to {}", original_id, new_id); // Update session ID and message session IDs session.id = new_id.clone(); for message in &mut session.messages { message.session_id = new_id.clone(); } existing_ids.insert(new_id, 0); } else { existing_ids.insert(session.id.clone(), 0); } merged.sessions.push(session); } // Merge keyword maps (new takes precedence) for (keyword, response) in new.responders.keyword_map { merged.responders.keyword_map.insert(keyword, response); } // Use new fixture's responder strategy and streaming if different // (In practice, we use the existing fixture's settings) Ok(merged) } /// Load an existing fixture from a file, or create a new empty one. async fn load_or_create_fixture(path: &Path) -> Result { if path.exists() { tracing::debug!("Loading existing fixture from: {}", path.display()); let content = tokio::fs::read_to_string(path).await.map_err(|e| { MockerError::FixtureLoad(format!("Failed to read fixture file: {}", e)) })?; let fixture: Fixture = serde_yaml::from_str(&content).map_err(|e| { MockerError::FixtureLoad(format!("Failed to parse fixture YAML: {}", e)) })?; Ok(fixture) } else { tracing::debug!("Creating new empty fixture"); Ok(Fixture { version: "0.1".to_string(), sessions: Vec::new(), responders: create_default_responders(), streaming: create_default_streaming(), }) } } // ============================================================================ // YAML Export // ============================================================================ /// Export a fixture to a YAML file. /// /// Creates parent directories if they don't exist. async fn export_fixture(fixture: &Fixture, path: &Path) -> Result<()> { // Create parent directories if necessary if let Some(parent) = path.parent() { tokio::fs::create_dir_all(parent).await.map_err(|e| { MockerError::FixtureLoad(format!("Failed to create parent directories: {}", e)) })?; } // Serialize to YAML with pretty formatting let yaml = serde_yaml::to_string(fixture).map_err(|e| { MockerError::FixtureLoad(format!("Failed to serialize fixture to YAML: {}", e)) })?; // Write to file tokio::fs::write(path, yaml).await.map_err(|e| { MockerError::Transport(std::io::Error::new( std::io::ErrorKind::Other, format!("Failed to write fixture file: {}", e), )) })?; Ok(()) } // ============================================================================ // Tests // ============================================================================ #[cfg(test)] mod tests { use super::*; #[test] fn test_create_default_responders() { let responders = create_default_responders(); assert_eq!(responders.default_strategy, ResponderStrategy::Echo); assert!(responders.keyword_map.is_empty()); assert!(responders.random.is_none()); } #[test] fn test_create_default_streaming() { let streaming = create_default_streaming(); assert!(streaming.enabled); assert_eq!(streaming.tokens_per_chunk, 5); assert_eq!(streaming.chunk_interval_ms, 50); assert_eq!(streaming.jitter_ms, Some(10)); } #[test] fn test_merge_fixtures_no_duplicates() { let existing = Fixture { version: "0.1".to_string(), sessions: vec![Session { id: "session-1".to_string(), title: "Session 1".to_string(), created_at: "2025-01-01T00:00:00Z".to_string(), participants: vec![], messages: vec![], behavior: None, }], responders: create_default_responders(), streaming: create_default_streaming(), }; let new = Fixture { version: "0.1".to_string(), sessions: vec![Session { id: "session-2".to_string(), title: "Session 2".to_string(), created_at: "2025-01-01T00:00:00Z".to_string(), participants: vec![], messages: vec![], behavior: None, }], responders: create_default_responders(), streaming: create_default_streaming(), }; let merged = merge_fixtures(existing, new).unwrap(); assert_eq!(merged.sessions.len(), 2); assert_eq!(merged.sessions[0].id, "session-1"); assert_eq!(merged.sessions[1].id, "session-2"); } #[test] fn test_merge_fixtures_with_duplicates() { let existing = Fixture { version: "0.1".to_string(), sessions: vec![Session { id: "session-1".to_string(), title: "Session 1".to_string(), created_at: "2025-01-01T00:00:00Z".to_string(), participants: vec![], messages: vec![], behavior: None, }], responders: create_default_responders(), streaming: create_default_streaming(), }; let new = Fixture { version: "0.1".to_string(), sessions: vec![Session { id: "session-1".to_string(), title: "Session 1 (New)".to_string(), created_at: "2025-01-02T00:00:00Z".to_string(), participants: vec![], messages: vec![], behavior: None, }], responders: create_default_responders(), streaming: create_default_streaming(), }; let merged = merge_fixtures(existing, new).unwrap(); assert_eq!(merged.sessions.len(), 2); assert_eq!(merged.sessions[0].id, "session-1"); assert_eq!(merged.sessions[1].id, "session-1-1"); // Renamed } }